|
@@ -21,6 +21,7 @@ package org.apache.zookeeper.server.quorum;
|
|
|
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.HashSet;
|
|
|
|
|
|
import org.apache.zookeeper.PortAssignment;
|
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
@@ -31,13 +32,13 @@ import org.junit.Test;
|
|
|
|
|
|
public class ReconfigRecoveryTest extends QuorumPeerTestBase {
|
|
|
/**
|
|
|
- * Reconfiguration recovery - test that a reconfiguration is completed
|
|
|
- * if leader has .next file during startup and new config is not running yet
|
|
|
+ * Reconfiguration recovery - test that a reconfiguration is completed if
|
|
|
+ * leader has .next file during startup and new config is not running yet
|
|
|
*/
|
|
|
@Test
|
|
|
public void testNextConfigCompletion() throws Exception {
|
|
|
ClientBase.setupTestEnv();
|
|
|
-
|
|
|
+
|
|
|
// 2 servers in current config, 3 in next config
|
|
|
final int SERVER_COUNT = 3;
|
|
|
final int clientPorts[] = new int[SERVER_COUNT];
|
|
@@ -46,96 +47,84 @@ public class ReconfigRecoveryTest extends QuorumPeerTestBase {
|
|
|
ArrayList<String> allServers = new ArrayList<String>();
|
|
|
|
|
|
String currentQuorumCfgSection = null, nextQuorumCfgSection;
|
|
|
-
|
|
|
- for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
- clientPorts[i] = PortAssignment.unique();
|
|
|
- server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() +
|
|
|
- ":participant;localhost:" + clientPorts[i];
|
|
|
- allServers.add(server);
|
|
|
- sb.append(server +"\n");
|
|
|
- if (i == 1) currentQuorumCfgSection = sb.toString();
|
|
|
+
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
+ clientPorts[i] = PortAssignment.unique();
|
|
|
+ server = "server." + i + "=localhost:" + PortAssignment.unique()
|
|
|
+ + ":" + PortAssignment.unique() + ":participant;localhost:"
|
|
|
+ + clientPorts[i];
|
|
|
+ allServers.add(server);
|
|
|
+ sb.append(server + "\n");
|
|
|
+ if (i == 1)
|
|
|
+ currentQuorumCfgSection = sb.toString() + "version=100000000\n";
|
|
|
}
|
|
|
sb.append("version=200000000\n"); // version of current config is 100000000
|
|
|
nextQuorumCfgSection = sb.toString();
|
|
|
-
|
|
|
+
|
|
|
// Both servers 0 and 1 will have the .next config file, which means
|
|
|
// for them that a reconfiguration was in progress when they failed
|
|
|
// and the leader will complete it
|
|
|
MainThread mt[] = new MainThread[SERVER_COUNT];
|
|
|
ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
|
|
|
for (int i = 0; i < SERVER_COUNT - 1; i++) {
|
|
|
- mt[i] = new MainThreadReconfigRecovery(i, clientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
|
|
|
+ mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection);
|
|
|
+ // note that we should run the server, shut it down and only then
|
|
|
+ // simulate a reconfig in progress by writing the temp file, but here no
|
|
|
+ // other server is competing with them in FLE, so we can skip this step
|
|
|
+ // (server 2 is booted after FLE ends)
|
|
|
+ mt[i].writeTempDynamicConfigFile(nextQuorumCfgSection);
|
|
|
mt[i].start();
|
|
|
- zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
}
|
|
|
-
|
|
|
- Assert.assertTrue("waiting for server 0 being up",
|
|
|
- ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[0],
|
|
|
+
|
|
|
+ Assert.assertTrue("waiting for server 0 being up", ClientBase
|
|
|
+ .waitForServerUp("127.0.0.1:" + clientPorts[0],
|
|
|
CONNECTION_TIMEOUT));
|
|
|
- Assert.assertTrue("waiting for server 1 being up",
|
|
|
- ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[1],
|
|
|
+ Assert.assertTrue("waiting for server 1 being up", ClientBase
|
|
|
+ .waitForServerUp("127.0.0.1:" + clientPorts[1],
|
|
|
CONNECTION_TIMEOUT));
|
|
|
-
|
|
|
- int leader = mt[0].main.quorumPeer.leader == null ? 1: 0;
|
|
|
-
|
|
|
+
|
|
|
+ int leader = mt[0].main.quorumPeer.leader == null ? 1 : 0;
|
|
|
+
|
|
|
// the new server's config is going to include itself and the current leader
|
|
|
sb = new StringBuilder();
|
|
|
sb.append(allServers.get(leader) + "\n");
|
|
|
sb.append(allServers.get(2) + "\n");
|
|
|
-
|
|
|
+
|
|
|
// suppose that this new server never heard about the reconfig proposal
|
|
|
String newServerInitialConfig = sb.toString();
|
|
|
mt[2] = new MainThread(2, clientPorts[2], newServerInitialConfig);
|
|
|
mt[2].start();
|
|
|
- zk[2] = new ZooKeeper("127.0.0.1:" + clientPorts[2], ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
+ zk[2] = new ZooKeeper("127.0.0.1:" + clientPorts[2],
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
|
|
|
- Assert.assertTrue("waiting for server 2 being up",
|
|
|
- ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[2],
|
|
|
+ Assert.assertTrue("waiting for server 2 being up", ClientBase
|
|
|
+ .waitForServerUp("127.0.0.1:" + clientPorts[2],
|
|
|
CONNECTION_TIMEOUT));
|
|
|
-
|
|
|
- ReconfigTest.testServerHasConfig(zk[0], allServers, null);
|
|
|
- ReconfigTest.testServerHasConfig(zk[1], allServers, null);
|
|
|
- ReconfigTest.testServerHasConfig(zk[2], allServers, null);
|
|
|
-
|
|
|
- ReconfigTest.testNormalOperation(zk[0], zk[2]);
|
|
|
- ReconfigTest.testNormalOperation(zk[2], zk[1]);
|
|
|
-
|
|
|
- zk[2].close();
|
|
|
- mt[2].shutdown();
|
|
|
-
|
|
|
- //now suppose that the new server heard the reconfig request
|
|
|
- mt[2] = new MainThreadReconfigRecovery(2, clientPorts[2], newServerInitialConfig, nextQuorumCfgSection);
|
|
|
- mt[2].start();
|
|
|
- zk[2] = new ZooKeeper("127.0.0.1:" + clientPorts[2], ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
|
|
|
- Assert.assertTrue("waiting for server 2 being up",
|
|
|
- ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[2],
|
|
|
- CONNECTION_TIMEOUT));
|
|
|
-
|
|
|
ReconfigTest.testServerHasConfig(zk[0], allServers, null);
|
|
|
ReconfigTest.testServerHasConfig(zk[1], allServers, null);
|
|
|
ReconfigTest.testServerHasConfig(zk[2], allServers, null);
|
|
|
-
|
|
|
+
|
|
|
ReconfigTest.testNormalOperation(zk[0], zk[2]);
|
|
|
ReconfigTest.testNormalOperation(zk[2], zk[1]);
|
|
|
|
|
|
- for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
- zk[i].close();
|
|
|
- }
|
|
|
for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
mt[i].shutdown();
|
|
|
+ zk[i].close();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Reconfiguration recovery - current config servers discover .next file,
|
|
|
- * but they're both observers and their ports change in next config. Suppose that next config wasn't activated yet.
|
|
|
- * Should complete reconfiguration.
|
|
|
+ * but they're both observers and their ports change in next config. Suppose
|
|
|
+ * that next config wasn't activated yet. Should complete reconfiguration.
|
|
|
*/
|
|
|
@Test
|
|
|
public void testCurrentServersAreObserversInNextConfig() throws Exception {
|
|
|
ClientBase.setupTestEnv();
|
|
|
-
|
|
|
+
|
|
|
// 2 servers in current config, 5 in next config
|
|
|
final int SERVER_COUNT = 5;
|
|
|
final int clientPorts[] = new int[SERVER_COUNT];
|
|
@@ -143,82 +132,118 @@ public class ReconfigRecoveryTest extends QuorumPeerTestBase {
|
|
|
StringBuilder sb = new StringBuilder();
|
|
|
String server;
|
|
|
|
|
|
- String currentQuorumCfgSection = null, nextQuorumCfgSection;
|
|
|
-
|
|
|
+ String currentQuorumCfg = null, currentQuorumCfgSection = null, nextQuorumCfgSection = null;
|
|
|
+
|
|
|
ArrayList<String> allServersCurrent = new ArrayList<String>();
|
|
|
ArrayList<String> allServersNext = new ArrayList<String>();
|
|
|
-
|
|
|
-
|
|
|
- for(int i = 0; i < 2; i++) {
|
|
|
- oldClientPorts[i] = PortAssignment.unique();
|
|
|
- server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() +
|
|
|
- ":participant;localhost:" + oldClientPorts[i];
|
|
|
- allServersCurrent.add(server);
|
|
|
- sb.append(server +"\n");
|
|
|
- }
|
|
|
-
|
|
|
+
|
|
|
+ for (int i = 0; i < 2; i++) {
|
|
|
+ oldClientPorts[i] = PortAssignment.unique();
|
|
|
+ server = "server." + i + "=localhost:" + PortAssignment.unique()
|
|
|
+ + ":" + PortAssignment.unique() + ":participant;localhost:"
|
|
|
+ + oldClientPorts[i];
|
|
|
+ allServersCurrent.add(server);
|
|
|
+ sb.append(server + "\n");
|
|
|
+ }
|
|
|
+
|
|
|
+ currentQuorumCfg = sb.toString();
|
|
|
+ sb.append("version=100000000\n");
|
|
|
currentQuorumCfgSection = sb.toString();
|
|
|
+
|
|
|
sb = new StringBuilder();
|
|
|
String role;
|
|
|
- for (int i=0; i<SERVER_COUNT; i++) {
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
clientPorts[i] = PortAssignment.unique();
|
|
|
- if (i < 2) role = "observer";
|
|
|
- else role = "participant";
|
|
|
- server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() +
|
|
|
- ":" + role + ";localhost:" + clientPorts[i];
|
|
|
+ if (i < 2) {
|
|
|
+ role = "observer";
|
|
|
+ } else {
|
|
|
+ role = "participant";
|
|
|
+ }
|
|
|
+ server = "server." + i + "=localhost:" + PortAssignment.unique()
|
|
|
+ + ":" + PortAssignment.unique() + ":" + role
|
|
|
+ + ";localhost:" + clientPorts[i];
|
|
|
allServersNext.add(server);
|
|
|
- sb.append(server +"\n");
|
|
|
-
|
|
|
+ sb.append(server + "\n");
|
|
|
}
|
|
|
sb.append("version=200000000\n"); // version of current config is 100000000
|
|
|
nextQuorumCfgSection = sb.toString();
|
|
|
-
|
|
|
+
|
|
|
MainThread mt[] = new MainThread[SERVER_COUNT];
|
|
|
ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
|
|
|
|
|
|
+ // run servers 0 and 1 normally
|
|
|
for (int i = 0; i < 2; i++) {
|
|
|
- mt[i] = new MainThreadReconfigRecovery(i, oldClientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
|
|
|
+ mt[i] = new MainThread(i, oldClientPorts[i],
|
|
|
+ currentQuorumCfgSection);
|
|
|
mt[i].start();
|
|
|
- zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
+ zk[i] = new ZooKeeper("127.0.0.1:" + oldClientPorts[i],
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
+ }
|
|
|
+
|
|
|
+ for (int i = 0; i < 2; i++) {
|
|
|
+ Assert.assertTrue("waiting for server " + i + " being up",
|
|
|
+ ClientBase.waitForServerUp(
|
|
|
+ "127.0.0.1:" + oldClientPorts[i],
|
|
|
+ CONNECTION_TIMEOUT * 2));
|
|
|
+ }
|
|
|
+
|
|
|
+ ReconfigTest.testNormalOperation(zk[0], zk[1]);
|
|
|
+
|
|
|
+ // shut them down and then simulate a reboot with a reconfig in progress
|
|
|
+ for (int i = 0; i < 2; i++) {
|
|
|
+ mt[i].shutdown();
|
|
|
+ zk[i].close();
|
|
|
+ }
|
|
|
+
|
|
|
+ for (int i = 0; i < 2; i++) {
|
|
|
+ Assert.assertTrue(
|
|
|
+ "waiting for server " + i + " being up",
|
|
|
+ ClientBase.waitForServerDown("127.0.0.1:"
|
|
|
+ + oldClientPorts[i], CONNECTION_TIMEOUT * 2));
|
|
|
+ }
|
|
|
+
|
|
|
+ for (int i = 0; i < 2; i++) {
|
|
|
+ mt[i].writeTempDynamicConfigFile(nextQuorumCfgSection);
|
|
|
+ mt[i].start();
|
|
|
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
}
|
|
|
|
|
|
// new members are initialized with current config + the new server
|
|
|
for (int i = 2; i < SERVER_COUNT; i++) {
|
|
|
- mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection + allServersNext.get(i));
|
|
|
+ mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfg
|
|
|
+ + allServersNext.get(i));
|
|
|
mt[i].start();
|
|
|
- zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
}
|
|
|
|
|
|
- for (int i=0; i<SERVER_COUNT; i++) {
|
|
|
- Assert.assertTrue("waiting for server "+ i + " being up",
|
|
|
- ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
|
|
|
- CONNECTION_TIMEOUT * 2));
|
|
|
- ReconfigTest.testServerHasConfig(zk[i], allServersNext, null);
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
+ Assert.assertTrue("waiting for server " + i + " being up",
|
|
|
+ ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
|
|
|
+ CONNECTION_TIMEOUT * 2));
|
|
|
+ ReconfigTest.testServerHasConfig(zk[i], allServersNext, null);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
ReconfigTest.testNormalOperation(zk[0], zk[2]);
|
|
|
ReconfigTest.testNormalOperation(zk[4], zk[1]);
|
|
|
|
|
|
for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
zk[i].close();
|
|
|
- }
|
|
|
- for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
mt[i].shutdown();
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
/**
|
|
|
- * Reconfiguration recovery - test that if servers in old config have a .next file
|
|
|
- * but no quorum of new config is up then no progress should be possible (no progress will happen
|
|
|
- * to ensure safety as the new config might be actually up but partitioned from old config)
|
|
|
+ * Reconfiguration recovery - test that if servers in old config have a
|
|
|
+ * .next file but no quorum of new config is up then no progress should be
|
|
|
+ * possible (no progress will happen to ensure safety as the new config
|
|
|
+ * might be actually up but partitioned from old config)
|
|
|
*/
|
|
|
@Test
|
|
|
public void testNextConfigUnreachable() throws Exception {
|
|
|
ClientBase.setupTestEnv();
|
|
|
-
|
|
|
+
|
|
|
// 2 servers in current config, 5 in next config
|
|
|
final int SERVER_COUNT = 5;
|
|
|
final int clientPorts[] = new int[SERVER_COUNT];
|
|
@@ -226,57 +251,61 @@ public class ReconfigRecoveryTest extends QuorumPeerTestBase {
|
|
|
String server;
|
|
|
|
|
|
String currentQuorumCfgSection = null, nextQuorumCfgSection;
|
|
|
-
|
|
|
+
|
|
|
ArrayList<String> allServers = new ArrayList<String>();
|
|
|
- for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
- clientPorts[i] = PortAssignment.unique();
|
|
|
- server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() +
|
|
|
- ":participant;localhost:" + clientPorts[i];
|
|
|
- allServers.add(server);
|
|
|
- sb.append(server +"\n");
|
|
|
- if (i == 1) currentQuorumCfgSection = sb.toString();
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
+ clientPorts[i] = PortAssignment.unique();
|
|
|
+ server = "server." + i + "=localhost:" + PortAssignment.unique()
|
|
|
+ + ":" + PortAssignment.unique() + ":participant;localhost:"
|
|
|
+ + clientPorts[i];
|
|
|
+ allServers.add(server);
|
|
|
+ sb.append(server + "\n");
|
|
|
+ if (i == 1)
|
|
|
+ currentQuorumCfgSection = sb.toString() + "version=100000000\n";
|
|
|
}
|
|
|
- sb.append("version=200000000\n"); // version of current config is 100000000
|
|
|
+ sb.append("version=200000000\n"); // version of current config is 100000000
|
|
|
nextQuorumCfgSection = sb.toString();
|
|
|
-
|
|
|
- // lets start servers 2, 3, 4 with the new config
|
|
|
+
|
|
|
MainThread mt[] = new MainThread[SERVER_COUNT];
|
|
|
ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
|
|
|
|
|
|
// Both servers 0 and 1 will have the .next config file, which means
|
|
|
// for them that a reconfiguration was in progress when they failed
|
|
|
- // and the leader will complete it.
|
|
|
for (int i = 0; i < 2; i++) {
|
|
|
- mt[i] = new MainThreadReconfigRecovery(i, clientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
|
|
|
+ mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection);
|
|
|
+ // note that we should run the server, shut it down and only then
|
|
|
+ // simulate a reconfig in progress by writing the temp file, but here no
|
|
|
+ // other server is competing with them in FLE, so we can skip this step
|
|
|
+ mt[i].writeTempDynamicConfigFile(nextQuorumCfgSection);
|
|
|
mt[i].start();
|
|
|
- zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
}
|
|
|
|
|
|
- Thread.sleep(CONNECTION_TIMEOUT*2);
|
|
|
-
|
|
|
- // make sure servers 0, 1 don't come online
|
|
|
+ Thread.sleep(CONNECTION_TIMEOUT * 2);
|
|
|
+
|
|
|
+ // make sure servers 0, 1 don't come online - this should be the case
|
|
|
+ // since they can't complete the reconfig
|
|
|
for (int i = 0; i < 2; i++) {
|
|
|
- Assert.assertFalse("server " + i + " is up but shouldn't be",
|
|
|
+ Assert.assertFalse("server " + i + " is up but shouldn't be",
|
|
|
ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
|
|
|
CONNECTION_TIMEOUT / 10));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
for (int i = 0; i < 2; i++) {
|
|
|
zk[i].close();
|
|
|
- }
|
|
|
- for (int i = 0; i < 2; i++) {
|
|
|
mt[i].shutdown();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * Reconfiguration recovery - test that old config members will join the new config
|
|
|
- * if its already active, and not try to complete the reconfiguration
|
|
|
+ * Reconfiguration recovery - test that old config members will join the new
|
|
|
+ * config if it's already active, and not try to complete the reconfiguration
|
|
|
*/
|
|
|
@Test
|
|
|
public void testNextConfigAlreadyActive() throws Exception {
|
|
|
ClientBase.setupTestEnv();
|
|
|
-
|
|
|
+
|
|
|
// 2 servers in current config, 5 in next config
|
|
|
final int SERVER_COUNT = 5;
|
|
|
final int clientPorts[] = new int[SERVER_COUNT];
|
|
@@ -284,75 +313,274 @@ public class ReconfigRecoveryTest extends QuorumPeerTestBase {
|
|
|
String server;
|
|
|
|
|
|
String currentQuorumCfgSection = null, nextQuorumCfgSection;
|
|
|
-
|
|
|
+
|
|
|
ArrayList<String> allServers = new ArrayList<String>();
|
|
|
- for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
- clientPorts[i] = PortAssignment.unique();
|
|
|
- server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() +
|
|
|
- ":participant;localhost:" + clientPorts[i];
|
|
|
- allServers.add(server);
|
|
|
- sb.append(server +"\n");
|
|
|
- if (i == 1) currentQuorumCfgSection = sb.toString();
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
+ clientPorts[i] = PortAssignment.unique();
|
|
|
+ server = "server." + i + "=localhost:" + PortAssignment.unique()
|
|
|
+ + ":" + PortAssignment.unique() + ":participant;localhost:"
|
|
|
+ + clientPorts[i];
|
|
|
+ allServers.add(server);
|
|
|
+ sb.append(server + "\n");
|
|
|
+ if (i == 1) currentQuorumCfgSection = sb.toString() + "version=100000000\n";
|
|
|
}
|
|
|
sb.append("version=200000000\n"); // version of current config is 100000000
|
|
|
nextQuorumCfgSection = sb.toString();
|
|
|
-
|
|
|
+
|
|
|
// let's start servers 2, 3, 4 with the new config
|
|
|
MainThread mt[] = new MainThread[SERVER_COUNT];
|
|
|
ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
|
|
|
for (int i = 2; i < SERVER_COUNT; i++) {
|
|
|
mt[i] = new MainThread(i, clientPorts[i], nextQuorumCfgSection);
|
|
|
mt[i].start();
|
|
|
- zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
}
|
|
|
- for (int i = 2; i < SERVER_COUNT; i++) {
|
|
|
+ for (int i = 2; i < SERVER_COUNT; i++) {
|
|
|
Assert.assertTrue("waiting for server " + i + " being up",
|
|
|
- ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
|
|
|
- CONNECTION_TIMEOUT));
|
|
|
+ ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
|
|
|
+ CONNECTION_TIMEOUT));
|
|
|
}
|
|
|
|
|
|
ReconfigTest.testNormalOperation(zk[2], zk[3]);
|
|
|
|
|
|
long epoch = mt[2].main.quorumPeer.getAcceptedEpoch();
|
|
|
-
|
|
|
+
|
|
|
// Both servers 0 and 1 will have the .next config file, which means
|
|
|
// for them that a reconfiguration was in progress when they failed
|
|
|
- // and the leader will complete it.
|
|
|
+ // but the new config is already active, so they should just join it.
|
|
|
for (int i = 0; i < 2; i++) {
|
|
|
- mt[i] = new MainThreadReconfigRecovery(i, clientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
|
|
|
+ mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection);
|
|
|
+ mt[i].writeTempDynamicConfigFile(nextQuorumCfgSection);
|
|
|
mt[i].start();
|
|
|
- zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- // servers 0 and 1 should connect to all servers, including the one in their
|
|
|
- // .next file during startup, and will find the next config and join it
|
|
|
+ // servers 0 and 1 should connect to all servers, including the one in
|
|
|
+ // their .next file during startup, and will find the next config and join it
|
|
|
for (int i = 0; i < 2; i++) {
|
|
|
- Assert.assertTrue("waiting for server " + i + " being up",
|
|
|
+ Assert.assertTrue("waiting for server " + i + " being up",
|
|
|
ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
|
|
|
- CONNECTION_TIMEOUT*2));
|
|
|
+ CONNECTION_TIMEOUT * 2));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// make sure they joined the new config without any change to it
|
|
|
Assert.assertEquals(epoch, mt[0].main.quorumPeer.getAcceptedEpoch());
|
|
|
Assert.assertEquals(epoch, mt[1].main.quorumPeer.getAcceptedEpoch());
|
|
|
Assert.assertEquals(epoch, mt[2].main.quorumPeer.getAcceptedEpoch());
|
|
|
-
|
|
|
+
|
|
|
ReconfigTest.testServerHasConfig(zk[0], allServers, null);
|
|
|
ReconfigTest.testServerHasConfig(zk[1], allServers, null);
|
|
|
-
|
|
|
+
|
|
|
ReconfigTest.testNormalOperation(zk[0], zk[2]);
|
|
|
ReconfigTest.testNormalOperation(zk[4], zk[1]);
|
|
|
|
|
|
-
|
|
|
for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
zk[i].close();
|
|
|
+ mt[i].shutdown();
|
|
|
}
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Tests conversion of observer to participant AFTER new config was already
|
|
|
+ * committed. Old config: servers 0 (participant), 1 (participant), 2
|
|
|
+ * (observer). New config: servers 2 (participant), 3 (participant). We start
|
|
|
+ * server 2 with old config and start server 3 with new config. All other
|
|
|
+ * servers are down. In order to terminate FLE, server 3 must 'convince'
|
|
|
+ * server 2 to adopt the new config and turn into a participant.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testObserverConvertedToParticipantDuringFLE() throws Exception {
|
|
|
+ ClientBase.setupTestEnv();
|
|
|
+
|
|
|
+ final int SERVER_COUNT = 4;
|
|
|
+ int[][] ports = generatePorts(SERVER_COUNT);
|
|
|
+ String currentQuorumCfgSection, nextQuorumCfgSection;
|
|
|
+
|
|
|
+ // generate old config string
|
|
|
+ HashSet<Integer> observers = new HashSet<Integer>();
|
|
|
+ observers.add(2);
|
|
|
+ StringBuilder sb = generateConfig(3, ports, observers);
|
|
|
+ sb.append("version=100000000");
|
|
|
+ currentQuorumCfgSection = sb.toString();
|
|
|
+
|
|
|
+ // generate new config string
|
|
|
+ ArrayList<String> allServersNext = new ArrayList<String>();
|
|
|
+ sb = new StringBuilder();
|
|
|
+ for (int i = 2; i < SERVER_COUNT; i++) {
|
|
|
+ String server = "server." + i + "=localhost:" + ports[i][0] + ":"
|
|
|
+ + ports[i][1] + ":participant;localhost:" + ports[i][2];
|
|
|
+ allServersNext.add(server);
|
|
|
+ sb.append(server + "\n");
|
|
|
+ }
|
|
|
+ sb.append("version=200000000"); // version of current config is 100000000
|
|
|
+ nextQuorumCfgSection = sb.toString();
|
|
|
+
|
|
|
+ MainThread mt[] = new MainThread[SERVER_COUNT];
|
|
|
+ ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
|
|
|
+
|
|
|
+ // start server 2 with old config, where it is an observer
|
|
|
+ mt[2] = new MainThread(2, ports[2][2], currentQuorumCfgSection);
|
|
|
+ mt[2].start();
|
|
|
+ zk[2] = new ZooKeeper("127.0.0.1:" + ports[2][2],
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
+
|
|
|
+ // start server 3 with new config
|
|
|
+ mt[3] = new MainThread(3, ports[3][2], nextQuorumCfgSection);
|
|
|
+ mt[3].start();
|
|
|
+ zk[3] = new ZooKeeper("127.0.0.1:" + ports[3][2],
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
+
|
|
|
+ for (int i = 2; i < SERVER_COUNT; i++) {
|
|
|
+ Assert.assertTrue("waiting for server " + i + " being up",
|
|
|
+ ClientBase.waitForServerUp("127.0.0.1:" + ports[i][2],
|
|
|
+ CONNECTION_TIMEOUT * 2));
|
|
|
+ ReconfigTest.testServerHasConfig(zk[i], allServersNext, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ Assert.assertEquals(nextQuorumCfgSection,
|
|
|
+ ReconfigTest.testServerHasConfig(zk[2], null, null));
|
|
|
+ Assert.assertEquals(nextQuorumCfgSection,
|
|
|
+ ReconfigTest.testServerHasConfig(zk[3], null, null));
|
|
|
+ ReconfigTest.testNormalOperation(zk[2], zk[2]);
|
|
|
+ ReconfigTest.testNormalOperation(zk[3], zk[2]);
|
|
|
+
|
|
|
+ for (int i = 2; i < SERVER_COUNT; i++) {
|
|
|
+ zk[i].close();
|
|
|
+ mt[i].shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Tests conversion of observer to participant during reconfig recovery, new
|
|
|
+ * config was not committed yet. Old config: servers 0 (participant), 1
|
|
|
+ * (participant), 2 (observer). New config: servers 2 (participant), 3
|
|
|
+ * (participant). We start servers 0, 1, 2 with old config and a .next
|
|
|
+ * file indicating a reconfig in progress. We start server 3 with old config
|
|
|
+ * + itself in config file. In this scenario server 2 can't be converted to
|
|
|
+ * participant during reconfig since we don't gossip about proposed
|
|
|
+ * configurations, only about committed ones. This tests that new config can
|
|
|
+ * be completed, which requires server 2's ack for the newleader message,
|
|
|
+ * even though it's an observer.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testCurrentObserverIsParticipantInNewConfig() throws Exception {
|
|
|
+ ClientBase.setupTestEnv();
|
|
|
+
|
|
|
+ final int SERVER_COUNT = 4;
|
|
|
+ int[][] ports = generatePorts(SERVER_COUNT);
|
|
|
+ String currentQuorumCfg, currentQuorumCfgSection, nextQuorumCfgSection;
|
|
|
+
|
|
|
+ // generate old config string
|
|
|
+ HashSet<Integer> observers = new HashSet<Integer>();
|
|
|
+ observers.add(2);
|
|
|
+
|
|
|
+ StringBuilder sb = generateConfig(3, ports, observers);
|
|
|
+ currentQuorumCfg = sb.toString();
|
|
|
+ sb.append("version=100000000");
|
|
|
+ currentQuorumCfgSection = sb.toString();
|
|
|
+
|
|
|
+ // Run servers 0..2 for a while
|
|
|
+ MainThread mt[] = new MainThread[SERVER_COUNT];
|
|
|
+ ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
|
|
|
+ for (int i = 0; i <= 2; i++) {
|
|
|
+ mt[i] = new MainThread(i, ports[i][2], currentQuorumCfgSection);
|
|
|
+ mt[i].start();
|
|
|
+ zk[i] = new ZooKeeper("127.0.0.1:" + ports[i][2],
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
+ }
|
|
|
+
|
|
|
+ ReconfigTest.testNormalOperation(zk[0], zk[2]);
|
|
|
+
|
|
|
+ for (int i = 0; i <= 2; i++) {
|
|
|
+ Assert.assertTrue("waiting for server " + i + " being up",
|
|
|
+ ClientBase.waitForServerUp("127.0.0.1:" + ports[i][2],
|
|
|
+ CONNECTION_TIMEOUT * 2));
|
|
|
+ }
|
|
|
+
|
|
|
+ // shut servers 0..2 down
|
|
|
+ for (int i = 0; i <= 2; i++) {
|
|
|
+ mt[i].shutdown();
|
|
|
+ zk[i].close();
|
|
|
+ }
|
|
|
+
|
|
|
+ // generate new config string
|
|
|
+ ArrayList<String> allServersNext = new ArrayList<String>();
|
|
|
+ sb = new StringBuilder();
|
|
|
+ for (int i = 2; i < SERVER_COUNT; i++) {
|
|
|
+ String server = "server." + i + "=localhost:" + ports[i][0] + ":"
|
|
|
+ + ports[i][1] + ":participant;localhost:" + ports[i][2];
|
|
|
+ allServersNext.add(server);
|
|
|
+ sb.append(server + "\n");
|
|
|
+ }
|
|
|
+ sb.append("version=200000000"); // version of current config is 100000000
|
|
|
+ nextQuorumCfgSection = sb.toString();
|
|
|
+
|
|
|
+ // simulate reconfig in progress - servers 0..2 have a temp reconfig
|
|
|
+ // file when they boot
|
|
|
+ for (int i = 0; i <= 2; i++) {
|
|
|
+ mt[i].writeTempDynamicConfigFile(nextQuorumCfgSection);
|
|
|
+ mt[i].start();
|
|
|
+ zk[i] = new ZooKeeper("127.0.0.1:" + ports[i][2],
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
+ }
|
|
|
+ // new server 3 still has its invalid joiner config - everyone in old
|
|
|
+ // config + itself
|
|
|
+ mt[3] = new MainThread(3, ports[3][2], currentQuorumCfg
|
|
|
+ + allServersNext.get(1));
|
|
|
+ mt[3].start();
|
|
|
+ zk[3] = new ZooKeeper("127.0.0.1:" + ports[3][2],
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
+
|
|
|
+ for (int i = 2; i < SERVER_COUNT; i++) {
|
|
|
+ Assert.assertTrue("waiting for server " + i + " being up",
|
|
|
+ ClientBase.waitForServerUp("127.0.0.1:" + ports[i][2],
|
|
|
+ CONNECTION_TIMEOUT * 2));
|
|
|
+ ReconfigTest.testServerHasConfig(zk[i], allServersNext, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ ReconfigTest.testNormalOperation(zk[0], zk[2]);
|
|
|
+ ReconfigTest.testNormalOperation(zk[3], zk[1]);
|
|
|
+ Assert.assertEquals(nextQuorumCfgSection,
|
|
|
+ ReconfigTest.testServerHasConfig(zk[2], null, null));
|
|
|
+ Assert.assertEquals(nextQuorumCfgSection,
|
|
|
+ ReconfigTest.testServerHasConfig(zk[3], null, null));
|
|
|
+
|
|
|
for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
+ zk[i].close();
|
|
|
mt[i].shutdown();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-}
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Generates 3 ports per server
|
|
|
+ */
|
|
|
+ private int[][] generatePorts(int numServers) {
|
|
|
+ int[][] ports = new int[numServers][];
|
|
|
+ for (int i = 0; i < numServers; i++) {
|
|
|
+ ports[i] = new int[3];
|
|
|
+ for (int j = 0; j < 3; j++) {
|
|
|
+ ports[i][j] = PortAssignment.unique();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return ports;
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Creates a configuration string for servers 0..numServers-1. Ids in
|
|
|
+ * observerIds correspond to observers, other ids are for participants.
|
|
|
+ */
|
|
|
+ private StringBuilder generateConfig(int numServers, int[][] ports,
|
|
|
+ HashSet<Integer> observerIds) {
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ for (int i = 0; i < numServers; i++) {
|
|
|
+ String server = "server." + i + "=localhost:" + ports[i][0] + ":"
|
|
|
+ + ports[i][1] + ":"
|
|
|
+ + (observerIds.contains(i) ? "observer" : "participant")
|
|
|
+ + ";localhost:" + ports[i][2];
|
|
|
+ sb.append(server + "\n");
|
|
|
+ }
|
|
|
+ return sb;
|
|
|
+ }
|
|
|
+}
|