|
@@ -27,6 +27,7 @@ import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.SocketChannel;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
import java.util.regex.Pattern;
|
|
|
|
|
@@ -335,6 +336,100 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
output[0], 2);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * This test validates that if a quorum member determines that it is leader without the support of the rest of the
|
|
|
+ * quorum (the other members do not believe it to be the leader) it will stop attempting to lead and become a follower.
|
|
|
+ *
|
|
|
+ * @throws IOException
|
|
|
+ * @throws InterruptedException
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testElectionFraud() throws IOException, InterruptedException {
|
|
|
+ // capture QuorumPeer logging
|
|
|
+ Layout layout = Logger.getRootLogger().getAppender("CONSOLE").getLayout();
|
|
|
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
|
|
|
+ WriterAppender appender = new WriterAppender(layout, os);
|
|
|
+ appender.setThreshold(Level.INFO);
|
|
|
+ Logger qlogger = Logger.getLogger(QuorumPeer.class);
|
|
|
+ qlogger.addAppender(appender);
|
|
|
+
|
|
|
+ numServers = 3;
|
|
|
+
|
|
|
+ // used for assertions later
|
|
|
+ boolean foundLeading = false;
|
|
|
+ boolean foundLooking = false;
|
|
|
+ boolean foundFollowing = false;
|
|
|
+
|
|
|
+ try {
|
|
|
+ // spin up a quorum, we use a small ticktime to make the test run faster
|
|
|
+ servers = LaunchServers(numServers, 500);
|
|
|
+
|
|
|
+ // find the leader
|
|
|
+ int trueLeader = -1;
|
|
|
+ for (int i = 0; i < numServers; i++) {
|
|
|
+ if (servers.mt[i].main.quorumPeer.leader != null) {
|
|
|
+ trueLeader = i;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Assert.assertTrue("There should be a leader", trueLeader >= 0);
|
|
|
+
|
|
|
+ // find a follower
|
|
|
+ int falseLeader = (trueLeader + 1) % numServers;
|
|
|
+ Assert.assertTrue("All servers should join the quorum", servers.mt[falseLeader].main.quorumPeer.follower != null);
|
|
|
+
|
|
|
+ // to keep the quorum peer running and force it to go into the looking state, we kill leader election
|
|
|
+ // and close the connection to the leader
|
|
|
+ servers.mt[falseLeader].main.quorumPeer.electionAlg.shutdown();
|
|
|
+ servers.mt[falseLeader].main.quorumPeer.follower.getSocket().close();
|
|
|
+
|
|
|
+ // wait for the falseLeader to disconnect
|
|
|
+ waitForOne(servers.zk[falseLeader], States.CONNECTING);
|
|
|
+
|
|
|
+ // convince falseLeader that it is the leader
|
|
|
+ servers.mt[falseLeader].main.quorumPeer.setPeerState(QuorumPeer.ServerState.LEADING);
|
|
|
+
|
|
|
+ // provide time for the falseleader to realize no followers have connected
|
|
|
+ // (this is twice the timeout used in Leader#getEpochToPropose)
|
|
|
+ Thread.sleep(2 * servers.mt[falseLeader].main.quorumPeer.initLimit * servers.mt[falseLeader].main.quorumPeer.tickTime);
|
|
|
+
|
|
|
+ // Restart leader election
|
|
|
+ servers.mt[falseLeader].main.quorumPeer.startLeaderElection();
|
|
|
+
|
|
|
+ // The previous client connection to falseLeader likely closed, create a new one
|
|
|
+ servers.zk[falseLeader] = new ZooKeeper("127.0.0.1:" + servers.mt[falseLeader].getClientPort(), ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
+
|
|
|
+ // Wait for falseLeader to rejoin the quorum
|
|
|
+ waitForOne(servers.zk[falseLeader], States.CONNECTED);
|
|
|
+
|
|
|
+ // and ensure trueLeader is still the leader
|
|
|
+ Assert.assertTrue(servers.mt[trueLeader].main.quorumPeer.leader != null);
|
|
|
+
|
|
|
+ // Look through the logs for output that indicates the falseLeader is LEADING, then LOOKING, then FOLLOWING
|
|
|
+ LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
|
|
|
+ Pattern leading = Pattern.compile(".*myid=" + falseLeader + ".*LEADING.*");
|
|
|
+ Pattern looking = Pattern.compile(".*myid=" + falseLeader + ".*LOOKING.*");
|
|
|
+ Pattern following = Pattern.compile(".*myid=" + falseLeader + ".*FOLLOWING.*");
|
|
|
+
|
|
|
+ String line;
|
|
|
+ while ((line = r.readLine()) != null) {
|
|
|
+ if (!foundLeading) {
|
|
|
+ foundLeading = leading.matcher(line).matches();
|
|
|
+ } else if(!foundLooking) {
|
|
|
+ foundLooking = looking.matcher(line).matches();
|
|
|
+ } else if (following.matcher(line).matches()){
|
|
|
+ foundFollowing = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ qlogger.removeAppender(appender);
|
|
|
+ }
|
|
|
+
|
|
|
+ Assert.assertTrue("falseLeader never attempts to become leader", foundLeading);
|
|
|
+ Assert.assertTrue("falseLeader never gives up on leadership", foundLooking);
|
|
|
+ Assert.assertTrue("falseLeader never rejoins the quorum", foundFollowing);
|
|
|
+ }
|
|
|
+
|
|
|
private void waitForOne(ZooKeeper zk, States state) throws InterruptedException {
|
|
|
int iterations = ClientBase.CONNECTION_TIMEOUT / 500;
|
|
|
while (zk.getState() != state) {
|
|
@@ -371,40 +466,48 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
ZooKeeper zk[];
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * This is a helper function for launching a set of servers
|
|
|
- *
|
|
|
- * @param numServers
|
|
|
- * @return
|
|
|
- * @throws IOException
|
|
|
- * @throws InterruptedException
|
|
|
- */
|
|
|
- private Servers LaunchServers(int numServers) throws IOException, InterruptedException {
|
|
|
- int SERVER_COUNT = numServers;
|
|
|
- Servers svrs = new Servers();
|
|
|
- final int clientPorts[] = new int[SERVER_COUNT];
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
- for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
- clientPorts[i] = PortAssignment.unique();
|
|
|
- sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+clientPorts[i]+"\n");
|
|
|
- }
|
|
|
- String quorumCfgSection = sb.toString();
|
|
|
|
|
|
- MainThread mt[] = new MainThread[SERVER_COUNT];
|
|
|
- ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
|
|
|
- for (int i = 0; i < SERVER_COUNT; i++) {
|
|
|
- mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection);
|
|
|
- mt[i].start();
|
|
|
- zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
- }
|
|
|
-
|
|
|
- waitForAll(zk, States.CONNECTED);
|
|
|
-
|
|
|
- svrs.mt = mt;
|
|
|
- svrs.zk = zk;
|
|
|
- return svrs;
|
|
|
+ private Servers LaunchServers(int numServers) throws IOException, InterruptedException {
|
|
|
+ return LaunchServers(numServers, null);
|
|
|
}
|
|
|
|
|
|
+ /** * This is a helper function for launching a set of servers
|
|
|
+ *
|
|
|
+ * @param numServers* @param tickTime A ticktime to pass to MainThread
|
|
|
+ * @return
|
|
|
+ * @throws IOException
|
|
|
+ * @throws InterruptedException
|
|
|
+ */
|
|
|
+ private Servers LaunchServers(int numServers, Integer tickTime) throws IOException, InterruptedException {
|
|
|
+ int SERVER_COUNT = numServers;
|
|
|
+ Servers svrs = new Servers();
|
|
|
+ final int clientPorts[] = new int[SERVER_COUNT];
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
+ clientPorts[i] = PortAssignment.unique();
|
|
|
+ sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+clientPorts[i]+"\n");
|
|
|
+ }
|
|
|
+ String quorumCfgSection = sb.toString();
|
|
|
+
|
|
|
+ MainThread mt[] = new MainThread[SERVER_COUNT];
|
|
|
+ ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
|
|
|
+ for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
+ if (tickTime != null) {
|
|
|
+ mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection, new HashMap<String, String>(), tickTime);
|
|
|
+ } else {
|
|
|
+ mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection);
|
|
|
+ }
|
|
|
+ mt[i].start();
|
|
|
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
+ }
|
|
|
+
|
|
|
+ waitForAll(zk, States.CONNECTED);
|
|
|
+
|
|
|
+ svrs.mt = mt;
|
|
|
+ svrs.zk = zk;
|
|
|
+ return svrs;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Verify handling of bad quorum address
|
|
|
*/
|