|
@@ -30,6 +30,7 @@ import java.util.Set;
|
|
|
import org.apache.log4j.Logger;
|
|
|
import org.apache.zookeeper.PortAssignment;
|
|
|
import org.apache.zookeeper.TestableZooKeeper;
|
|
|
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
|
|
@@ -47,6 +48,12 @@ public class QuorumBase extends ClientBase {
|
|
|
private int port3;
|
|
|
private int port4;
|
|
|
private int port5;
|
|
|
+
|
|
|
+ private int portLE1;
|
|
|
+ private int portLE2;
|
|
|
+ private int portLE3;
|
|
|
+ private int portLE4;
|
|
|
+ private int portLE5;
|
|
|
|
|
|
@Override
|
|
|
protected void setUp() throws Exception {
|
|
@@ -66,11 +73,18 @@ public class QuorumBase extends ClientBase {
|
|
|
port3 = PortAssignment.unique();
|
|
|
port4 = PortAssignment.unique();
|
|
|
port5 = PortAssignment.unique();
|
|
|
- hostPort = "127.0.0.1:" + port1
|
|
|
- + ",127.0.0.1:" + port2
|
|
|
- + ",127.0.0.1:" + port3
|
|
|
- + ",127.0.0.1:" + port4
|
|
|
- + ",127.0.0.1:" + port5;
|
|
|
+
|
|
|
+ portLE1 = PortAssignment.unique();
|
|
|
+ portLE2 = PortAssignment.unique();
|
|
|
+ portLE3 = PortAssignment.unique();
|
|
|
+ portLE4 = PortAssignment.unique();
|
|
|
+ portLE5 = PortAssignment.unique();
|
|
|
+
|
|
|
+ hostPort = "127.0.0.1:" + port1 + ":" + portLE1
|
|
|
+ + ",127.0.0.1:" + port2 + ":" + portLE2
|
|
|
+ + ",127.0.0.1:" + port3 + ":" + portLE3
|
|
|
+ + ",127.0.0.1:" + port4 + ":" + portLE4
|
|
|
+ + ",127.0.0.1:" + port5 + ":" + portLE5;
|
|
|
LOG.info("Ports are: " + hostPort);
|
|
|
|
|
|
s1dir = ClientBase.createTmpDir();
|
|
@@ -102,11 +116,26 @@ public class QuorumBase extends ClientBase {
|
|
|
int initLimit = 3;
|
|
|
int syncLimit = 3;
|
|
|
HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
|
|
|
- peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress("127.0.0.1", port1 + 1000)));
|
|
|
- peers.put(Long.valueOf(2), new QuorumServer(2, new InetSocketAddress("127.0.0.1", port2 + 1000)));
|
|
|
- peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress("127.0.0.1", port3 + 1000)));
|
|
|
- peers.put(Long.valueOf(4), new QuorumServer(4, new InetSocketAddress("127.0.0.1", port4 + 1000)));
|
|
|
- peers.put(Long.valueOf(5), new QuorumServer(5, new InetSocketAddress("127.0.0.1", port5 + 1000)));
|
|
|
+ peers.put(Long.valueOf(1), new QuorumServer(1,
|
|
|
+ new InetSocketAddress("127.0.0.1", port1 + 1000),
|
|
|
+ new InetSocketAddress("127.0.0.1", portLE1 + 1000),
|
|
|
+ LearnerType.PARTICIPANT));
|
|
|
+ peers.put(Long.valueOf(2), new QuorumServer(2,
|
|
|
+ new InetSocketAddress("127.0.0.1", port2 + 1000),
|
|
|
+ new InetSocketAddress("127.0.0.1", portLE2 + 1000),
|
|
|
+ LearnerType.PARTICIPANT));
|
|
|
+ peers.put(Long.valueOf(3), new QuorumServer(3,
|
|
|
+ new InetSocketAddress("127.0.0.1", port3 + 1000),
|
|
|
+ new InetSocketAddress("127.0.0.1", portLE3 + 1000),
|
|
|
+ LearnerType.PARTICIPANT));
|
|
|
+ peers.put(Long.valueOf(4), new QuorumServer(4,
|
|
|
+ new InetSocketAddress("127.0.0.1", port4 + 1000),
|
|
|
+ new InetSocketAddress("127.0.0.1", portLE4 + 1000),
|
|
|
+ LearnerType.PARTICIPANT));
|
|
|
+ peers.put(Long.valueOf(5), new QuorumServer(5,
|
|
|
+ new InetSocketAddress("127.0.0.1", port5 + 1000),
|
|
|
+ new InetSocketAddress("127.0.0.1", portLE5 + 1000),
|
|
|
+ LearnerType.PARTICIPANT));
|
|
|
|
|
|
if (withObservers) {
|
|
|
peers.get(Long.valueOf(4)).type = LearnerType.OBSERVER;
|
|
@@ -114,19 +143,19 @@ public class QuorumBase extends ClientBase {
|
|
|
}
|
|
|
|
|
|
LOG.info("creating QuorumPeer 1 port " + port1);
|
|
|
- s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 0, 1, tickTime, initLimit, syncLimit);
|
|
|
+ s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 3, 1, tickTime, initLimit, syncLimit);
|
|
|
assertEquals(port1, s1.getClientPort());
|
|
|
LOG.info("creating QuorumPeer 2 port " + port2);
|
|
|
- s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 0, 2, tickTime, initLimit, syncLimit);
|
|
|
+ s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 3, 2, tickTime, initLimit, syncLimit);
|
|
|
assertEquals(port2, s2.getClientPort());
|
|
|
LOG.info("creating QuorumPeer 3 port " + port3);
|
|
|
- s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 0, 3, tickTime, initLimit, syncLimit);
|
|
|
+ s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 3, 3, tickTime, initLimit, syncLimit);
|
|
|
assertEquals(port3, s3.getClientPort());
|
|
|
LOG.info("creating QuorumPeer 4 port " + port4);
|
|
|
- s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 0, 4, tickTime, initLimit, syncLimit);
|
|
|
+ s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 3, 4, tickTime, initLimit, syncLimit);
|
|
|
assertEquals(port4, s4.getClientPort());
|
|
|
LOG.info("creating QuorumPeer 5 port " + port5);
|
|
|
- s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 0, 5, tickTime, initLimit, syncLimit);
|
|
|
+ s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 3, 5, tickTime, initLimit, syncLimit);
|
|
|
assertEquals(port5, s5.getClientPort());
|
|
|
|
|
|
if (withObservers) {
|
|
@@ -181,26 +210,42 @@ public class QuorumBase extends ClientBase {
|
|
|
int initLimit = 3;
|
|
|
int syncLimit = 3;
|
|
|
HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
|
|
|
- peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress("127.0.0.1", port1 + 1000)));
|
|
|
- peers.put(Long.valueOf(2), new QuorumServer(2, new InetSocketAddress("127.0.0.1", port2 + 1000)));
|
|
|
- peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress("127.0.0.1", port3 + 1000)));
|
|
|
- peers.put(Long.valueOf(4), new QuorumServer(4, new InetSocketAddress("127.0.0.1", port4 + 1000)));
|
|
|
- peers.put(Long.valueOf(5), new QuorumServer(5, new InetSocketAddress("127.0.0.1", port5 + 1000)));
|
|
|
|
|
|
+ peers.put(Long.valueOf(1), new QuorumServer(1,
|
|
|
+ new InetSocketAddress("127.0.0.1", port1 + 1000),
|
|
|
+ new InetSocketAddress("127.0.0.1", portLE1 + 1000),
|
|
|
+ LearnerType.PARTICIPANT));
|
|
|
+ peers.put(Long.valueOf(2), new QuorumServer(2,
|
|
|
+ new InetSocketAddress("127.0.0.1", port2 + 1000),
|
|
|
+ new InetSocketAddress("127.0.0.1", portLE2 + 1000),
|
|
|
+ LearnerType.PARTICIPANT));
|
|
|
+ peers.put(Long.valueOf(3), new QuorumServer(3,
|
|
|
+ new InetSocketAddress("127.0.0.1", port3 + 1000),
|
|
|
+ new InetSocketAddress("127.0.0.1", portLE3 + 1000),
|
|
|
+ LearnerType.PARTICIPANT));
|
|
|
+ peers.put(Long.valueOf(4), new QuorumServer(4,
|
|
|
+ new InetSocketAddress("127.0.0.1", port4 + 1000),
|
|
|
+ new InetSocketAddress("127.0.0.1", portLE4 + 1000),
|
|
|
+ LearnerType.PARTICIPANT));
|
|
|
+ peers.put(Long.valueOf(5), new QuorumServer(5,
|
|
|
+ new InetSocketAddress("127.0.0.1", port5 + 1000),
|
|
|
+ new InetSocketAddress("127.0.0.1", portLE5 + 1000),
|
|
|
+ LearnerType.PARTICIPANT));
|
|
|
+
|
|
|
LOG.info("creating QuorumPeer 1 port " + port1);
|
|
|
- s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 0, 1, tickTime, initLimit, syncLimit);
|
|
|
+ s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 3, 1, tickTime, initLimit, syncLimit);
|
|
|
assertEquals(port1, s1.getClientPort());
|
|
|
LOG.info("creating QuorumPeer 2 port " + port2);
|
|
|
- s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 0, 2, tickTime, initLimit, syncLimit);
|
|
|
+ s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 3, 2, tickTime, initLimit, syncLimit);
|
|
|
assertEquals(port2, s2.getClientPort());
|
|
|
LOG.info("creating QuorumPeer 3 port " + port3);
|
|
|
- s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 0, 3, tickTime, initLimit, syncLimit);
|
|
|
+ s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 3, 3, tickTime, initLimit, syncLimit);
|
|
|
assertEquals(port3, s3.getClientPort());
|
|
|
LOG.info("creating QuorumPeer 4 port " + port4);
|
|
|
- s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 0, 4, tickTime, initLimit, syncLimit);
|
|
|
+ s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 3, 4, tickTime, initLimit, syncLimit);
|
|
|
assertEquals(port4, s4.getClientPort());
|
|
|
LOG.info("creating QuorumPeer 5 port " + port5);
|
|
|
- s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 0, 5, tickTime, initLimit, syncLimit);
|
|
|
+ s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 3, 5, tickTime, initLimit, syncLimit);
|
|
|
assertEquals(port5, s5.getClientPort());
|
|
|
}
|
|
|
|
|
@@ -242,6 +287,7 @@ public class QuorumBase extends ClientBase {
|
|
|
protected void shutdown(QuorumPeer qp) {
|
|
|
try {
|
|
|
qp.shutdown();
|
|
|
+ ((FastLeaderElection) qp.getElectionAlg()).shutdown();
|
|
|
qp.join(30000);
|
|
|
if (qp.isAlive()) {
|
|
|
fail("QP failed to shutdown in 30 seconds");
|