|
@@ -18,6 +18,7 @@
|
|
|
|
|
|
package org.apache.zookeeper.server.quorum;
|
|
|
|
|
|
+import java.io.ByteArrayOutputStream;
|
|
|
import java.io.File;
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.FileOutputStream;
|
|
@@ -33,19 +34,24 @@ import org.apache.jute.BinaryInputArchive;
|
|
|
import org.apache.jute.BinaryOutputArchive;
|
|
|
import org.apache.jute.InputArchive;
|
|
|
import org.apache.jute.OutputArchive;
|
|
|
+import org.apache.zookeeper.WatchedEvent;
|
|
|
+import org.apache.zookeeper.Watcher;
|
|
|
+import org.apache.zookeeper.ZooDefs;
|
|
|
+import org.apache.zookeeper.Watcher.Event.EventType;
|
|
|
+import org.apache.zookeeper.data.Stat;
|
|
|
+import org.apache.zookeeper.server.ByteBufferInputStream;
|
|
|
import org.apache.zookeeper.server.ByteBufferOutputStream;
|
|
|
-import org.apache.zookeeper.server.DataTree;
|
|
|
import org.apache.zookeeper.server.ServerCnxn;
|
|
|
import org.apache.zookeeper.server.ServerCnxnFactory;
|
|
|
import org.apache.zookeeper.server.ZKDatabase;
|
|
|
import org.apache.zookeeper.server.ZooKeeperServer;
|
|
|
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
|
|
|
-import org.apache.zookeeper.server.quorum.Leader;
|
|
|
-import org.apache.zookeeper.server.quorum.LearnerInfo;
|
|
|
-import org.apache.zookeeper.server.quorum.QuorumPacket;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
|
|
|
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
|
|
|
import org.apache.zookeeper.server.util.ZxidUtils;
|
|
|
+import org.apache.zookeeper.txn.CreateTxn;
|
|
|
+import org.apache.zookeeper.txn.SetDataTxn;
|
|
|
+import org.apache.zookeeper.txn.TxnHeader;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Test;
|
|
|
|
|
@@ -226,10 +232,10 @@ public class Zab1_0Test {
|
|
|
}
|
|
|
|
|
|
+ /**
+  * Script for the leader side of a conversation with a follower under test.
+  * testFollowerConversation() accepts the follower's connection and passes the
+  * archives wrapping that socket to the implementation.
+  */
static public interface FollowerConversation {
|
|
|
- void converseWithFollower(InputArchive ia, OutputArchive oa) throws Exception;
|
|
|
+ // ia reads what the follower sends, oa writes to the follower, and f is the
+ // follower whose followLeader() is running in a separate thread.
+ void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception;
|
|
|
}
|
|
|
|
|
|
- public void testConversation(LeaderConversation conversation) throws Exception {
|
|
|
+ public void testLeaderConversation(LeaderConversation conversation) throws Exception {
|
|
|
Socket pair[] = getSocketPair();
|
|
|
Socket leaderSocket = pair[0];
|
|
|
Socket followerSocket = pair[1];
|
|
@@ -270,12 +276,215 @@ public class Zab1_0Test {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+    /**
+     * Runs a scripted conversation against a real {@link Follower}, with the
+     * test playing the part of the leader.
+     * <p>
+     * A follower backed by a fresh temporary directory is pointed at a local
+     * server socket and started in its own thread through followLeader(). The
+     * connection it opens is accepted, wrapped in jute archives and handed to
+     * the conversation together with the follower itself. Everything that was
+     * created (follower, thread, peer, sockets, temporary directory) is torn
+     * down afterwards, whether or not the conversation succeeded.
+     *
+     * @param conversation the leader-side script to run against the follower
+     * @throws Exception if setup fails or the conversation throws
+     */
+    public void testFollowerConversation(FollowerConversation conversation) throws Exception {
+        File tmpDir = File.createTempFile("test", "dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        Thread followerThread = null;
+        ConversableFollower follower = null;
+        QuorumPeer peer = null;
+        // Declared outside the try block so both sockets can be closed in finally.
+        ServerSocket ss = null;
+        Socket leaderSocket = null;
+        try {
+            peer = createQuorumPeer(tmpDir);
+            follower = createFollower(tmpDir, peer);
+            peer.follower = follower;
+
+            ss = new ServerSocket();
+            ss.bind(null);
+            follower.setLeaderSocketAddress((InetSocketAddress) ss.getLocalSocketAddress());
+            final Follower followerForThread = follower;
+
+            followerThread = new Thread() {
+                public void run() {
+                    try {
+                        followerForThread.followLeader();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            followerThread.start();
+            leaderSocket = ss.accept();
+
+            InputArchive ia = BinaryInputArchive.getArchive(leaderSocket
+                    .getInputStream());
+            OutputArchive oa = BinaryOutputArchive.getArchive(leaderSocket
+                    .getOutputStream());
+
+            conversation.converseWithFollower(ia, oa, follower);
+        } finally {
+            // Cleanup steps are chained with try/finally so that a failure in
+            // one of them cannot skip the ones that follow.
+            try {
+                if (follower != null) {
+                    follower.shutdown();
+                }
+                if (followerThread != null) {
+                    followerThread.interrupt();
+                    followerThread.join();
+                }
+                if (peer != null) {
+                    peer.shutdown();
+                }
+            } finally {
+                try {
+                    if (leaderSocket != null) {
+                        leaderSocket.close();
+                    }
+                } finally {
+                    try {
+                        if (ss != null) {
+                            ss.close();
+                        }
+                    } finally {
+                        recursiveDelete(tmpDir);
+                    }
+                }
+            }
+        }
+    }
|
|
|
+
|
|
|
+    /**
+     * Walks a follower through a normal session while the test acts as an
+     * already established leader at epoch 1.
+     * <p>
+     * The script exchanges FOLLOWERINFO/LEADERINFO/ACKEPOCH, sends a SNAP of a
+     * database holding a single /foo node followed by NEWLEADER, then proposes
+     * and commits one setData and finishes with UPTODATE. Along the way it
+     * checks the follower's accepted/current epochs, its in-memory data and
+     * what a fresh ZKDatabase loads from the follower's directories.
+     */
+    @Test
+    public void testNormalFollowerRun() throws Exception {
+        testFollowerConversation(new FollowerConversation() {
+            @Override
+            public void converseWithFollower(InputArchive ia, OutputArchive oa,
+                    Follower f) throws Exception {
+                // Scratch directory for the database that is later sent to the
+                // follower as a snapshot.
+                File tmpDir = File.createTempFile("test", "dir");
+                tmpDir.delete();
+                tmpDir.mkdir();
+                File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile();
+                File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
+                try {
+                    Assert.assertEquals(0, f.self.getAcceptedEpoch());
+                    Assert.assertEquals(0, f.self.getCurrentEpoch());
+
+                    // Setup a database with a single /foo node
+                    ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
+                    final long firstZxid = ZxidUtils.makeZxid(1, 1);
+                    zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1));
+                    Stat stat = new Stat();
+                    Assert.assertEquals("data1", new String(zkDb.getData("/foo", stat, null)));
+
+                    QuorumPacket qp = new QuorumPacket();
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.FOLLOWERINFO, qp.getType());
+                    // assertEquals takes (expected, actual); keeping that order
+                    // makes failure messages read correctly.
+                    Assert.assertEquals(0, qp.getZxid());
+                    LearnerInfo learnInfo = new LearnerInfo();
+                    ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
+                    Assert.assertEquals(0x10000, learnInfo.getProtocolVersion());
+                    Assert.assertEquals(0, learnInfo.getServerid());
+
+                    // We are simulating an established leader, so the epoch is 1
+                    qp.setType(Leader.LEADERINFO);
+                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
+                    byte protoBytes[] = new byte[4];
+                    ByteBuffer.wrap(protoBytes).putInt(0x10000);
+                    qp.setData(protoBytes);
+                    oa.writeRecord(qp, null);
+
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.ACKEPOCH, qp.getType());
+                    Assert.assertEquals(0, qp.getZxid());
+                    Assert.assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt());
+                    Assert.assertEquals(1, f.self.getAcceptedEpoch());
+                    Assert.assertEquals(0, f.self.getCurrentEpoch());
+
+                    // Send the snapshot we created earlier
+                    qp.setType(Leader.SNAP);
+                    qp.setData(new byte[0]);
+                    qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
+                    oa.writeRecord(qp, null);
+                    zkDb.serializeSnapshot(oa);
+                    oa.writeString("BenWasHere", null);
+                    qp.setType(Leader.NEWLEADER);
+                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
+                    oa.writeRecord(qp, null);
+
+                    // Get the ack of the new leader
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.ACK, qp.getType());
+                    Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+                    Assert.assertEquals(1, f.self.getAcceptedEpoch());
+                    Assert.assertEquals(1, f.self.getCurrentEpoch());
+
+                    Assert.assertEquals(firstZxid, f.fzk.getLastProcessedZxid());
+
+                    // Make sure the data was recorded in the filesystem ok
+                    ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
+                    long lastZxid = zkDb2.loadDataBase();
+                    Assert.assertEquals("data1", new String(zkDb2.getData("/foo", stat, null)));
+                    Assert.assertEquals(firstZxid, lastZxid);
+
+                    // Propose an update
+                    long proposalZxid = ZxidUtils.makeZxid(1, 1000);
+                    proposeSetData(qp, proposalZxid, "data2", 2);
+                    oa.writeRecord(qp, null);
+
+                    // We want to track the change with a callback rather than depending on timing
+                    class TrackerWatcher implements Watcher {
+                        boolean changed;
+
+                        // Blocks until process() has seen a NodeDataChanged event.
+                        synchronized void waitForChange() throws InterruptedException {
+                            while (!changed) {
+                                wait();
+                            }
+                        }
+
+                        @Override
+                        public void process(WatchedEvent event) {
+                            if (event.getType() == EventType.NodeDataChanged) {
+                                synchronized (this) {
+                                    changed = true;
+                                    notifyAll();
+                                }
+                            }
+                        }
+                    }
+                    TrackerWatcher watcher = new TrackerWatcher();
+
+                    // The change should not have happened yet, since we haven't committed
+                    Assert.assertEquals("data1", new String(f.fzk.getZKDatabase().getData("/foo", stat, watcher)));
+
+                    // The change should happen now
+                    qp.setType(Leader.COMMIT);
+                    qp.setZxid(proposalZxid);
+                    oa.writeRecord(qp, null);
+
+                    qp.setType(Leader.UPTODATE);
+                    qp.setZxid(0);
+                    oa.writeRecord(qp, null);
+
+                    // Read the uptodate ack
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.ACK, qp.getType());
+                    Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.ACK, qp.getType());
+                    Assert.assertEquals(proposalZxid, qp.getZxid());
+
+                    watcher.waitForChange();
+                    Assert.assertEquals("data2", new String(f.fzk.getZKDatabase().getData("/foo", stat, null)));
+
+                    // check and make sure the change is persisted
+                    zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
+                    lastZxid = zkDb2.loadDataBase();
+                    Assert.assertEquals("data2", new String(zkDb2.getData("/foo", stat, null)));
+                    Assert.assertEquals(proposalZxid, lastZxid);
+                } finally {
+                    recursiveDelete(tmpDir);
+                }
+
+            }
+
+            /**
+             * Turns qp into a PROPOSAL packet for the given zxid whose payload
+             * is a serialized TxnHeader followed by a SetDataTxn on /foo.
+             */
+            private void proposeSetData(QuorumPacket qp, long zxid, String data, int version) throws IOException {
+                qp.setType(Leader.PROPOSAL);
+                qp.setZxid(zxid);
+                TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55, ZooDefs.OpCode.setData);
+                SetDataTxn sdt = new SetDataTxn("/foo", data.getBytes(), version);
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                OutputArchive boa = BinaryOutputArchive.getArchive(baos);
+                boa.writeRecord(hdr, null);
+                boa.writeRecord(sdt, null);
+                qp.setData(baos.toByteArray());
+            }
+        });
+    }
|
|
|
+
|
|
|
@Test
|
|
|
public void testNormalRun() throws Exception {
|
|
|
- testConversation(new LeaderConversation() {
|
|
|
+ testLeaderConversation(new LeaderConversation() {
|
|
|
public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
|
|
|
throws IOException {
|
|
|
+ Assert.assertEquals(0, l.self.getAcceptedEpoch());
|
|
|
+ Assert.assertEquals(0, l.self.getCurrentEpoch());
|
|
|
+
|
|
|
/* we test a normal run. everything should work out well. */
|
|
|
LearnerInfo li = new LearnerInfo(1, 0x10000);
|
|
|
byte liBytes[] = new byte[12];
|
|
@@ -284,20 +493,30 @@ public class Zab1_0Test {
|
|
|
QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0,
|
|
|
liBytes, null);
|
|
|
oa.writeRecord(qp, null);
|
|
|
+
|
|
|
readPacketSkippingPing(ia, qp);
|
|
|
Assert.assertEquals(Leader.LEADERINFO, qp.getType());
|
|
|
Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
|
|
|
Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(),
|
|
|
0x10000);
|
|
|
+ Assert.assertEquals(1, l.self.getAcceptedEpoch());
|
|
|
+ Assert.assertEquals(0, l.self.getCurrentEpoch());
|
|
|
+
|
|
|
qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
|
|
|
oa.writeRecord(qp, null);
|
|
|
+
|
|
|
readPacketSkippingPing(ia, qp);
|
|
|
Assert.assertEquals(Leader.DIFF, qp.getType());
|
|
|
+
|
|
|
readPacketSkippingPing(ia, qp);
|
|
|
Assert.assertEquals(Leader.NEWLEADER, qp.getType());
|
|
|
Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
|
|
|
+ Assert.assertEquals(1, l.self.getAcceptedEpoch());
|
|
|
+ Assert.assertEquals(1, l.self.getCurrentEpoch());
|
|
|
+
|
|
|
qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
|
|
|
oa.writeRecord(qp, null);
|
|
|
+
|
|
|
readPacketSkippingPing(ia, qp);
|
|
|
Assert.assertEquals(Leader.UPTODATE, qp.getType());
|
|
|
}
|
|
@@ -306,7 +525,7 @@ public class Zab1_0Test {
|
|
|
|
|
|
@Test
|
|
|
public void testLeaderBehind() throws Exception {
|
|
|
- testConversation(new LeaderConversation() {
|
|
|
+ testLeaderConversation(new LeaderConversation() {
|
|
|
public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
|
|
|
throws IOException {
|
|
|
/* we test a normal run. everything should work out well. */
|
|
@@ -346,7 +565,7 @@ public class Zab1_0Test {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testAbandonBeforeACKEpoch() throws Exception {
|
|
|
- testConversation(new LeaderConversation() {
|
|
|
+ testLeaderConversation(new LeaderConversation() {
|
|
|
public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
|
|
|
throws IOException, InterruptedException {
|
|
|
/* we test a normal run. everything should work out well. */
|
|
@@ -392,6 +611,33 @@ public class Zab1_0Test {
|
|
|
LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, peer, zkDb);
|
|
|
return new Leader(peer, zk);
|
|
|
}
|
|
|
+
|
|
|
+    /**
+     * Follower whose leader address is supplied by the test rather than
+     * looked up: findLeader() returns whatever was last passed to
+     * setLeaderSocketAddress().
+     */
+    static class ConversableFollower extends Follower {
+
+        /** Address handed back by findLeader(); null until it has been set. */
+        InetSocketAddress leaderAddr;
+
+        ConversableFollower(QuorumPeer self, FollowerZooKeeperServer zk) {
+            super(self, zk);
+        }
+
+        /** Records the address that findLeader() should report. */
+        public void setLeaderSocketAddress(InetSocketAddress addr) {
+            this.leaderAddr = addr;
+        }
+
+        @Override
+        protected InetSocketAddress findLeader() {
+            return this.leaderAddr;
+        }
+    }
|
|
|
+    /**
+     * Builds a ConversableFollower for the given peer, using tmpDir as both
+     * the data and the snapshot directory. The peer is given the same
+     * transaction log factory and database that back the follower's server.
+     *
+     * @param tmpDir directory used for both transaction logs and snapshots
+     * @param peer   quorum peer the follower belongs to
+     * @return the newly created follower
+     * @throws IOException propagated from creating the storage or the server
+     */
+    private ConversableFollower createFollower(File tmpDir, QuorumPeer peer)
+            throws IOException {
+        final FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir);
+        peer.setTxnFactory(snapLog);
+
+        final ZKDatabase database = new ZKDatabase(snapLog);
+        final FollowerZooKeeperServer followerServer =
+                new FollowerZooKeeperServer(snapLog, peer, database);
+        peer.setZKDatabase(database);
+
+        return new ConversableFollower(peer, followerServer);
+    }
|
|
|
+
|
|
|
private QuorumPeer createQuorumPeer(File tmpDir) throws IOException,
|
|
|
FileNotFoundException {
|
|
|
QuorumPeer peer = new QuorumPeer();
|