|
@@ -34,7 +34,6 @@ import java.net.InetSocketAddress;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.SocketChannel;
|
|
|
import java.nio.file.Paths;
|
|
|
-import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -51,7 +50,6 @@ import org.apache.zookeeper.AsyncCallback;
|
|
|
import org.apache.zookeeper.CreateMode;
|
|
|
import org.apache.zookeeper.KeeperException;
|
|
|
import org.apache.zookeeper.PortAssignment;
|
|
|
-import org.apache.zookeeper.Watcher;
|
|
|
import org.apache.zookeeper.ZooDefs.OpCode;
|
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
|
import org.apache.zookeeper.ZooDefs.Ids;
|
|
@@ -63,7 +61,6 @@ import org.apache.zookeeper.metrics.impl.NullMetricsProvider;
|
|
|
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
|
|
|
import org.apache.zookeeper.server.quorum.Leader.Proposal;
|
|
|
import org.apache.zookeeper.test.ClientBase;
|
|
|
-import org.junit.After;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Test;
|
|
|
|
|
@@ -73,22 +70,6 @@ import org.junit.Test;
|
|
|
*/
|
|
|
public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
|
|
|
- private Servers servers;
|
|
|
- private int numServers = 0;
|
|
|
-
|
|
|
- @After
|
|
|
- public void tearDown() throws Exception {
|
|
|
- if (servers == null || servers.mt == null) {
|
|
|
- LOG.info("No servers to shutdown!");
|
|
|
- return;
|
|
|
- }
|
|
|
- for (int i = 0; i < numServers; i++) {
|
|
|
- if (i < servers.mt.length) {
|
|
|
- servers.mt[i].shutdown();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Verify the ability to start a cluster.
|
|
|
*/
|
|
@@ -454,131 +435,6 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
Assert.assertTrue("falseLeader never rejoins the quorum", foundFollowing);
|
|
|
}
|
|
|
|
|
|
- public static void waitForOne(ZooKeeper zk, States state) throws InterruptedException {
|
|
|
- int iterations = ClientBase.CONNECTION_TIMEOUT / 500;
|
|
|
- while (zk.getState() != state) {
|
|
|
- if (iterations-- == 0) {
|
|
|
- throw new RuntimeException("Waiting too long " + zk.getState() + " != " + state);
|
|
|
- }
|
|
|
- Thread.sleep(500);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void waitForAll(Servers servers, States state) throws InterruptedException {
|
|
|
- waitForAll(servers.zk, state);
|
|
|
- }
|
|
|
-
|
|
|
- public static void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException {
|
|
|
- int iterations = ClientBase.CONNECTION_TIMEOUT / 1000;
|
|
|
- boolean someoneNotConnected = true;
|
|
|
- while (someoneNotConnected) {
|
|
|
- if (iterations-- == 0) {
|
|
|
- logStates(zks);
|
|
|
- ClientBase.logAllStackTraces();
|
|
|
- throw new RuntimeException("Waiting too long");
|
|
|
- }
|
|
|
-
|
|
|
- someoneNotConnected = false;
|
|
|
- for (ZooKeeper zk : zks) {
|
|
|
- if (zk.getState() != state) {
|
|
|
- someoneNotConnected = true;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- Thread.sleep(1000);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public static void logStates(ZooKeeper[] zks) {
|
|
|
- StringBuilder sbBuilder = new StringBuilder("Connection States: {");
|
|
|
- for (int i = 0; i < zks.length; i++) {
|
|
|
- sbBuilder.append(i + " : " + zks[i].getState() + ", ");
|
|
|
- }
|
|
|
- sbBuilder.append('}');
|
|
|
- LOG.error(sbBuilder.toString());
|
|
|
- }
|
|
|
-
|
|
|
- // This class holds the servers and clients for those servers
|
|
|
- private static class Servers {
|
|
|
- MainThread mt[];
|
|
|
- ZooKeeper zk[];
|
|
|
- int[] clientPorts;
|
|
|
-
|
|
|
- public void shutDownAllServers() throws InterruptedException {
|
|
|
- for (MainThread t: mt) {
|
|
|
- t.shutdown();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void restartAllServersAndClients(Watcher watcher) throws IOException, InterruptedException {
|
|
|
- for (MainThread t : mt) {
|
|
|
- if (!t.isAlive()) {
|
|
|
- t.start();
|
|
|
- }
|
|
|
- }
|
|
|
- for (int i = 0; i < zk.length; i++) {
|
|
|
- restartClient(i, watcher);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void restartClient(int clientIndex, Watcher watcher) throws IOException, InterruptedException {
|
|
|
- if (zk[clientIndex] != null) {
|
|
|
- zk[clientIndex].close();
|
|
|
- }
|
|
|
- zk[clientIndex] = new ZooKeeper("127.0.0.1:" + clientPorts[clientIndex], ClientBase.CONNECTION_TIMEOUT, watcher);
|
|
|
- }
|
|
|
-
|
|
|
- public int findLeader() {
|
|
|
- for (int i = 0; i < mt.length; i++) {
|
|
|
- if (mt[i].main.quorumPeer.leader != null) {
|
|
|
- return i;
|
|
|
- }
|
|
|
- }
|
|
|
- return -1;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- private Servers LaunchServers(int numServers) throws IOException, InterruptedException {
|
|
|
- return LaunchServers(numServers, null);
|
|
|
- }
|
|
|
-
|
|
|
- /** * This is a helper function for launching a set of servers
|
|
|
- *
|
|
|
- * @param numServers the number of servers
|
|
|
- * @param tickTime A ticktime to pass to MainThread
|
|
|
- * @return
|
|
|
- * @throws IOException
|
|
|
- * @throws InterruptedException
|
|
|
- */
|
|
|
- private Servers LaunchServers(int numServers, Integer tickTime) throws IOException, InterruptedException {
|
|
|
- int SERVER_COUNT = numServers;
|
|
|
- Servers svrs = new Servers();
|
|
|
- svrs.clientPorts = new int[SERVER_COUNT];
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
- for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
- svrs.clientPorts[i] = PortAssignment.unique();
|
|
|
- sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+svrs.clientPorts[i]+"\n");
|
|
|
- }
|
|
|
- String quorumCfgSection = sb.toString();
|
|
|
-
|
|
|
- svrs.mt = new MainThread[SERVER_COUNT];
|
|
|
- svrs.zk = new ZooKeeper[SERVER_COUNT];
|
|
|
- for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
- if (tickTime != null) {
|
|
|
- svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection, new HashMap<String, String>(), tickTime);
|
|
|
- } else {
|
|
|
- svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection);
|
|
|
- }
|
|
|
- svrs.mt[i].start();
|
|
|
- svrs.restartClient(i, this);
|
|
|
- }
|
|
|
-
|
|
|
- waitForAll(svrs, States.CONNECTED);
|
|
|
-
|
|
|
- return svrs;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Verify handling of bad quorum address
|
|
|
*/
|