|
@@ -23,6 +23,7 @@ import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
|
|
import java.io.ByteArrayOutputStream;
|
|
import java.io.ByteArrayOutputStream;
|
|
import java.io.LineNumberReader;
|
|
import java.io.LineNumberReader;
|
|
import java.io.StringReader;
|
|
import java.io.StringReader;
|
|
|
|
+import java.io.IOException;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.InetSocketAddress;
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.channels.SocketChannel;
|
|
import java.nio.channels.SocketChannel;
|
|
@@ -43,6 +44,7 @@ import org.apache.zookeeper.ZooKeeper;
|
|
import org.apache.zookeeper.ZooDefs.Ids;
|
|
import org.apache.zookeeper.ZooDefs.Ids;
|
|
import org.apache.zookeeper.ZooKeeper.States;
|
|
import org.apache.zookeeper.ZooKeeper.States;
|
|
import org.apache.zookeeper.server.quorum.Leader.Proposal;
|
|
import org.apache.zookeeper.server.quorum.Leader.Proposal;
|
|
|
|
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase.MainThread;
|
|
import org.apache.zookeeper.test.ClientBase;
|
|
import org.apache.zookeeper.test.ClientBase;
|
|
import org.junit.Assert;
|
|
import org.junit.Assert;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
@@ -207,6 +209,104 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
mt[i].shutdown();
|
|
mt[i].shutdown();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Test the case of server with highest zxid not present at leader election and joining later.
|
|
|
|
+ * This test case is for reproducing the issue and fixing the bug mentioned in ZOOKEEPER-1154
|
|
|
|
+ * and ZOOKEEPER-1156.
|
|
|
|
+ */
|
|
|
|
+ @Test
|
|
|
|
+ public void testHighestZxidJoinLate() throws Exception {
|
|
|
|
+ int numServers = 3;
|
|
|
|
+ Servers svrs = LaunchServers(numServers);
|
|
|
|
+ String path = "/hzxidtest";
|
|
|
|
+ int leader=-1;
|
|
|
|
+
|
|
|
|
+ // find the leader
|
|
|
|
+ for (int i=0; i < numServers; i++) {
|
|
|
|
+ if (svrs.mt[i].main.quorumPeer.leader != null) {
|
|
|
|
+ leader = i;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // make sure there is a leader
|
|
|
|
+ Assert.assertTrue("There should be a leader", leader >=0);
|
|
|
|
+
|
|
|
|
+ int nonleader = (leader+1)%numServers;
|
|
|
|
+
|
|
|
|
+ byte[] input = new byte[1];
|
|
|
|
+ input[0] = 1;
|
|
|
|
+ byte[] output;
|
|
|
|
+
|
|
|
|
+ // Create a couple of nodes
|
|
|
|
+ svrs.zk[leader].create(path+leader, input, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
|
+ svrs.zk[leader].create(path+nonleader, input, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
|
+
|
|
|
|
+ // make sure the updates indeed committed. If it is not
|
|
|
|
+ // the following statement will throw.
|
|
|
|
+ output = svrs.zk[leader].getData(path+nonleader, false, null);
|
|
|
|
+
|
|
|
|
+ // Shutdown every one else but the leader
|
|
|
|
+ for (int i=0; i < numServers; i++) {
|
|
|
|
+ if (i != leader) {
|
|
|
|
+ svrs.mt[i].shutdown();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ input[0] = 2;
|
|
|
|
+
|
|
|
|
+ // Update the node on the leader
|
|
|
|
+ svrs.zk[leader].setData(path+leader, input, -1, null, null);
|
|
|
|
+
|
|
|
|
+ // wait some time to let this get written to disk
|
|
|
|
+ Thread.sleep(500);
|
|
|
|
+
|
|
|
|
+ // shut the leader down
|
|
|
|
+ svrs.mt[leader].shutdown();
|
|
|
|
+
|
|
|
|
+ System.gc();
|
|
|
|
+
|
|
|
|
+ waitForAll(svrs.zk, States.CONNECTING);
|
|
|
|
+
|
|
|
|
+ // Start everyone but the leader
|
|
|
|
+ for (int i=0; i < numServers; i++) {
|
|
|
|
+ if (i != leader) {
|
|
|
|
+ svrs.mt[i].start();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // wait to connect to one of these
|
|
|
|
+ waitForOne(svrs.zk[nonleader], States.CONNECTED);
|
|
|
|
+
|
|
|
|
+ // validate that the old value is there and not the new one
|
|
|
|
+ output = svrs.zk[nonleader].getData(path+leader, false, null);
|
|
|
|
+
|
|
|
|
+ Assert.assertEquals(
|
|
|
|
+ "Expecting old value 1 since 2 isn't committed yet",
|
|
|
|
+ output[0], 1);
|
|
|
|
+
|
|
|
|
+ // Do some other update, so we bump the maxCommttedZxid
|
|
|
|
+ // by setting the value to 2
|
|
|
|
+ svrs.zk[nonleader].setData(path+nonleader, input, -1);
|
|
|
|
+
|
|
|
|
+ // start the old leader
|
|
|
|
+ svrs.mt[leader].start();
|
|
|
|
+
|
|
|
|
+ // connect to it
|
|
|
|
+ waitForOne(svrs.zk[leader], States.CONNECTED);
|
|
|
|
+
|
|
|
|
+ // make sure it doesn't have the new value that it alone had logged
|
|
|
|
+ output = svrs.zk[leader].getData(path+leader, false, null);
|
|
|
|
+ Assert.assertEquals(
|
|
|
|
+ "Validating that the deposed leader has rolled back that change it had written",
|
|
|
|
+ output[0], 1);
|
|
|
|
+
|
|
|
|
+ // make sure the leader has the subsequent changes that were made while it was offline
|
|
|
|
+ output = svrs.zk[leader].getData(path+nonleader, false, null);
|
|
|
|
+ Assert.assertEquals(
|
|
|
|
+ "Validating that the deposed leader caught up on changes it missed",
|
|
|
|
+ output[0], 2);
|
|
|
|
+ }
|
|
|
|
|
|
private void waitForOne(ZooKeeper zk, States state) throws InterruptedException {
|
|
private void waitForOne(ZooKeeper zk, States state) throws InterruptedException {
|
|
while(zk.getState() != state) {
|
|
while(zk.getState() != state) {
|
|
@@ -232,6 +332,47 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // This class holds the servers and clients for those servers
|
|
|
|
+ private class Servers {
|
|
|
|
+ MainThread mt[];
|
|
|
|
+ ZooKeeper zk[];
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * This is a helper function for launching a set of servers
|
|
|
|
+ *
|
|
|
|
+ * @param numServers
|
|
|
|
+ * @return
|
|
|
|
+ * @throws IOException
|
|
|
|
+ * @throws InterruptedException
|
|
|
|
+ */
|
|
|
|
+ private Servers LaunchServers(int numServers) throws IOException, InterruptedException {
|
|
|
|
+ int SERVER_COUNT = numServers;
|
|
|
|
+ Servers svrs = new Servers();
|
|
|
|
+ final int clientPorts[] = new int[SERVER_COUNT];
|
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
|
+ for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
+ clientPorts[i] = PortAssignment.unique();
|
|
|
|
+ sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+"\n");
|
|
|
|
+ }
|
|
|
|
+ String quorumCfgSection = sb.toString();
|
|
|
|
+
|
|
|
|
+ MainThread mt[] = new MainThread[SERVER_COUNT];
|
|
|
|
+ ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
|
|
|
|
+ for(int i = 0; i < SERVER_COUNT; i++) {
|
|
|
|
+ mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection);
|
|
|
|
+ mt[i].start();
|
|
|
|
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ waitForAll(zk, States.CONNECTED);
|
|
|
|
+
|
|
|
|
+ svrs.mt = mt;
|
|
|
|
+ svrs.zk = zk;
|
|
|
|
+ return svrs;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Verify handling of bad quorum address
|
|
* Verify handling of bad quorum address
|
|
*/
|
|
*/
|