|
@@ -37,25 +37,21 @@ import org.apache.log4j.WriterAppender;
|
|
import org.apache.zookeeper.CreateMode;
|
|
import org.apache.zookeeper.CreateMode;
|
|
import org.apache.zookeeper.KeeperException;
|
|
import org.apache.zookeeper.KeeperException;
|
|
import org.apache.zookeeper.PortAssignment;
|
|
import org.apache.zookeeper.PortAssignment;
|
|
-import org.apache.zookeeper.WatchedEvent;
|
|
|
|
-import org.apache.zookeeper.Watcher.Event.KeeperState;
|
|
|
|
import org.apache.zookeeper.ZooDefs.OpCode;
|
|
import org.apache.zookeeper.ZooDefs.OpCode;
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
import org.apache.zookeeper.ZooDefs.Ids;
|
|
import org.apache.zookeeper.ZooDefs.Ids;
|
|
import org.apache.zookeeper.ZooKeeper.States;
|
|
import org.apache.zookeeper.ZooKeeper.States;
|
|
import org.apache.zookeeper.server.quorum.Leader.Proposal;
|
|
import org.apache.zookeeper.server.quorum.Leader.Proposal;
|
|
-import org.apache.zookeeper.server.quorum.QuorumPeerTestBase.MainThread;
|
|
|
|
import org.apache.zookeeper.test.ClientBase;
|
|
import org.apache.zookeeper.test.ClientBase;
|
|
import org.junit.Assert;
|
|
import org.junit.Assert;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Test stand-alone server.
|
|
* Test stand-alone server.
|
|
*
|
|
*
|
|
*/
|
|
*/
|
|
public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
- /**
|
|
|
|
|
|
+ /**
|
|
* Verify the ability to start a cluster.
|
|
* Verify the ability to start a cluster.
|
|
*/
|
|
*/
|
|
@Test
|
|
@Test
|
|
@@ -66,10 +62,10 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
final int CLIENT_PORT_QP2 = PortAssignment.unique();
|
|
final int CLIENT_PORT_QP2 = PortAssignment.unique();
|
|
|
|
|
|
String quorumCfgSection =
|
|
String quorumCfgSection =
|
|
- "server.1=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
- + ":" + PortAssignment.unique()
|
|
|
|
- + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
- + ":" + PortAssignment.unique();
|
|
|
|
|
|
+ "server.1=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
+ + ":" + PortAssignment.unique()
|
|
|
|
+ + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
+ + ":" + PortAssignment.unique();
|
|
|
|
|
|
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
|
|
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
|
|
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
|
|
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
|
|
@@ -77,13 +73,12 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
q2.start();
|
|
q2.start();
|
|
|
|
|
|
Assert.assertTrue("waiting for server 1 being up",
|
|
Assert.assertTrue("waiting for server 1 being up",
|
|
- ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1,
|
|
|
|
|
|
+ ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1,
|
|
CONNECTION_TIMEOUT));
|
|
CONNECTION_TIMEOUT));
|
|
Assert.assertTrue("waiting for server 2 being up",
|
|
Assert.assertTrue("waiting for server 2 being up",
|
|
- ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2,
|
|
|
|
|
|
+ ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2,
|
|
CONNECTION_TIMEOUT));
|
|
CONNECTION_TIMEOUT));
|
|
|
|
|
|
-
|
|
|
|
ZooKeeper zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1,
|
|
ZooKeeper zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1,
|
|
ClientBase.CONNECTION_TIMEOUT, this);
|
|
ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
|
|
|
@@ -121,133 +116,133 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
final int SERVER_COUNT = 3;
|
|
final int SERVER_COUNT = 3;
|
|
final int clientPorts[] = new int[SERVER_COUNT];
|
|
final int clientPorts[] = new int[SERVER_COUNT];
|
|
StringBuilder sb = new StringBuilder();
|
|
StringBuilder sb = new StringBuilder();
|
|
- for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
- clientPorts[i] = PortAssignment.unique();
|
|
|
|
- sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+"\n");
|
|
|
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
+ clientPorts[i] = PortAssignment.unique();
|
|
|
|
+ sb.append("server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + "\n");
|
|
}
|
|
}
|
|
String quorumCfgSection = sb.toString();
|
|
String quorumCfgSection = sb.toString();
|
|
|
|
|
|
MainThread mt[] = new MainThread[SERVER_COUNT];
|
|
MainThread mt[] = new MainThread[SERVER_COUNT];
|
|
ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
|
|
ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
|
|
- for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
- mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection);
|
|
|
|
- mt[i].start();
|
|
|
|
- zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
+ mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection);
|
|
|
|
+ mt[i].start();
|
|
|
|
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
waitForAll(zk, States.CONNECTED);
|
|
waitForAll(zk, States.CONNECTED);
|
|
-
|
|
|
|
|
|
+
|
|
// we need to shutdown and start back up to make sure that the create session isn't the first transaction since
|
|
// we need to shutdown and start back up to make sure that the create session isn't the first transaction since
|
|
// that is rather innocuous.
|
|
// that is rather innocuous.
|
|
- for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
- mt[i].shutdown();
|
|
|
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
+ mt[i].shutdown();
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
waitForAll(zk, States.CONNECTING);
|
|
waitForAll(zk, States.CONNECTING);
|
|
-
|
|
|
|
- for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
- mt[i].start();
|
|
|
|
|
|
+
|
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
+ mt[i].start();
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
waitForAll(zk, States.CONNECTED);
|
|
waitForAll(zk, States.CONNECTED);
|
|
-
|
|
|
|
|
|
+
|
|
// ok lets find the leader and kill everything else, we have a few
|
|
// ok lets find the leader and kill everything else, we have a few
|
|
// seconds, so it should be plenty of time
|
|
// seconds, so it should be plenty of time
|
|
int leader = -1;
|
|
int leader = -1;
|
|
Map<Long, Proposal> outstanding = null;
|
|
Map<Long, Proposal> outstanding = null;
|
|
- for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
- if (mt[i].main.quorumPeer.leader == null) {
|
|
|
|
- mt[i].shutdown();
|
|
|
|
- } else {
|
|
|
|
- leader = i;
|
|
|
|
- outstanding = mt[leader].main.quorumPeer.leader.outstandingProposals;
|
|
|
|
- }
|
|
|
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
+ if (mt[i].main.quorumPeer.leader == null) {
|
|
|
|
+ mt[i].shutdown();
|
|
|
|
+ } else {
|
|
|
|
+ leader = i;
|
|
|
|
+ outstanding = mt[leader].main.quorumPeer.leader.outstandingProposals;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
try {
|
|
try {
|
|
- zk[leader].create("/zk"+leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE,
|
|
|
|
- CreateMode.PERSISTENT);
|
|
|
|
- Assert.fail("create /zk" + leader + " should have failed");
|
|
|
|
- } catch(KeeperException e) {}
|
|
|
|
-
|
|
|
|
- // just make sure that we actually did get it in process at the
|
|
|
|
|
|
+ zk[leader].create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE,
|
|
|
|
+ CreateMode.PERSISTENT);
|
|
|
|
+ Assert.fail("create /zk" + leader + " should have failed");
|
|
|
|
+ } catch (KeeperException e) {}
|
|
|
|
+
|
|
|
|
+ // just make sure that we actually did get it in process at the
|
|
// leader
|
|
// leader
|
|
Assert.assertTrue(outstanding.size() == 1);
|
|
Assert.assertTrue(outstanding.size() == 1);
|
|
- Assert.assertTrue(((Proposal)outstanding.values().iterator().next()).request.hdr.getType() == OpCode.create);
|
|
|
|
|
|
+ Assert.assertTrue(((Proposal) outstanding.values().iterator().next()).request.hdr.getType() == OpCode.create);
|
|
// make sure it has a chance to write it to disk
|
|
// make sure it has a chance to write it to disk
|
|
Thread.sleep(1000);
|
|
Thread.sleep(1000);
|
|
mt[leader].shutdown();
|
|
mt[leader].shutdown();
|
|
waitForAll(zk, States.CONNECTING);
|
|
waitForAll(zk, States.CONNECTING);
|
|
- for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
- if (i != leader) {
|
|
|
|
- mt[i].start();
|
|
|
|
- }
|
|
|
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
+ if (i != leader) {
|
|
|
|
+ mt[i].start();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
- if (i != leader) {
|
|
|
|
- waitForOne(zk[i], States.CONNECTED);
|
|
|
|
- zk[i].create("/zk" + i, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
|
- }
|
|
|
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
+ if (i != leader) {
|
|
|
|
+ waitForOne(zk[i], States.CONNECTED);
|
|
|
|
+ zk[i].create("/zk" + i, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
mt[leader].start();
|
|
mt[leader].start();
|
|
waitForAll(zk, States.CONNECTED);
|
|
waitForAll(zk, States.CONNECTED);
|
|
// make sure everything is consistent
|
|
// make sure everything is consistent
|
|
- for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
- for(int j = 0; j < SERVER_COUNT; j++) {
|
|
|
|
- if (i == leader) {
|
|
|
|
- Assert.assertTrue((j==leader?("Leader ("+leader+")"):("Follower "+j))+" should not have /zk" + i, zk[j].exists("/zk"+i, false) == null);
|
|
|
|
- } else {
|
|
|
|
- Assert.assertTrue((j==leader?("Leader ("+leader+")"):("Follower "+j))+" does not have /zk" + i, zk[j].exists("/zk"+i, false) != null);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
+ for (int j = 0; j < SERVER_COUNT; j++) {
|
|
|
|
+ if (i == leader) {
|
|
|
|
+ Assert.assertTrue((j == leader ? ("Leader (" + leader + ")") : ("Follower " + j)) + " should not have /zk" + i, zk[j].exists("/zk" + i, false) == null);
|
|
|
|
+ } else {
|
|
|
|
+ Assert.assertTrue((j == leader ? ("Leader (" + leader + ")") : ("Follower " + j)) + " does not have /zk" + i, zk[j].exists("/zk" + i, false) != null);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
- zk[i].close();
|
|
|
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
+ zk[i].close();
|
|
}
|
|
}
|
|
- for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
- mt[i].shutdown();
|
|
|
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
+ mt[i].shutdown();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Test the case of server with highest zxid not present at leader election and joining later.
|
|
* Test the case of server with highest zxid not present at leader election and joining later.
|
|
- * This test case is for reproducing the issue and fixing the bug mentioned in ZOOKEEPER-1154
|
|
|
|
- * and ZOOKEEPER-1156.
|
|
|
|
|
|
+ * This test case is for reproducing the issue and fixing the bug mentioned in ZOOKEEPER-1154
|
|
|
|
+ * and ZOOKEEPER-1156.
|
|
*/
|
|
*/
|
|
@Test
|
|
@Test
|
|
public void testHighestZxidJoinLate() throws Exception {
|
|
public void testHighestZxidJoinLate() throws Exception {
|
|
int numServers = 3;
|
|
int numServers = 3;
|
|
Servers svrs = LaunchServers(numServers);
|
|
Servers svrs = LaunchServers(numServers);
|
|
String path = "/hzxidtest";
|
|
String path = "/hzxidtest";
|
|
- int leader=-1;
|
|
|
|
|
|
+ int leader = -1;
|
|
|
|
|
|
// find the leader
|
|
// find the leader
|
|
- for (int i=0; i < numServers; i++) {
|
|
|
|
|
|
+ for (int i = 0; i < numServers; i++) {
|
|
if (svrs.mt[i].main.quorumPeer.leader != null) {
|
|
if (svrs.mt[i].main.quorumPeer.leader != null) {
|
|
leader = i;
|
|
leader = i;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
// make sure there is a leader
|
|
// make sure there is a leader
|
|
- Assert.assertTrue("There should be a leader", leader >=0);
|
|
|
|
|
|
+ Assert.assertTrue("There should be a leader", leader >= 0);
|
|
|
|
|
|
- int nonleader = (leader+1)%numServers;
|
|
|
|
|
|
+ int nonleader = (leader + 1) % numServers;
|
|
|
|
|
|
byte[] input = new byte[1];
|
|
byte[] input = new byte[1];
|
|
input[0] = 1;
|
|
input[0] = 1;
|
|
byte[] output;
|
|
byte[] output;
|
|
|
|
|
|
// Create a couple of nodes
|
|
// Create a couple of nodes
|
|
- svrs.zk[leader].create(path+leader, input, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
|
- svrs.zk[leader].create(path+nonleader, input, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
|
-
|
|
|
|
|
|
+ svrs.zk[leader].create(path + leader, input, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
|
+ svrs.zk[leader].create(path + nonleader, input, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
|
+
|
|
// make sure the updates indeed committed. If it is not
|
|
// make sure the updates indeed committed. If it is not
|
|
// the following statement will throw.
|
|
// the following statement will throw.
|
|
- output = svrs.zk[leader].getData(path+nonleader, false, null);
|
|
|
|
-
|
|
|
|
|
|
+ output = svrs.zk[leader].getData(path + nonleader, false, null);
|
|
|
|
+
|
|
// Shutdown every one else but the leader
|
|
// Shutdown every one else but the leader
|
|
- for (int i=0; i < numServers; i++) {
|
|
|
|
|
|
+ for (int i = 0; i < numServers; i++) {
|
|
if (i != leader) {
|
|
if (i != leader) {
|
|
svrs.mt[i].shutdown();
|
|
svrs.mt[i].shutdown();
|
|
}
|
|
}
|
|
@@ -256,8 +251,8 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
input[0] = 2;
|
|
input[0] = 2;
|
|
|
|
|
|
// Update the node on the leader
|
|
// Update the node on the leader
|
|
- svrs.zk[leader].setData(path+leader, input, -1, null, null);
|
|
|
|
-
|
|
|
|
|
|
+ svrs.zk[leader].setData(path + leader, input, -1, null, null);
|
|
|
|
+
|
|
// wait some time to let this get written to disk
|
|
// wait some time to let this get written to disk
|
|
Thread.sleep(500);
|
|
Thread.sleep(500);
|
|
|
|
|
|
@@ -269,7 +264,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
waitForAll(svrs.zk, States.CONNECTING);
|
|
waitForAll(svrs.zk, States.CONNECTING);
|
|
|
|
|
|
// Start everyone but the leader
|
|
// Start everyone but the leader
|
|
- for (int i=0; i < numServers; i++) {
|
|
|
|
|
|
+ for (int i = 0; i < numServers; i++) {
|
|
if (i != leader) {
|
|
if (i != leader) {
|
|
svrs.mt[i].start();
|
|
svrs.mt[i].start();
|
|
}
|
|
}
|
|
@@ -279,7 +274,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
waitForOne(svrs.zk[nonleader], States.CONNECTED);
|
|
waitForOne(svrs.zk[nonleader], States.CONNECTED);
|
|
|
|
|
|
// validate that the old value is there and not the new one
|
|
// validate that the old value is there and not the new one
|
|
- output = svrs.zk[nonleader].getData(path+leader, false, null);
|
|
|
|
|
|
+ output = svrs.zk[nonleader].getData(path + leader, false, null);
|
|
|
|
|
|
Assert.assertEquals(
|
|
Assert.assertEquals(
|
|
"Expecting old value 1 since 2 isn't committed yet",
|
|
"Expecting old value 1 since 2 isn't committed yet",
|
|
@@ -287,91 +282,90 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
|
|
|
// Do some other update, so we bump the maxCommttedZxid
|
|
// Do some other update, so we bump the maxCommttedZxid
|
|
// by setting the value to 2
|
|
// by setting the value to 2
|
|
- svrs.zk[nonleader].setData(path+nonleader, input, -1);
|
|
|
|
|
|
+ svrs.zk[nonleader].setData(path + nonleader, input, -1);
|
|
|
|
|
|
- // start the old leader
|
|
|
|
|
|
+ // start the old leader
|
|
svrs.mt[leader].start();
|
|
svrs.mt[leader].start();
|
|
|
|
|
|
// connect to it
|
|
// connect to it
|
|
waitForOne(svrs.zk[leader], States.CONNECTED);
|
|
waitForOne(svrs.zk[leader], States.CONNECTED);
|
|
|
|
|
|
// make sure it doesn't have the new value that it alone had logged
|
|
// make sure it doesn't have the new value that it alone had logged
|
|
- output = svrs.zk[leader].getData(path+leader, false, null);
|
|
|
|
|
|
+ output = svrs.zk[leader].getData(path + leader, false, null);
|
|
Assert.assertEquals(
|
|
Assert.assertEquals(
|
|
"Validating that the deposed leader has rolled back that change it had written",
|
|
"Validating that the deposed leader has rolled back that change it had written",
|
|
output[0], 1);
|
|
output[0], 1);
|
|
-
|
|
|
|
|
|
+
|
|
// make sure the leader has the subsequent changes that were made while it was offline
|
|
// make sure the leader has the subsequent changes that were made while it was offline
|
|
- output = svrs.zk[leader].getData(path+nonleader, false, null);
|
|
|
|
|
|
+ output = svrs.zk[leader].getData(path + nonleader, false, null);
|
|
Assert.assertEquals(
|
|
Assert.assertEquals(
|
|
"Validating that the deposed leader caught up on changes it missed",
|
|
"Validating that the deposed leader caught up on changes it missed",
|
|
output[0], 2);
|
|
output[0], 2);
|
|
}
|
|
}
|
|
|
|
|
|
private void waitForOne(ZooKeeper zk, States state) throws InterruptedException {
|
|
private void waitForOne(ZooKeeper zk, States state) throws InterruptedException {
|
|
- while(zk.getState() != state) {
|
|
|
|
- Thread.sleep(500);
|
|
|
|
- }
|
|
|
|
|
|
+ while (zk.getState() != state) {
|
|
|
|
+ Thread.sleep(500);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
-
|
|
|
|
- private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException {
|
|
|
|
- int iterations = 10;
|
|
|
|
- boolean someoneNotConnected = true;
|
|
|
|
- while(someoneNotConnected) {
|
|
|
|
- if (iterations-- == 0) {
|
|
|
|
- throw new RuntimeException("Waiting too long");
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- someoneNotConnected = false;
|
|
|
|
- for(ZooKeeper zk: zks) {
|
|
|
|
- if (zk.getState() != state) {
|
|
|
|
- someoneNotConnected = true;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- Thread.sleep(1000);
|
|
|
|
|
|
+
|
|
|
|
+ private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException {
|
|
|
|
+ int iterations = 10;
|
|
|
|
+ boolean someoneNotConnected = true;
|
|
|
|
+ while (someoneNotConnected) {
|
|
|
|
+ if (iterations-- == 0) {
|
|
|
|
+ throw new RuntimeException("Waiting too long");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ someoneNotConnected = false;
|
|
|
|
+ for (ZooKeeper zk : zks) {
|
|
|
|
+ if (zk.getState() != state) {
|
|
|
|
+ someoneNotConnected = true;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ Thread.sleep(1000);
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
+ }
|
|
|
|
|
|
// This class holds the servers and clients for those servers
|
|
// This class holds the servers and clients for those servers
|
|
- private class Servers {
|
|
|
|
- MainThread mt[];
|
|
|
|
- ZooKeeper zk[];
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * This is a helper function for launching a set of servers
|
|
|
|
- *
|
|
|
|
- * @param numServers
|
|
|
|
- * @return
|
|
|
|
- * @throws IOException
|
|
|
|
- * @throws InterruptedException
|
|
|
|
- */
|
|
|
|
- private Servers LaunchServers(int numServers) throws IOException, InterruptedException {
|
|
|
|
- int SERVER_COUNT = numServers;
|
|
|
|
- Servers svrs = new Servers();
|
|
|
|
- final int clientPorts[] = new int[SERVER_COUNT];
|
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
|
- for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
- clientPorts[i] = PortAssignment.unique();
|
|
|
|
- sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+"\n");
|
|
|
|
- }
|
|
|
|
- String quorumCfgSection = sb.toString();
|
|
|
|
-
|
|
|
|
- MainThread mt[] = new MainThread[SERVER_COUNT];
|
|
|
|
- ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
|
|
|
|
- for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
- mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection);
|
|
|
|
- mt[i].start();
|
|
|
|
- zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- waitForAll(zk, States.CONNECTED);
|
|
|
|
-
|
|
|
|
- svrs.mt = mt;
|
|
|
|
- svrs.zk = zk;
|
|
|
|
- return svrs;
|
|
|
|
- }
|
|
|
|
|
|
+ private static class Servers {
|
|
|
|
+ MainThread mt[];
|
|
|
|
+ ZooKeeper zk[];
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * This is a helper function for launching a set of servers
|
|
|
|
+ *
|
|
|
|
+ * @param numServers
|
|
|
|
+ * @return
|
|
|
|
+ * @throws IOException
|
|
|
|
+ * @throws InterruptedException
|
|
|
|
+ */
|
|
|
|
+ private Servers LaunchServers(int numServers) throws IOException, InterruptedException {
|
|
|
|
+ int SERVER_COUNT = numServers;
|
|
|
|
+ Servers svrs = new Servers();
|
|
|
|
+ final int clientPorts[] = new int[SERVER_COUNT];
|
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
+ clientPorts[i] = PortAssignment.unique();
|
|
|
|
+ sb.append("server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + "\n");
|
|
|
|
+ }
|
|
|
|
+ String quorumCfgSection = sb.toString();
|
|
|
|
+
|
|
|
|
+ MainThread mt[] = new MainThread[SERVER_COUNT];
|
|
|
|
+ ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
|
|
|
|
+ for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
+ mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection);
|
|
|
|
+ mt[i].start();
|
|
|
|
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ waitForAll(zk, States.CONNECTED);
|
|
|
|
+
|
|
|
|
+ svrs.mt = mt;
|
|
|
|
+ svrs.zk = zk;
|
|
|
|
+ return svrs;
|
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
/**
|
|
* Verify handling of bad quorum address
|
|
* Verify handling of bad quorum address
|
|
@@ -382,7 +376,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
|
|
|
// setup the logger to capture all logs
|
|
// setup the logger to capture all logs
|
|
Layout layout =
|
|
Layout layout =
|
|
- Logger.getRootLogger().getAppender("CONSOLE").getLayout();
|
|
|
|
|
|
+ Logger.getRootLogger().getAppender("CONSOLE").getLayout();
|
|
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
|
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
|
WriterAppender appender = new WriterAppender(layout, os);
|
|
WriterAppender appender = new WriterAppender(layout, os);
|
|
appender.setThreshold(Level.WARN);
|
|
appender.setThreshold(Level.WARN);
|
|
@@ -394,17 +388,17 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
final int CLIENT_PORT_QP2 = PortAssignment.unique();
|
|
final int CLIENT_PORT_QP2 = PortAssignment.unique();
|
|
|
|
|
|
String quorumCfgSection =
|
|
String quorumCfgSection =
|
|
- "server.1=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
- + ":" + PortAssignment.unique()
|
|
|
|
- + "\nserver.2=fee.fii.foo.fum:" + PortAssignment.unique()
|
|
|
|
- + ":" + PortAssignment.unique();
|
|
|
|
|
|
+ "server.1=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
+ + ":" + PortAssignment.unique()
|
|
|
|
+ + "\nserver.2=fee.fii.foo.fum:" + PortAssignment.unique()
|
|
|
|
+ + ":" + PortAssignment.unique();
|
|
|
|
|
|
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
|
|
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
|
|
q1.start();
|
|
q1.start();
|
|
|
|
|
|
boolean isup =
|
|
boolean isup =
|
|
- ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1,
|
|
|
|
- 5000);
|
|
|
|
|
|
+ ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1,
|
|
|
|
+ 5000);
|
|
|
|
|
|
Assert.assertFalse("Server never came up", isup);
|
|
Assert.assertFalse("Server never came up", isup);
|
|
|
|
|
|
@@ -422,7 +416,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
String line;
|
|
String line;
|
|
boolean found = false;
|
|
boolean found = false;
|
|
Pattern p =
|
|
Pattern p =
|
|
- Pattern.compile(".*Cannot open channel to .* at election address .*");
|
|
|
|
|
|
+ Pattern.compile(".*Cannot open channel to .* at election address .*");
|
|
while ((line = r.readLine()) != null) {
|
|
while ((line = r.readLine()) != null) {
|
|
found = p.matcher(line).matches();
|
|
found = p.matcher(line).matches();
|
|
if (found) {
|
|
if (found) {
|
|
@@ -441,14 +435,14 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
|
|
|
// setup the logger to capture all logs
|
|
// setup the logger to capture all logs
|
|
Layout layout =
|
|
Layout layout =
|
|
- Logger.getRootLogger().getAppender("CONSOLE").getLayout();
|
|
|
|
|
|
+ Logger.getRootLogger().getAppender("CONSOLE").getLayout();
|
|
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
|
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
|
WriterAppender appender = new WriterAppender(layout, os);
|
|
WriterAppender appender = new WriterAppender(layout, os);
|
|
appender.setThreshold(Level.INFO);
|
|
appender.setThreshold(Level.INFO);
|
|
Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
|
|
Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
|
|
qlogger.addAppender(appender);
|
|
qlogger.addAppender(appender);
|
|
|
|
|
|
- // test the most likely situation only: server is stated as observer in
|
|
|
|
|
|
+ // test the most likely situation only: server is stated as observer in
|
|
// servers list, but there's no "peerType=observer" token in config
|
|
// servers list, but there's no "peerType=observer" token in config
|
|
try {
|
|
try {
|
|
final int CLIENT_PORT_QP1 = PortAssignment.unique();
|
|
final int CLIENT_PORT_QP1 = PortAssignment.unique();
|
|
@@ -456,12 +450,12 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
final int CLIENT_PORT_QP3 = PortAssignment.unique();
|
|
final int CLIENT_PORT_QP3 = PortAssignment.unique();
|
|
|
|
|
|
String quorumCfgSection =
|
|
String quorumCfgSection =
|
|
- "server.1=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
- + ":" + PortAssignment.unique()
|
|
|
|
- + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
- + ":" + PortAssignment.unique()
|
|
|
|
- + "\nserver.3=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
- + ":" + PortAssignment.unique() + ":observer";
|
|
|
|
|
|
+ "server.1=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
+ + ":" + PortAssignment.unique()
|
|
|
|
+ + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
+ + ":" + PortAssignment.unique()
|
|
|
|
+ + "\nserver.3=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
+ + ":" + PortAssignment.unique() + ":observer";
|
|
|
|
|
|
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
|
|
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
|
|
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
|
|
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
|
|
@@ -503,10 +497,10 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
boolean warningPresent = false;
|
|
boolean warningPresent = false;
|
|
boolean defaultedToObserver = false;
|
|
boolean defaultedToObserver = false;
|
|
Pattern pWarn =
|
|
Pattern pWarn =
|
|
- Pattern.compile(".*Peer type from servers list.* doesn't match peerType.*");
|
|
|
|
|
|
+ Pattern.compile(".*Peer type from servers list.* doesn't match peerType.*");
|
|
Pattern pObserve = Pattern.compile(".*OBSERVING.*");
|
|
Pattern pObserve = Pattern.compile(".*OBSERVING.*");
|
|
while ((line = r.readLine()) != null) {
|
|
while ((line = r.readLine()) != null) {
|
|
- if (pWarn.matcher(line).matches()) {
|
|
|
|
|
|
+ if (pWarn.matcher(line).matches()) {
|
|
warningPresent = true;
|
|
warningPresent = true;
|
|
}
|
|
}
|
|
if (pObserve.matcher(line).matches()) {
|
|
if (pObserve.matcher(line).matches()) {
|
|
@@ -516,12 +510,12 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- Assert.assertTrue("Should warn about inconsistent peer type",
|
|
|
|
|
|
+ Assert.assertTrue("Should warn about inconsistent peer type",
|
|
warningPresent && defaultedToObserver);
|
|
warningPresent && defaultedToObserver);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * verify if bad packets are being handled properly
|
|
|
|
|
|
+ * verify if bad packets are being handled properly
|
|
* at the quorum port
|
|
* at the quorum port
|
|
* @throws Exception
|
|
* @throws Exception
|
|
*/
|
|
*/
|
|
@@ -533,25 +527,25 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
int electionPort1 = PortAssignment.unique();
|
|
int electionPort1 = PortAssignment.unique();
|
|
int electionPort2 = PortAssignment.unique();
|
|
int electionPort2 = PortAssignment.unique();
|
|
String quorumCfgSection =
|
|
String quorumCfgSection =
|
|
- "server.1=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
- + ":" + electionPort1
|
|
|
|
- + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
- + ":" + electionPort2;
|
|
|
|
-
|
|
|
|
|
|
+ "server.1=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
+ + ":" + electionPort1
|
|
|
|
+ + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
+ + ":" + electionPort2;
|
|
|
|
+
|
|
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
|
|
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
|
|
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
|
|
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
|
|
q1.start();
|
|
q1.start();
|
|
q2.start();
|
|
q2.start();
|
|
-
|
|
|
|
|
|
+
|
|
Assert.assertTrue("waiting for server 1 being up",
|
|
Assert.assertTrue("waiting for server 1 being up",
|
|
ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1,
|
|
ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1,
|
|
CONNECTION_TIMEOUT));
|
|
CONNECTION_TIMEOUT));
|
|
Assert.assertTrue("waiting for server 2 being up",
|
|
Assert.assertTrue("waiting for server 2 being up",
|
|
- ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2,
|
|
|
|
- CONNECTION_TIMEOUT));
|
|
|
|
-
|
|
|
|
|
|
+ ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2,
|
|
|
|
+ CONNECTION_TIMEOUT));
|
|
|
|
+
|
|
byte[] b = new byte[4];
|
|
byte[] b = new byte[4];
|
|
- int length = 1024*1024*1024;
|
|
|
|
|
|
+ int length = 1024 * 1024 * 1024;
|
|
ByteBuffer buff = ByteBuffer.wrap(b);
|
|
ByteBuffer buff = ByteBuffer.wrap(b);
|
|
buff.putInt(length);
|
|
buff.putInt(length);
|
|
buff.position(0);
|
|
buff.position(0);
|
|
@@ -562,7 +556,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
s = SocketChannel.open(new InetSocketAddress("127.0.0.1", electionPort2));
|
|
s = SocketChannel.open(new InetSocketAddress("127.0.0.1", electionPort2));
|
|
s.write(buff);
|
|
s.write(buff);
|
|
s.close();
|
|
s.close();
|
|
-
|
|
|
|
|
|
+
|
|
ZooKeeper zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1,
|
|
ZooKeeper zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1,
|
|
ClientBase.CONNECTION_TIMEOUT, this);
|
|
ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
|
|
|
@@ -574,7 +568,6 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
q2.shutdown();
|
|
q2.shutdown();
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Verify handling of quorum defaults
|
|
* Verify handling of quorum defaults
|
|
* * default electionAlg is fast leader election
|
|
* * default electionAlg is fast leader election
|
|
@@ -585,7 +578,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
|
|
|
// setup the logger to capture all logs
|
|
// setup the logger to capture all logs
|
|
Layout layout =
|
|
Layout layout =
|
|
- Logger.getRootLogger().getAppender("CONSOLE").getLayout();
|
|
|
|
|
|
+ Logger.getRootLogger().getAppender("CONSOLE").getLayout();
|
|
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
|
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
|
WriterAppender appender = new WriterAppender(layout, os);
|
|
WriterAppender appender = new WriterAppender(layout, os);
|
|
appender.setImmediateFlush(true);
|
|
appender.setImmediateFlush(true);
|
|
@@ -598,10 +591,10 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
final int CLIENT_PORT_QP2 = PortAssignment.unique();
|
|
final int CLIENT_PORT_QP2 = PortAssignment.unique();
|
|
|
|
|
|
String quorumCfgSection =
|
|
String quorumCfgSection =
|
|
- "server.1=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
- + ":" + PortAssignment.unique()
|
|
|
|
- + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
- + ":" + PortAssignment.unique();
|
|
|
|
|
|
+ "server.1=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
+ + ":" + PortAssignment.unique()
|
|
|
|
+ + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
+ + ":" + PortAssignment.unique();
|
|
|
|
|
|
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
|
|
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
|
|
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
|
|
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
|
|
@@ -633,7 +626,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
String line;
|
|
String line;
|
|
boolean found = false;
|
|
boolean found = false;
|
|
Pattern p =
|
|
Pattern p =
|
|
- Pattern.compile(".*FastLeaderElection.*");
|
|
|
|
|
|
+ Pattern.compile(".*FastLeaderElection.*");
|
|
while ((line = r.readLine()) != null) {
|
|
while ((line = r.readLine()) != null) {
|
|
found = p.matcher(line).matches();
|
|
found = p.matcher(line).matches();
|
|
if (found) {
|
|
if (found) {
|
|
@@ -651,10 +644,10 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
long maxwait = 3000;
|
|
long maxwait = 3000;
|
|
final int CLIENT_PORT_QP1 = PortAssignment.unique();
|
|
final int CLIENT_PORT_QP1 = PortAssignment.unique();
|
|
String quorumCfgSection =
|
|
String quorumCfgSection =
|
|
- "server.1=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
- + ":" + PortAssignment.unique()
|
|
|
|
- + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
- + ":" + PortAssignment.unique();
|
|
|
|
|
|
+ "server.1=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
+ + ":" + PortAssignment.unique()
|
|
|
|
+ + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
|
|
|
|
+ + ":" + PortAssignment.unique();
|
|
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
|
|
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
|
|
q1.start();
|
|
q1.start();
|
|
// Let the notifications timeout
|
|
// Let the notifications timeout
|
|
@@ -663,7 +656,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
q1.shutdown();
|
|
q1.shutdown();
|
|
long end = System.currentTimeMillis();
|
|
long end = System.currentTimeMillis();
|
|
if ((end - start) > maxwait) {
|
|
if ((end - start) > maxwait) {
|
|
- Assert.fail("QuorumPeer took " + (end -start) +
|
|
|
|
|
|
+ Assert.fail("QuorumPeer took " + (end - start) +
|
|
" to shutdown, expected " + maxwait);
|
|
" to shutdown, expected " + maxwait);
|
|
}
|
|
}
|
|
}
|
|
}
|