|
@@ -50,8 +50,6 @@ import org.apache.zookeeper.DummyWatcher;
|
|
import org.apache.zookeeper.KeeperException;
|
|
import org.apache.zookeeper.KeeperException;
|
|
import org.apache.zookeeper.KeeperException.ConnectionLossException;
|
|
import org.apache.zookeeper.KeeperException.ConnectionLossException;
|
|
import org.apache.zookeeper.PortAssignment;
|
|
import org.apache.zookeeper.PortAssignment;
|
|
-import org.apache.zookeeper.WatchedEvent;
|
|
|
|
-import org.apache.zookeeper.Watcher;
|
|
|
|
import org.apache.zookeeper.Watcher.Event.KeeperState;
|
|
import org.apache.zookeeper.Watcher.Event.KeeperState;
|
|
import org.apache.zookeeper.ZooDefs.Ids;
|
|
import org.apache.zookeeper.ZooDefs.Ids;
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
@@ -60,10 +58,7 @@ import org.apache.zookeeper.admin.ZooKeeperAdmin;
|
|
import org.apache.zookeeper.jmx.MBeanRegistry;
|
|
import org.apache.zookeeper.jmx.MBeanRegistry;
|
|
import org.apache.zookeeper.jmx.ZKMBeanInfo;
|
|
import org.apache.zookeeper.jmx.ZKMBeanInfo;
|
|
import org.apache.zookeeper.server.admin.Commands;
|
|
import org.apache.zookeeper.server.admin.Commands;
|
|
-import org.apache.zookeeper.server.quorum.DelayRequestProcessor;
|
|
|
|
-import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer;
|
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
|
|
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
|
|
-import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
|
|
|
|
import org.apache.zookeeper.server.util.PortForwarder;
|
|
import org.apache.zookeeper.server.util.PortForwarder;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
import org.junit.runner.RunWith;
|
|
import org.junit.runner.RunWith;
|
|
@@ -72,7 +67,7 @@ import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
@RunWith(Parameterized.class)
|
|
@RunWith(Parameterized.class)
|
|
-public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher {
|
|
|
|
|
|
+public class ObserverMasterTest extends ObserverMasterTestBase {
|
|
|
|
|
|
protected static final Logger LOG = LoggerFactory.getLogger(ObserverMasterTest.class);
|
|
protected static final Logger LOG = LoggerFactory.getLogger(ObserverMasterTest.class);
|
|
|
|
|
|
@@ -87,184 +82,8 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher {
|
|
|
|
|
|
private Boolean testObserverMaster;
|
|
private Boolean testObserverMaster;
|
|
|
|
|
|
- private CountDownLatch latch;
|
|
|
|
- ZooKeeper zk;
|
|
|
|
- private WatchedEvent lastEvent = null;
|
|
|
|
-
|
|
|
|
- private int CLIENT_PORT_QP1;
|
|
|
|
- private int CLIENT_PORT_QP2;
|
|
|
|
- private int CLIENT_PORT_OBS;
|
|
|
|
- private int OM_PORT;
|
|
|
|
- private MainThread q1;
|
|
|
|
- private MainThread q2;
|
|
|
|
- private MainThread q3;
|
|
|
|
-
|
|
|
|
private PortForwarder setUp(final int omProxyPort) throws IOException {
|
|
private PortForwarder setUp(final int omProxyPort) throws IOException {
|
|
- ClientBase.setupTestEnv();
|
|
|
|
-
|
|
|
|
- final int PORT_QP1 = PortAssignment.unique();
|
|
|
|
- final int PORT_QP2 = PortAssignment.unique();
|
|
|
|
- final int PORT_OBS = PortAssignment.unique();
|
|
|
|
- final int PORT_QP_LE1 = PortAssignment.unique();
|
|
|
|
- final int PORT_QP_LE2 = PortAssignment.unique();
|
|
|
|
- final int PORT_OBS_LE = PortAssignment.unique();
|
|
|
|
-
|
|
|
|
- CLIENT_PORT_QP1 = PortAssignment.unique();
|
|
|
|
- CLIENT_PORT_QP2 = PortAssignment.unique();
|
|
|
|
- CLIENT_PORT_OBS = PortAssignment.unique();
|
|
|
|
-
|
|
|
|
- OM_PORT = PortAssignment.unique();
|
|
|
|
-
|
|
|
|
- String quorumCfgSection = "server.1=127.0.0.1:" + (PORT_QP1) + ":" + (PORT_QP_LE1) + ";" + CLIENT_PORT_QP1
|
|
|
|
- + "\nserver.2=127.0.0.1:" + (PORT_QP2) + ":" + (PORT_QP_LE2) + ";" + CLIENT_PORT_QP2
|
|
|
|
- + "\nserver.3=127.0.0.1:" + (PORT_OBS) + ":" + (PORT_OBS_LE) + ":observer" + ";" + CLIENT_PORT_OBS;
|
|
|
|
- String extraCfgs = testObserverMaster
|
|
|
|
- ? String.format("observerMasterPort=%d%n", OM_PORT)
|
|
|
|
- : "";
|
|
|
|
- String extraCfgsObs = testObserverMaster
|
|
|
|
- ? String.format("observerMasterPort=%d%n", omProxyPort <= 0 ? OM_PORT : omProxyPort)
|
|
|
|
- : "";
|
|
|
|
-
|
|
|
|
- PortForwarder forwarder = null;
|
|
|
|
- if (testObserverMaster && omProxyPort >= 0) {
|
|
|
|
- forwarder = new PortForwarder(omProxyPort, OM_PORT);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection, extraCfgs);
|
|
|
|
- q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection, extraCfgs);
|
|
|
|
- q3 = new MainThread(3, CLIENT_PORT_OBS, quorumCfgSection, extraCfgsObs);
|
|
|
|
- q1.start();
|
|
|
|
- q2.start();
|
|
|
|
- assertTrue(
|
|
|
|
- "waiting for server 1 being up",
|
|
|
|
- ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, CONNECTION_TIMEOUT));
|
|
|
|
- assertTrue(
|
|
|
|
- "waiting for server 2 being up",
|
|
|
|
- ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, CONNECTION_TIMEOUT));
|
|
|
|
- return forwarder;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private void shutdown() throws InterruptedException {
|
|
|
|
- LOG.info("Shutting down all servers");
|
|
|
|
- zk.close();
|
|
|
|
-
|
|
|
|
- q1.shutdown();
|
|
|
|
- q2.shutdown();
|
|
|
|
- q3.shutdown();
|
|
|
|
-
|
|
|
|
- assertTrue(
|
|
|
|
- "Waiting for server 1 to shut down",
|
|
|
|
- ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT));
|
|
|
|
- assertTrue(
|
|
|
|
- "Waiting for server 2 to shut down",
|
|
|
|
- ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT));
|
|
|
|
- assertTrue(
|
|
|
|
- "Waiting for server 3 to shut down",
|
|
|
|
- ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Test
|
|
|
|
- public void testLaggingObserverMaster() throws Exception {
|
|
|
|
- final int OM_PROXY_PORT = PortAssignment.unique();
|
|
|
|
- PortForwarder forwarder = setUp(OM_PROXY_PORT);
|
|
|
|
-
|
|
|
|
- // find the leader and observer master
|
|
|
|
- int leaderPort;
|
|
|
|
- MainThread leader;
|
|
|
|
- MainThread follower;
|
|
|
|
- if (q1.getQuorumPeer().leader != null) {
|
|
|
|
- leaderPort = CLIENT_PORT_QP1;
|
|
|
|
- leader = q1;
|
|
|
|
- follower = q2;
|
|
|
|
- } else if (q2.getQuorumPeer().leader != null) {
|
|
|
|
- leaderPort = CLIENT_PORT_QP2;
|
|
|
|
- leader = q2;
|
|
|
|
- follower = q1;
|
|
|
|
- } else {
|
|
|
|
- throw new RuntimeException("No leader");
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // ensure the observer master has commits in the queue before observer sync
|
|
|
|
- zk = new ZooKeeper("127.0.0.1:" + leaderPort, ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
|
- for (int i = 0; i < 10; i++) {
|
|
|
|
- zk.create("/bulk" + i, ("initial data of some size").getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
|
- }
|
|
|
|
- zk.close();
|
|
|
|
-
|
|
|
|
- q3.start();
|
|
|
|
- assertTrue(
|
|
|
|
- "waiting for server 3 being up",
|
|
|
|
- ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, CONNECTION_TIMEOUT));
|
|
|
|
-
|
|
|
|
- latch = new CountDownLatch(1);
|
|
|
|
- zk = new ZooKeeper("127.0.0.1:" + leaderPort, ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
|
- latch.await();
|
|
|
|
- assertEquals(zk.getState(), States.CONNECTED);
|
|
|
|
-
|
|
|
|
- zk.create("/init", "first".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
|
- final long lastLoggedZxid = leader.getQuorumPeer().getLastLoggedZxid();
|
|
|
|
-
|
|
|
|
- // wait for change to propagate
|
|
|
|
- waitFor("Timeout waiting for observer sync", new WaitForCondition() {
|
|
|
|
- public boolean evaluate() {
|
|
|
|
- return lastLoggedZxid == q3.getQuorumPeer().getLastLoggedZxid();
|
|
|
|
- }
|
|
|
|
- }, 30);
|
|
|
|
-
|
|
|
|
- // simulate network fault
|
|
|
|
- if (forwarder != null) {
|
|
|
|
- forwarder.shutdown();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- for (int i = 0; i < 10; i++) {
|
|
|
|
- zk.create("/basic" + i, "second".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- DelayRequestProcessor delayRequestProcessor = null;
|
|
|
|
- if (testObserverMaster) {
|
|
|
|
- FollowerZooKeeperServer followerZooKeeperServer = (FollowerZooKeeperServer) follower.getQuorumPeer().getActiveServer();
|
|
|
|
- delayRequestProcessor = DelayRequestProcessor.injectDelayRequestProcessor(followerZooKeeperServer);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- zk.create("/target1", "third".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
|
- zk.create("/target2", "third".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
|
-
|
|
|
|
- LOG.info(
|
|
|
|
- "observer zxid {}{} leader zxid {}",
|
|
|
|
- Long.toHexString(q3.getQuorumPeer().getLastLoggedZxid()),
|
|
|
|
- (testObserverMaster ? "" : " observer master zxid " + Long.toHexString(follower.getQuorumPeer().getLastLoggedZxid())),
|
|
|
|
- Long.toHexString(leader.getQuorumPeer().getLastLoggedZxid()));
|
|
|
|
-
|
|
|
|
- // restore network
|
|
|
|
- forwarder = testObserverMaster ? new PortForwarder(OM_PROXY_PORT, OM_PORT) : null;
|
|
|
|
-
|
|
|
|
- assertTrue(
|
|
|
|
- "waiting for server 3 being up",
|
|
|
|
- ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, CONNECTION_TIMEOUT));
|
|
|
|
- assertNotNull("Leader switched", leader.getQuorumPeer().leader);
|
|
|
|
-
|
|
|
|
- if (delayRequestProcessor != null) {
|
|
|
|
- delayRequestProcessor.unblockQueue();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- latch = new CountDownLatch(1);
|
|
|
|
- ZooKeeper obsZk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
|
- latch.await();
|
|
|
|
- zk.create("/finalop", "fourth".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
|
-
|
|
|
|
- assertEquals("first", new String(obsZk.getData("/init", null, null)));
|
|
|
|
- assertEquals("third", new String(obsZk.getData("/target1", null, null)));
|
|
|
|
-
|
|
|
|
- obsZk.close();
|
|
|
|
- shutdown();
|
|
|
|
-
|
|
|
|
- try {
|
|
|
|
- if (forwarder != null) {
|
|
|
|
- forwarder.shutdown();
|
|
|
|
- }
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- // ignore
|
|
|
|
- }
|
|
|
|
|
|
+ return setUp(omProxyPort, testObserverMaster);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -678,17 +497,6 @@ public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher {
|
|
s1.shutdown();
|
|
s1.shutdown();
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Implementation of watcher interface.
|
|
|
|
- */
|
|
|
|
- public void process(WatchedEvent event) {
|
|
|
|
- lastEvent = event;
|
|
|
|
- if (latch != null) {
|
|
|
|
- latch.countDown();
|
|
|
|
- }
|
|
|
|
- LOG.info("Latch got event :: {}", event);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
class AsyncWriter implements Runnable {
|
|
class AsyncWriter implements Runnable {
|
|
|
|
|
|
private final ZooKeeper client;
|
|
private final ZooKeeper client;
|