|
@@ -1,5 +1,4 @@
|
|
-/**
|
|
|
|
- * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
|
|
|
+/* Licensed to the Apache Software Foundation (ASF) under one
|
|
* or more contributor license agreements. See the NOTICE file
|
|
* or more contributor license agreements. See the NOTICE file
|
|
* distributed with this work for additional information
|
|
* distributed with this work for additional information
|
|
* regarding copyright ownership. The ASF licenses this file
|
|
* regarding copyright ownership. The ASF licenses this file
|
|
@@ -17,172 +16,257 @@
|
|
*/
|
|
*/
|
|
|
|
|
|
package org.apache.zookeeper.test;
|
|
package org.apache.zookeeper.test;
|
|
|
|
+
|
|
import java.io.ByteArrayInputStream;
|
|
import java.io.ByteArrayInputStream;
|
|
import java.io.File;
|
|
import java.io.File;
|
|
|
|
+import java.io.IOException;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.InetSocketAddress;
|
|
-import java.util.ArrayList;
|
|
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
|
|
+import java.util.LinkedHashSet;
|
|
import java.util.Properties;
|
|
import java.util.Properties;
|
|
-import java.util.Random;
|
|
|
|
-
|
|
|
|
-import junit.framework.TestCase;
|
|
|
|
|
|
+import java.util.Set;
|
|
|
|
|
|
import org.apache.log4j.Logger;
|
|
import org.apache.log4j.Logger;
|
|
import org.apache.zookeeper.PortAssignment;
|
|
import org.apache.zookeeper.PortAssignment;
|
|
|
|
+import org.apache.zookeeper.ZooKeeper;
|
|
import org.apache.zookeeper.server.quorum.FastLeaderElection;
|
|
import org.apache.zookeeper.server.quorum.FastLeaderElection;
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer;
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer;
|
|
-import org.apache.zookeeper.server.quorum.Vote;
|
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
|
|
-import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
|
|
|
|
import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
|
|
import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
|
|
|
|
+import org.junit.After;
|
|
import org.junit.Before;
|
|
import org.junit.Before;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
|
|
-public class HierarchicalQuorumTest extends TestCase {
|
|
|
|
- private static final Logger LOG = Logger.getLogger(HierarchicalQuorumTest.class);
|
|
|
|
-
|
|
|
|
- Properties qp;
|
|
|
|
|
|
+/**
|
|
|
|
+ * Comprehensive test of hierarchical quorums, assuming servers with zero weight.
|
|
|
|
+ * This test uses ClientTest to verify that the ensemble works after a leader is
|
|
|
|
+ * elected.
|
|
|
|
+ *
|
|
|
|
+ * This implementation is based on QuorumBase, the main difference being that it
|
|
|
|
+ * uses hierarchical quorums and FLE.
|
|
|
|
+ */
|
|
|
|
|
|
- int count;
|
|
|
|
- HashMap<Long,QuorumServer> peers;
|
|
|
|
- ArrayList<LEThread> threads;
|
|
|
|
- File tmpdir[];
|
|
|
|
- int port[];
|
|
|
|
- Object finalObj;
|
|
|
|
|
|
+public class HierarchicalQuorumTest extends ClientBase {
|
|
|
|
+ private static final Logger LOG = Logger.getLogger(QuorumBase.class);
|
|
|
|
|
|
- volatile Vote votes[];
|
|
|
|
- volatile boolean leaderDies;
|
|
|
|
- volatile long leader = -1;
|
|
|
|
- Random rand = new Random();
|
|
|
|
|
|
+ File s1dir, s2dir, s3dir, s4dir, s5dir;
|
|
|
|
+ QuorumPeer s1, s2, s3, s4, s5;
|
|
|
|
+ private int port1;
|
|
|
|
+ private int port2;
|
|
|
|
+ private int port3;
|
|
|
|
+ private int port4;
|
|
|
|
+ private int port5;
|
|
|
|
|
|
|
|
+ private int leport1;
|
|
|
|
+ private int leport2;
|
|
|
|
+ private int leport3;
|
|
|
|
+ private int leport4;
|
|
|
|
+ private int leport5;
|
|
|
|
|
|
- @Before
|
|
|
|
|
|
+ Properties qp;
|
|
|
|
+ private final ClientTest ct = new ClientTest();
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
protected void setUp() throws Exception {
|
|
protected void setUp() throws Exception {
|
|
- count = 9;
|
|
|
|
-
|
|
|
|
- peers = new HashMap<Long,QuorumServer>(count);
|
|
|
|
- threads = new ArrayList<LEThread>(count);
|
|
|
|
- votes = new Vote[count];
|
|
|
|
- tmpdir = new File[count];
|
|
|
|
- port = new int[count];
|
|
|
|
- finalObj = new Object();
|
|
|
|
-
|
|
|
|
- String config = "group.1=0:1:2\n" +
|
|
|
|
- "group.2=3:4:5\n" +
|
|
|
|
- "group.3=6:7:8\n\n" +
|
|
|
|
- "weight.0=1\n" +
|
|
|
|
|
|
+ LOG.info("STARTING " + getName());
|
|
|
|
+ setupTestEnv();
|
|
|
|
+
|
|
|
|
+ JMXEnv.setUp();
|
|
|
|
+
|
|
|
|
+ setUpAll();
|
|
|
|
+
|
|
|
|
+ port1 = PortAssignment.unique();
|
|
|
|
+ port2 = PortAssignment.unique();
|
|
|
|
+ port3 = PortAssignment.unique();
|
|
|
|
+ port4 = PortAssignment.unique();
|
|
|
|
+ port5 = PortAssignment.unique();
|
|
|
|
+ leport1 = PortAssignment.unique();
|
|
|
|
+ leport2 = PortAssignment.unique();
|
|
|
|
+ leport3 = PortAssignment.unique();
|
|
|
|
+ leport4 = PortAssignment.unique();
|
|
|
|
+ leport5 = PortAssignment.unique();
|
|
|
|
+
|
|
|
|
+ hostPort = "127.0.0.1:" + port1
|
|
|
|
+ + ",127.0.0.1:" + port2
|
|
|
|
+ + ",127.0.0.1:" + port3
|
|
|
|
+ + ",127.0.0.1:" + port4
|
|
|
|
+ + ",127.0.0.1:" + port5;
|
|
|
|
+ LOG.info("Ports are: " + hostPort);
|
|
|
|
+
|
|
|
|
+ s1dir = ClientBase.createTmpDir();
|
|
|
|
+ s2dir = ClientBase.createTmpDir();
|
|
|
|
+ s3dir = ClientBase.createTmpDir();
|
|
|
|
+ s4dir = ClientBase.createTmpDir();
|
|
|
|
+ s5dir = ClientBase.createTmpDir();
|
|
|
|
+
|
|
|
|
+ String config = "group.1=1:2:3\n" +
|
|
|
|
+ "group.2=4:5\n" +
|
|
"weight.1=1\n" +
|
|
"weight.1=1\n" +
|
|
"weight.2=1\n" +
|
|
"weight.2=1\n" +
|
|
"weight.3=1\n" +
|
|
"weight.3=1\n" +
|
|
- "weight.4=1\n" +
|
|
|
|
- "weight.5=1\n" +
|
|
|
|
- "weight.6=1\n" +
|
|
|
|
- "weight.7=1\n" +
|
|
|
|
- "weight.8=1";
|
|
|
|
|
|
+ "weight.4=0\n" +
|
|
|
|
+ "weight.5=0\n";
|
|
|
|
|
|
ByteArrayInputStream is = new ByteArrayInputStream(config.getBytes());
|
|
ByteArrayInputStream is = new ByteArrayInputStream(config.getBytes());
|
|
this.qp = new Properties();
|
|
this.qp = new Properties();
|
|
|
|
+
|
|
qp.load(is);
|
|
qp.load(is);
|
|
|
|
+ startServers();
|
|
|
|
|
|
- LOG.info("SetUp " + getName());
|
|
|
|
|
|
+ ct.hostPort = hostPort;
|
|
|
|
+ ct.setUpAll();
|
|
|
|
+
|
|
|
|
+ LOG.info("Setup finished");
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ void startServers() throws Exception {
|
|
|
|
+ int tickTime = 2000;
|
|
|
|
+ int initLimit = 3;
|
|
|
|
+ int syncLimit = 3;
|
|
|
|
+ HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
|
|
|
|
+ peers.put(Long.valueOf(1), new QuorumServer(1,
|
|
|
|
+ new InetSocketAddress("127.0.0.1", port1 + 1000),
|
|
|
|
+ new InetSocketAddress("127.0.0.1", leport1 + 1000)));
|
|
|
|
+ peers.put(Long.valueOf(2), new QuorumServer(2,
|
|
|
|
+ new InetSocketAddress("127.0.0.1", port2 + 1000),
|
|
|
|
+ new InetSocketAddress("127.0.0.1", leport2 + 1000)));
|
|
|
|
+ peers.put(Long.valueOf(3), new QuorumServer(3,
|
|
|
|
+ new InetSocketAddress("127.0.0.1", port3 + 1000),
|
|
|
|
+ new InetSocketAddress("127.0.0.1", leport3 + 1000)));
|
|
|
|
+ peers.put(Long.valueOf(4), new QuorumServer(4,
|
|
|
|
+ new InetSocketAddress("127.0.0.1", port4 + 1000),
|
|
|
|
+ new InetSocketAddress("127.0.0.1", leport4 + 1000)));
|
|
|
|
+ peers.put(Long.valueOf(5), new QuorumServer(5,
|
|
|
|
+ new InetSocketAddress("127.0.0.1", port5 + 1000),
|
|
|
|
+ new InetSocketAddress("127.0.0.1", leport5 + 1000)));
|
|
|
|
|
|
- protected void tearDown() throws Exception {
|
|
|
|
- for(int i = 0; i < threads.size(); i++) {
|
|
|
|
- LEThread leThread = threads.get(i);
|
|
|
|
- ((FastLeaderElection) leThread.peer.getElectionAlg()).shutdown();
|
|
|
|
- // shutdown() has to be explicitly called for every thread to
|
|
|
|
- // make sure that resources are freed properly and all fixed network ports
|
|
|
|
- // are available for other test cases
|
|
|
|
- leThread.peer.shutdown();
|
|
|
|
|
|
+ LOG.info("creating QuorumPeer 1 port " + port1);
|
|
|
|
+ QuorumHierarchical hq1 = new QuorumHierarchical(qp);
|
|
|
|
+ s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 3, 1, tickTime, initLimit, syncLimit, hq1);
|
|
|
|
+ assertEquals(port1, s1.getClientPort());
|
|
|
|
+
|
|
|
|
+ LOG.info("creating QuorumPeer 2 port " + port2);
|
|
|
|
+ QuorumHierarchical hq2 = new QuorumHierarchical(qp);
|
|
|
|
+ s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 3, 2, tickTime, initLimit, syncLimit, hq2);
|
|
|
|
+ assertEquals(port2, s2.getClientPort());
|
|
|
|
+
|
|
|
|
+ LOG.info("creating QuorumPeer 3 port " + port3);
|
|
|
|
+ QuorumHierarchical hq3 = new QuorumHierarchical(qp);
|
|
|
|
+ s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 3, 3, tickTime, initLimit, syncLimit, hq3);
|
|
|
|
+ assertEquals(port3, s3.getClientPort());
|
|
|
|
+
|
|
|
|
+ LOG.info("creating QuorumPeer 4 port " + port4);
|
|
|
|
+ QuorumHierarchical hq4 = new QuorumHierarchical(qp);
|
|
|
|
+ s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 3, 4, tickTime, initLimit, syncLimit, hq4);
|
|
|
|
+ assertEquals(port4, s4.getClientPort());
|
|
|
|
+
|
|
|
|
+ LOG.info("creating QuorumPeer 5 port " + port5);
|
|
|
|
+ QuorumHierarchical hq5 = new QuorumHierarchical(qp);
|
|
|
|
+ s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 3, 5, tickTime, initLimit, syncLimit, hq5);
|
|
|
|
+ assertEquals(port5, s5.getClientPort());
|
|
|
|
+ LOG.info("start QuorumPeer 1");
|
|
|
|
+ s1.start();
|
|
|
|
+ LOG.info("start QuorumPeer 2");
|
|
|
|
+ s2.start();
|
|
|
|
+ LOG.info("start QuorumPeer 3");
|
|
|
|
+ s3.start();
|
|
|
|
+ LOG.info("start QuorumPeer 4");
|
|
|
|
+ s4.start();
|
|
|
|
+ LOG.info("start QuorumPeer 5");
|
|
|
|
+ s5.start();
|
|
|
|
+ LOG.info("started QuorumPeer 5");
|
|
|
|
+
|
|
|
|
+ LOG.info ("Closing ports " + hostPort);
|
|
|
|
+ for (String hp : hostPort.split(",")) {
|
|
|
|
+ assertTrue("waiting for server up",
|
|
|
|
+ ClientBase.waitForServerUp(hp,
|
|
|
|
+ CONNECTION_TIMEOUT));
|
|
|
|
+ LOG.info(hp + " is accepting client connections");
|
|
}
|
|
}
|
|
- LOG.info("FINISHED " + getName());
|
|
|
|
|
|
+
|
|
|
|
+ // interesting to see what's there...
|
|
|
|
+ JMXEnv.dump();
|
|
|
|
+ // make sure we have these 5 servers listed
|
|
|
|
+ Set<String> ensureNames = new LinkedHashSet<String>();
|
|
|
|
+ for (int i = 1; i <= 5; i++) {
|
|
|
|
+ ensureNames.add("InMemoryDataTree");
|
|
|
|
+ }
|
|
|
|
+ for (int i = 1; i <= 5; i++) {
|
|
|
|
+ ensureNames.add("name0=ReplicatedServer_id" + i
|
|
|
|
+ + ",name1=replica." + i + ",name2=");
|
|
|
|
+ }
|
|
|
|
+ for (int i = 1; i <= 5; i++) {
|
|
|
|
+ for (int j = 1; j <= 5; j++) {
|
|
|
|
+ ensureNames.add("name0=ReplicatedServer_id" + i
|
|
|
|
+ + ",name1=replica." + j);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ for (int i = 1; i <= 5; i++) {
|
|
|
|
+ ensureNames.add("name0=ReplicatedServer_id" + i);
|
|
|
|
+ }
|
|
|
|
+ JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()]));
|
|
}
|
|
}
|
|
|
|
|
|
- class LEThread extends Thread {
|
|
|
|
- int i;
|
|
|
|
- QuorumPeer peer;
|
|
|
|
- //int peerRound = 1;
|
|
|
|
|
|
+ @After
|
|
|
|
+ @Override
|
|
|
|
+ protected void tearDown() throws Exception {
|
|
|
|
+ LOG.info("TearDown started");
|
|
|
|
+ ct.tearDownAll();
|
|
|
|
|
|
- LEThread(QuorumPeer peer, int i) {
|
|
|
|
- this.i = i;
|
|
|
|
- this.peer = peer;
|
|
|
|
- LOG.info("Constructor: " + getName());
|
|
|
|
|
|
+ LOG.info("Shutting down server 1");
|
|
|
|
+ shutdown(s1);
|
|
|
|
+ LOG.info("Shutting down server 2");
|
|
|
|
+ shutdown(s2);
|
|
|
|
+ LOG.info("Shutting down server 3");
|
|
|
|
+ shutdown(s3);
|
|
|
|
+ LOG.info("Shutting down server 4");
|
|
|
|
+ shutdown(s4);
|
|
|
|
+ LOG.info("Shutting down server 5");
|
|
|
|
+ shutdown(s5);
|
|
|
|
+
|
|
|
|
+ for (String hp : hostPort.split(",")) {
|
|
|
|
+ assertTrue("waiting for server down",
|
|
|
|
+ ClientBase.waitForServerDown(hp,
|
|
|
|
+ ClientBase.CONNECTION_TIMEOUT));
|
|
|
|
+ LOG.info(hp + " is no longer accepting client connections");
|
|
}
|
|
}
|
|
|
|
|
|
- public void run() {
|
|
|
|
- try {
|
|
|
|
- Vote v = null;
|
|
|
|
- while(true){
|
|
|
|
-
|
|
|
|
- //while(true) {
|
|
|
|
- peer.setPeerState(ServerState.LOOKING);
|
|
|
|
- LOG.info("Going to call leader election.");
|
|
|
|
- v = peer.getElectionAlg().lookForLeader();
|
|
|
|
- if(v == null){
|
|
|
|
- LOG.info("Thread " + i + " got a null vote");
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /*
|
|
|
|
- * A real zookeeper would take care of setting the current vote. Here
|
|
|
|
- * we do it manually.
|
|
|
|
- */
|
|
|
|
- peer.setCurrentVote(v);
|
|
|
|
-
|
|
|
|
- LOG.info("Finished election: " + i + ", " + v.id);
|
|
|
|
- votes[i] = v;
|
|
|
|
-
|
|
|
|
- if((peer.getPeerState() == ServerState.FOLLOWING) ||
|
|
|
|
- (peer.getPeerState() == ServerState.LEADING)) break;
|
|
|
|
- }
|
|
|
|
- LOG.debug("Thread " + i + " votes " + v);
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- e.printStackTrace();
|
|
|
|
|
|
+ JMXEnv.tearDown();
|
|
|
|
+
|
|
|
|
+ LOG.info("FINISHED " + getName());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ protected void shutdown(QuorumPeer qp) {
|
|
|
|
+ try {
|
|
|
|
+ ((FastLeaderElection) qp.getElectionAlg()).shutdown();
|
|
|
|
+ LOG.info("Done with leader election");
|
|
|
|
+ qp.shutdown();
|
|
|
|
+ LOG.info("Done with quorum peer");
|
|
|
|
+ qp.join(30000);
|
|
|
|
+ if (qp.isAlive()) {
|
|
|
|
+ fail("QP failed to shutdown in 30 seconds");
|
|
}
|
|
}
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ LOG.debug("QP interrupted", e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
|
- public void testHierarchicalQuorum() throws Exception {
|
|
|
|
- runTest(0);
|
|
|
|
|
|
+ protected ZooKeeper createClient()
|
|
|
|
+ throws IOException, InterruptedException
|
|
|
|
+ {
|
|
|
|
+ return createClient(hostPort);
|
|
}
|
|
}
|
|
-
|
|
|
|
- @Test
|
|
|
|
- public void testHierarchicalQuorumPartial() throws Exception {
|
|
|
|
- runTest(3);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private void runTest(int delta) throws Exception {
|
|
|
|
- FastLeaderElection le[] = new FastLeaderElection[count];
|
|
|
|
-
|
|
|
|
- LOG.info("TestHierarchicalQuorum: " + getName()+ ", " + count);
|
|
|
|
- for(int i = 0; i < count; i++) {
|
|
|
|
- int clientport = PortAssignment.unique();
|
|
|
|
- peers.put(Long.valueOf(i),
|
|
|
|
- new QuorumServer(i,
|
|
|
|
- new InetSocketAddress(clientport),
|
|
|
|
- new InetSocketAddress(PortAssignment.unique())));
|
|
|
|
- tmpdir[i] = ClientBase.createTmpDir();
|
|
|
|
- port[i] = clientport;
|
|
|
|
- }
|
|
|
|
|
|
|
|
- for(int i = 0; i < (le.length - delta); i++) {
|
|
|
|
- QuorumHierarchical hq = new QuorumHierarchical(qp);
|
|
|
|
- QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 2, 2, 2, hq);
|
|
|
|
- peer.startLeaderElection();
|
|
|
|
- LEThread thread = new LEThread(peer, i);
|
|
|
|
- thread.start();
|
|
|
|
- threads.add(thread);
|
|
|
|
- }
|
|
|
|
- LOG.info("Started threads " + getName());
|
|
|
|
|
|
+ protected ZooKeeper createClient(String hp)
|
|
|
|
+ throws IOException, InterruptedException
|
|
|
|
+ {
|
|
|
|
+ CountdownWatcher watcher = new CountdownWatcher();
|
|
|
|
+ return createClient(watcher, hp);
|
|
|
|
+ }
|
|
|
|
|
|
- for(int i = 0; i < threads.size(); i++) {
|
|
|
|
- threads.get(i).join(15000);
|
|
|
|
- if (threads.get(i).isAlive()) {
|
|
|
|
- fail("Threads didn't join");
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ @Test
|
|
|
|
+ public void testHierarchicalQuorum() throws Throwable {
|
|
|
|
+ ct.testHammerBasic();
|
|
}
|
|
}
|
|
}
|
|
}
|