|
@@ -29,9 +29,9 @@ import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.SocketChannel;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
import java.util.regex.Pattern;
|
|
|
|
|
|
-import org.apache.log4j.Layout;
|
|
|
import org.apache.log4j.Level;
|
|
|
import org.apache.log4j.Logger;
|
|
|
import org.apache.log4j.PatternLayout;
|
|
@@ -342,9 +342,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
public void testElectionFraud() throws IOException, InterruptedException {
|
|
|
// capture QuorumPeer logging
|
|
|
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
|
|
- String loggingPattern = ((PatternLayout) Logger.getRootLogger().getAppender("CONSOLE").getLayout()).getConversionPattern();
|
|
|
- WriterAppender appender = new WriterAppender(new PatternLayout(loggingPattern), os);
|
|
|
- appender.setThreshold(Level.INFO);
|
|
|
+ WriterAppender appender = getConsoleAppender(os, Level.INFO);
|
|
|
Logger qlogger = Logger.getLogger(QuorumPeer.class);
|
|
|
qlogger.addAppender(appender);
|
|
|
|
|
@@ -540,11 +538,8 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
ClientBase.setupTestEnv();
|
|
|
|
|
|
// setup the logger to capture all logs
|
|
|
- Layout layout =
|
|
|
- Logger.getRootLogger().getAppender("CONSOLE").getLayout();
|
|
|
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
|
|
- WriterAppender appender = new WriterAppender(layout, os);
|
|
|
- appender.setThreshold(Level.WARN);
|
|
|
+ WriterAppender appender = getConsoleAppender(os, Level.WARN);
|
|
|
Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
|
|
|
qlogger.addAppender(appender);
|
|
|
|
|
@@ -599,11 +594,8 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
ClientBase.setupTestEnv();
|
|
|
|
|
|
// setup the logger to capture all logs
|
|
|
- Layout layout =
|
|
|
- Logger.getRootLogger().getAppender("CONSOLE").getLayout();
|
|
|
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
|
|
- WriterAppender appender = new WriterAppender(layout, os);
|
|
|
- appender.setThreshold(Level.INFO);
|
|
|
+ WriterAppender appender = getConsoleAppender(os, Level.INFO);
|
|
|
Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
|
|
|
qlogger.addAppender(appender);
|
|
|
|
|
@@ -742,12 +734,9 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
ClientBase.setupTestEnv();
|
|
|
|
|
|
// setup the logger to capture all logs
|
|
|
- Layout layout =
|
|
|
- Logger.getRootLogger().getAppender("CONSOLE").getLayout();
|
|
|
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
|
|
- WriterAppender appender = new WriterAppender(layout, os);
|
|
|
+ WriterAppender appender = getConsoleAppender(os, Level.INFO);
|
|
|
appender.setImmediateFlush(true);
|
|
|
- appender.setThreshold(Level.INFO);
|
|
|
Logger zlogger = Logger.getLogger("org.apache.zookeeper");
|
|
|
zlogger.addAppender(appender);
|
|
|
|
|
@@ -1012,4 +1001,119 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
|
Assert.assertNull("server " + i + " should not have /zk" + leader, servers.zk[i].exists("/zk" + leader, false));
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Verify that a node without the leader in its view will not attempt to connect to the leader.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testLeaderOutOfView() throws Exception {
|
|
|
+ ClientBase.setupTestEnv();
|
|
|
+
|
|
|
+ int numServers = 3;
|
|
|
+
|
|
|
+ // used for assertions later
|
|
|
+ boolean foundLeading = false;
|
|
|
+ boolean foundFollowing = false;
|
|
|
+
|
|
|
+ // capture QuorumPeer logging
|
|
|
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
|
|
|
+ WriterAppender appender = getConsoleAppender(os, Level.DEBUG);
|
|
|
+ Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
|
|
|
+ qlogger.addAppender(appender);
|
|
|
+
|
|
|
+ try {
|
|
|
+ Servers svrs = new Servers();
|
|
|
+ svrs.clientPorts = new int[numServers];
|
|
|
+ for (int i = 0; i < numServers; i++) {
|
|
|
+ svrs.clientPorts[i] = PortAssignment.unique();
|
|
|
+ }
|
|
|
+
|
|
|
+ String quorumCfgIncomplete = getUniquePortCfgForId(1) + "\n" + getUniquePortCfgForId(2);
|
|
|
+ String quorumCfgComplete = quorumCfgIncomplete + "\n" + getUniquePortCfgForId(3);
|
|
|
+ svrs.mt = new MainThread[3];
|
|
|
+
|
|
|
+ // Node 1 is started without the leader (3) in its config view
|
|
|
+ svrs.mt[0] = new MainThread(1, svrs.clientPorts[0], quorumCfgIncomplete);
|
|
|
+ for (int i = 1; i < numServers; i++) {
|
|
|
+ svrs.mt[i] = new MainThread(i + 1, svrs.clientPorts[i], quorumCfgComplete);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Node 1 must be started first, before quorum is formed, to trigger the attempted invalid connection to 3
|
|
|
+ svrs.mt[0].start();
|
|
|
+ QuorumPeer quorumPeer1 = waitForQuorumPeer(svrs.mt[0], CONNECTION_TIMEOUT);
|
|
|
+ Assert.assertTrue(quorumPeer1.getPeerState() == QuorumPeer.ServerState.LOOKING);
|
|
|
+
|
|
|
+ // Node 3 started second to avoid 1 and 2 forming a quorum before 3 starts up
|
|
|
+ int highestServerIndex = numServers - 1;
|
|
|
+ svrs.mt[highestServerIndex].start();
|
|
|
+ QuorumPeer quorumPeer3 = waitForQuorumPeer(svrs.mt[highestServerIndex], CONNECTION_TIMEOUT);
|
|
|
+ Assert.assertTrue(quorumPeer3.getPeerState() == QuorumPeer.ServerState.LOOKING);
|
|
|
+
|
|
|
+ // Node 2 started last, kicks off leader election
|
|
|
+ for (int i = 1; i < highestServerIndex; i++) {
|
|
|
+ svrs.mt[i].start();
|
|
|
+ }
|
|
|
+
|
|
|
+ // Nodes 2 and 3 now form quorum and fully start. 1 attempts to vote for 3, fails, returns to LOOKING state
|
|
|
+ for (int i = 1; i < numServers; i++) {
|
|
|
+ Assert.assertTrue("waiting for server to start",
|
|
|
+ ClientBase.waitForServerUp("127.0.0.1:" + svrs.clientPorts[i], CONNECTION_TIMEOUT));
|
|
|
+ }
|
|
|
+
|
|
|
+ Assert.assertTrue(svrs.mt[0].getQuorumPeer().getPeerState() == QuorumPeer.ServerState.LOOKING);
|
|
|
+ Assert.assertTrue(svrs.mt[highestServerIndex].getQuorumPeer().getPeerState() == QuorumPeer.ServerState.LEADING);
|
|
|
+ for (int i = 1; i < highestServerIndex; i++) {
|
|
|
+ Assert.assertTrue(svrs.mt[i].getQuorumPeer().getPeerState() == QuorumPeer.ServerState.FOLLOWING);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Look through the logs for output that indicates Node 1 is LEADING or FOLLOWING
|
|
|
+ LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
|
|
|
+ Pattern leading = Pattern.compile(".*myid=1.*QuorumPeer.*LEADING.*");
|
|
|
+ Pattern following = Pattern.compile(".*myid=1.*QuorumPeer.*FOLLOWING.*");
|
|
|
+
|
|
|
+ String line;
|
|
|
+ while ((line = r.readLine()) != null && !foundLeading && !foundFollowing) {
|
|
|
+ foundLeading = leading.matcher(line).matches();
|
|
|
+ foundFollowing = following.matcher(line).matches();
|
|
|
+ }
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ qlogger.removeAppender(appender);
|
|
|
+ }
|
|
|
+
|
|
|
+ Assert.assertFalse("Corrupt peer should never become leader", foundLeading);
|
|
|
+ Assert.assertFalse("Corrupt peer should not attempt connection to out of view leader", foundFollowing);
|
|
|
+ }
|
|
|
+
|
|
|
+ private WriterAppender getConsoleAppender(ByteArrayOutputStream os, Level level) {
|
|
|
+ String loggingPattern = ((PatternLayout) Logger.getRootLogger().getAppender("CONSOLE").getLayout()).getConversionPattern();
|
|
|
+ WriterAppender appender = new WriterAppender(new PatternLayout(loggingPattern), os);
|
|
|
+ appender.setThreshold(level);
|
|
|
+ return appender;
|
|
|
+ }
|
|
|
+
|
|
|
+ private String getUniquePortCfgForId(int id) {
|
|
|
+ return String.format("server.%d=127.0.0.1:%d:%d", id, PortAssignment.unique(), PortAssignment.unique());
|
|
|
+ }
|
|
|
+
|
|
|
+ private QuorumPeer waitForQuorumPeer(MainThread mainThread, int timeout) throws TimeoutException {
|
|
|
+ long start = Time.currentElapsedTime();
|
|
|
+ while (true) {
|
|
|
+ QuorumPeer quorumPeer = mainThread.isAlive() ? mainThread.getQuorumPeer() : null;
|
|
|
+ if (quorumPeer != null) {
|
|
|
+ return quorumPeer;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (Time.currentElapsedTime() > start + timeout) {
|
|
|
+ LOG.error("Timed out while waiting for QuorumPeer");
|
|
|
+ throw new TimeoutException();
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ Thread.sleep(250);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ // ignore
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|