|
@@ -39,6 +39,7 @@ import org.apache.log4j.WriterAppender;
|
|
|
import org.apache.zookeeper.CreateMode;
|
|
|
import org.apache.zookeeper.KeeperException;
|
|
|
import org.apache.zookeeper.PortAssignment;
|
|
|
+import org.apache.zookeeper.Watcher;
|
|
|
import org.apache.zookeeper.ZooDefs.OpCode;
|
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
|
import org.apache.zookeeper.ZooDefs.Ids;
|
|
@@ -249,14 +250,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
numServers = 3;
|
|
|
servers = LaunchServers(numServers);
|
|
|
String path = "/hzxidtest";
|
|
|
- int leader = -1;
|
|
|
-
|
|
|
- // find the leader
|
|
|
- for (int i = 0; i < numServers; i++) {
|
|
|
- if (servers.mt[i].main.quorumPeer.leader != null) {
|
|
|
- leader = i;
|
|
|
- }
|
|
|
- }
|
|
|
+ int leader = servers.findLeader();
|
|
|
|
|
|
// make sure there is a leader
|
|
|
Assert.assertTrue("There should be a leader", leader >= 0);
|
|
@@ -366,12 +360,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
servers = LaunchServers(numServers, 500);
|
|
|
|
|
|
// find the leader
|
|
|
- int trueLeader = -1;
|
|
|
- for (int i = 0; i < numServers; i++) {
|
|
|
- if (servers.mt[i].main.quorumPeer.leader != null) {
|
|
|
- trueLeader = i;
|
|
|
- }
|
|
|
- }
|
|
|
+ int trueLeader = servers.findLeader();
|
|
|
Assert.assertTrue("There should be a leader", trueLeader >= 0);
|
|
|
|
|
|
// find a follower
|
|
@@ -435,12 +424,16 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
int iterations = ClientBase.CONNECTION_TIMEOUT / 500;
|
|
|
while (zk.getState() != state) {
|
|
|
if (iterations-- == 0) {
|
|
|
- throw new RuntimeException("Waiting too long");
|
|
|
+ throw new RuntimeException("Waiting too long " + zk.getState() + " != " + state);
|
|
|
}
|
|
|
Thread.sleep(500);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void waitForAll(Servers servers, States state) throws InterruptedException {
|
|
|
+ waitForAll(servers.zk, state);
|
|
|
+ }
|
|
|
+
|
|
|
private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException {
|
|
|
int iterations = ClientBase.CONNECTION_TIMEOUT / 1000;
|
|
|
boolean someoneNotConnected = true;
|
|
@@ -465,6 +458,37 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
private static class Servers {
|
|
|
MainThread mt[];
|
|
|
ZooKeeper zk[];
|
|
|
+ int[] clientPorts;
|
|
|
+
|
|
|
+ public void shutDownAllServers() throws InterruptedException {
|
|
|
+ for (MainThread t: mt) {
|
|
|
+ t.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void restartAllServersAndClients(Watcher watcher) throws IOException {
|
|
|
+ for (MainThread t : mt) {
|
|
|
+ if (!t.isAlive()) {
|
|
|
+ t.start();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ for (int i = 0; i < zk.length; i++) {
|
|
|
+ restartClient(i, watcher);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void restartClient(int clientIndex, Watcher watcher) throws IOException {
|
|
|
+ zk[clientIndex] = new ZooKeeper("127.0.0.1:" + clientPorts[clientIndex], ClientBase.CONNECTION_TIMEOUT, watcher);
|
|
|
+ }
|
|
|
+
|
|
|
+ public int findLeader() {
|
|
|
+ for (int i = 0; i < mt.length; i++) {
|
|
|
+ if (mt[i].main.quorumPeer.leader != null) {
|
|
|
+ return i;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
|
|
@@ -474,7 +498,8 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
|
|
|
/** * This is a helper function for launching a set of servers
|
|
|
*
|
|
|
- * @param numServers* @param tickTime A ticktime to pass to MainThread
|
|
|
+ * @param numServers the number of servers
|
|
|
+ * @param tickTime A ticktime to pass to MainThread
|
|
|
* @return
|
|
|
* @throws IOException
|
|
|
* @throws InterruptedException
|
|
@@ -482,30 +507,28 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
private Servers LaunchServers(int numServers, Integer tickTime) throws IOException, InterruptedException {
|
|
|
int SERVER_COUNT = numServers;
|
|
|
Servers svrs = new Servers();
|
|
|
- final int clientPorts[] = new int[SERVER_COUNT];
|
|
|
+ svrs.clientPorts = new int[SERVER_COUNT];
|
|
|
StringBuilder sb = new StringBuilder();
|
|
|
for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
- clientPorts[i] = PortAssignment.unique();
|
|
|
- sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+clientPorts[i]+"\n");
|
|
|
+ svrs.clientPorts[i] = PortAssignment.unique();
|
|
|
+ sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+svrs.clientPorts[i]+"\n");
|
|
|
}
|
|
|
String quorumCfgSection = sb.toString();
|
|
|
|
|
|
- MainThread mt[] = new MainThread[SERVER_COUNT];
|
|
|
- ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
|
|
|
+ svrs.mt = new MainThread[SERVER_COUNT];
|
|
|
+ svrs.zk = new ZooKeeper[SERVER_COUNT];
|
|
|
for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
if (tickTime != null) {
|
|
|
- mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection, new HashMap<String, String>(), tickTime);
|
|
|
+ svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection, new HashMap<String, String>(), tickTime);
|
|
|
} else {
|
|
|
- mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection);
|
|
|
+ svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection);
|
|
|
}
|
|
|
- mt[i].start();
|
|
|
- zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
+ svrs.mt[i].start();
|
|
|
+ svrs.restartClient(i, this);
|
|
|
}
|
|
|
-
|
|
|
- waitForAll(zk, States.CONNECTED);
|
|
|
-
|
|
|
- svrs.mt = mt;
|
|
|
- svrs.zk = zk;
|
|
|
+
|
|
|
+ waitForAll(svrs, States.CONNECTED);
|
|
|
+
|
|
|
return svrs;
|
|
|
}
|
|
|
|
|
@@ -673,7 +696,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
+ ":" + electionPort1 + ";" + CLIENT_PORT_QP1
|
|
|
+ "\nserver.2=127.0.0.1:" + PortAssignment.unique()
|
|
|
+ ":" + electionPort2 + ";" + CLIENT_PORT_QP2;
|
|
|
-
|
|
|
+
|
|
|
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
|
|
|
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
|
|
|
q1.start();
|
|
@@ -888,4 +911,105 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
maxSessionTimeOut, quorumPeer.getMaxSessionTimeout());
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
|
|
|
+ final int LEADER_TIMEOUT_MS = 10_000;
|
|
|
+ // 1. start up server and wait for leader election to finish
|
|
|
+ ClientBase.setupTestEnv();
|
|
|
+ final int SERVER_COUNT = 3;
|
|
|
+ servers = LaunchServers(SERVER_COUNT);
|
|
|
+
|
|
|
+ waitForAll(servers, States.CONNECTED);
|
|
|
+
|
|
|
+ // we need to shutdown and start back up to make sure that the create session isn't the first transaction since
|
|
|
+ // that is rather innocuous.
|
|
|
+ servers.shutDownAllServers();
|
|
|
+ waitForAll(servers, States.CONNECTING);
|
|
|
+ servers.restartAllServersAndClients(this);
|
|
|
+ waitForAll(servers, States.CONNECTED);
|
|
|
+
|
|
|
+ // 2. kill all followers
|
|
|
+ int leader = servers.findLeader();
|
|
|
+ Map<Long, Proposal> outstanding = servers.mt[leader].main.quorumPeer.leader.outstandingProposals;
|
|
|
+ // increase the tick time to delay the leader going to looking
|
|
|
+ servers.mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS;
|
|
|
+ LOG.warn("LEADER {}", leader);
|
|
|
+
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
+ if (i != leader) {
|
|
|
+ servers.mt[i].shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 3. start up the followers to form a new quorum
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
+ if (i != leader) {
|
|
|
+ servers.mt[i].start();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 4. wait for one of the followers to become the new leader
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
+ if (i != leader) {
|
|
|
+ // Recreate a client session since the previous session was not persisted.
|
|
|
+ servers.restartClient(i, this);
|
|
|
+ waitForOne(servers.zk[i], States.CONNECTED);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 5. send a create request to old leader and make sure it's synced to disk,
|
|
|
+ // which means the old leader has acked the proposal itself
|
|
|
+ try {
|
|
|
+ servers.zk[leader].create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE,
|
|
|
+ CreateMode.PERSISTENT);
|
|
|
+ Assert.fail("create /zk" + leader + " should have failed");
|
|
|
+ } catch (KeeperException e) {
|
|
|
+ }
|
|
|
+
|
|
|
+ // just make sure that we actually did get it in process at the
|
|
|
+ // leader
|
|
|
+ Assert.assertEquals(1, outstanding.size());
|
|
|
+ Proposal p = outstanding.values().iterator().next();
|
|
|
+ Assert.assertEquals(OpCode.create, p.request.getHdr().getType());
|
|
|
+
|
|
|
+ // make sure it has a chance to write it to disk
|
|
|
+ int sleepTime = 0;
|
|
|
+ Long longLeader = new Long(leader);
|
|
|
+ while (!p.qvAcksetPairs.get(0).getAckset().contains(longLeader)) {
|
|
|
+ if (sleepTime > 2000) {
|
|
|
+ Assert.fail("Transaction not synced to disk within 1 second " + p.qvAcksetPairs.get(0).getAckset()
|
|
|
+ + " expected " + leader);
|
|
|
+ }
|
|
|
+ Thread.sleep(100);
|
|
|
+ sleepTime += 100;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 6. wait for the leader to quit due to not enough followers and come back up as a part of the new quorum
|
|
|
+ LOG.info("Waiting for leader {} to timeout followers", leader);
|
|
|
+ sleepTime = 0;
|
|
|
+ Follower f = servers.mt[leader].main.quorumPeer.follower;
|
|
|
+ while (f == null || !f.isRunning()) {
|
|
|
+ if (sleepTime > LEADER_TIMEOUT_MS * 2) {
|
|
|
+ Assert.fail("Took too long for old leader to time out " + servers.mt[leader].main.quorumPeer.getPeerState());
|
|
|
+ }
|
|
|
+ Thread.sleep(100);
|
|
|
+ sleepTime += 100;
|
|
|
+ f = servers.mt[leader].main.quorumPeer.follower;
|
|
|
+ }
|
|
|
+
|
|
|
+ int newLeader = servers.findLeader();
|
|
|
+ // make sure a different leader was elected
|
|
|
+ Assert.assertNotEquals(leader, newLeader);
|
|
|
+
|
|
|
+ // 7. restart the previous leader to force it to replay the edits and possibly come up in a bad state
|
|
|
+ servers.mt[leader].shutdown();
|
|
|
+ servers.mt[leader].start();
|
|
|
+ waitForAll(servers, States.CONNECTED);
|
|
|
+
|
|
|
+ // 8. check that the node does not exist on any server, including the previous leader
|
|
|
+ // make sure everything is consistent
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
+ Assert.assertNull("server " + i + " should not have /zk" + leader, servers.zk[i].exists("/zk" + leader, false));
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|