|
@@ -336,6 +336,10 @@ public class Zab1_0Test {
|
|
|
void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception;
|
|
|
}
|
|
|
|
|
|
+ static public interface ObserverConversation {
|
|
|
+ void converseWithObserver(InputArchive ia, OutputArchive oa, Observer o) throws Exception;
|
|
|
+ }
|
|
|
+
|
|
|
public void testLeaderConversation(LeaderConversation conversation) throws Exception {
|
|
|
Socket pair[] = getSocketPair();
|
|
|
Socket leaderSocket = pair[0];
|
|
@@ -500,6 +504,57 @@ public class Zab1_0Test {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void testObserverConversation(ObserverConversation conversation) throws Exception {
|
|
|
+ File tmpDir = File.createTempFile("test", "dir");
|
|
|
+ tmpDir.delete();
|
|
|
+ tmpDir.mkdir();
|
|
|
+ Thread observerThread = null;
|
|
|
+ ConversableObserver observer = null;
|
|
|
+ QuorumPeer peer = null;
|
|
|
+ try {
|
|
|
+ peer = createQuorumPeer(tmpDir);
|
|
|
+ peer.setSyncEnabled(true);
|
|
|
+ observer = createObserver(tmpDir, peer);
|
|
|
+ peer.observer = observer;
|
|
|
+
|
|
|
+ ServerSocket ss = new ServerSocket();
|
|
|
+ ss.bind(null);
|
|
|
+ observer.setLeaderSocketAddress((InetSocketAddress)ss.getLocalSocketAddress());
|
|
|
+ final Observer observerForThread = observer;
|
|
|
+
|
|
|
+ observerThread = new Thread() {
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ observerForThread.observeLeader();
|
|
|
+ } catch(Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ observerThread.start();
|
|
|
+ Socket leaderSocket = ss.accept();
|
|
|
+
|
|
|
+ InputArchive ia = BinaryInputArchive.getArchive(leaderSocket
|
|
|
+ .getInputStream());
|
|
|
+ OutputArchive oa = BinaryOutputArchive.getArchive(leaderSocket
|
|
|
+ .getOutputStream());
|
|
|
+
|
|
|
+ conversation.converseWithObserver(ia, oa, observer);
|
|
|
+ } finally {
|
|
|
+ if (observer != null) {
|
|
|
+ observer.shutdown();
|
|
|
+ }
|
|
|
+ if (observerThread != null) {
|
|
|
+ observerThread.interrupt();
|
|
|
+ observerThread.join();
|
|
|
+ }
|
|
|
+ if (peer != null) {
|
|
|
+ peer.shutdown();
|
|
|
+ }
|
|
|
+ recursiveDelete(tmpDir);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testUnnecessarySnap() throws Exception {
|
|
|
testPopulatedLeaderConversation(new PopulatedLeaderConversation() {
|
|
@@ -540,6 +595,29 @@ public class Zab1_0Test {
|
|
|
}, 2);
|
|
|
}
|
|
|
|
|
|
+ // We want to track the change with a callback rather than depending on timing
|
|
|
+ class TrackerWatcher implements Watcher {
|
|
|
+ boolean changed;
|
|
|
+ synchronized void waitForChange() throws InterruptedException {
|
|
|
+ while(!changed) {
|
|
|
+ wait();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ public void process(WatchedEvent event) {
|
|
|
+ if (event.getType() == EventType.NodeDataChanged) {
|
|
|
+ synchronized(this) {
|
|
|
+ changed = true;
|
|
|
+ notifyAll();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ synchronized public boolean changed() {
|
|
|
+ return changed;
|
|
|
+ }
|
|
|
+
|
|
|
+ };
|
|
|
+
|
|
|
@Test
|
|
|
public void testNormalFollowerRun() throws Exception {
|
|
|
testFollowerConversation(new FollowerConversation() {
|
|
@@ -617,28 +695,6 @@ public class Zab1_0Test {
|
|
|
proposeSetData(qp, proposalZxid, "data2", 2);
|
|
|
oa.writeRecord(qp, null);
|
|
|
|
|
|
- // We want to track the change with a callback rather than depending on timing
|
|
|
- class TrackerWatcher implements Watcher {
|
|
|
- boolean changed;
|
|
|
- synchronized void waitForChange() throws InterruptedException {
|
|
|
- while(!changed) {
|
|
|
- wait();
|
|
|
- }
|
|
|
- }
|
|
|
- @Override
|
|
|
- public void process(WatchedEvent event) {
|
|
|
- if (event.getType() == EventType.NodeDataChanged) {
|
|
|
- synchronized(this) {
|
|
|
- changed = true;
|
|
|
- notifyAll();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- synchronized public boolean changed() {
|
|
|
- return changed;
|
|
|
- }
|
|
|
-
|
|
|
- };
|
|
|
TrackerWatcher watcher = new TrackerWatcher();
|
|
|
|
|
|
// The change should not have happened yet, since we haven't committed
|
|
@@ -915,6 +971,157 @@ public class Zab1_0Test {
|
|
|
assertEquals("BenWasHere", signature);
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testNormalObserverRun() throws Exception {
|
|
|
+ testObserverConversation(new ObserverConversation() {
|
|
|
+ @Override
|
|
|
+ public void converseWithObserver(InputArchive ia, OutputArchive oa,
|
|
|
+ Observer o) throws Exception {
|
|
|
+ File tmpDir = File.createTempFile("test", "dir");
|
|
|
+ tmpDir.delete();
|
|
|
+ tmpDir.mkdir();
|
|
|
+ File logDir = o.zk.getTxnLogFactory().getDataDir().getParentFile();
|
|
|
+ File snapDir = o.zk.getTxnLogFactory().getSnapDir().getParentFile();
|
|
|
+ try {
|
|
|
+ Assert.assertEquals(0, o.self.getAcceptedEpoch());
|
|
|
+ Assert.assertEquals(0, o.self.getCurrentEpoch());
|
|
|
+
|
|
|
+ // Setup a database with a single /foo node
|
|
|
+ ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
|
|
|
+ final long foo1Zxid = ZxidUtils.makeZxid(1, 1);
|
|
|
+ final long foo2Zxid = ZxidUtils.makeZxid(1, 2);
|
|
|
+ zkDb.processTxn(new TxnHeader(13, 1313, foo1Zxid, 33,
|
|
|
+ ZooDefs.OpCode.create), new CreateTxn("/foo1",
|
|
|
+ "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
|
|
|
+ false, 1));
|
|
|
+ zkDb.processTxn(new TxnHeader(13, 1313, foo2Zxid, 33,
|
|
|
+ ZooDefs.OpCode.create), new CreateTxn("/foo2",
|
|
|
+ "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
|
|
|
+ false, 1));
|
|
|
+ Stat stat = new Stat();
|
|
|
+ Assert.assertEquals("data1",
|
|
|
+ new String(zkDb.getData("/foo1", stat, null)));
|
|
|
+ Assert.assertEquals("data1",
|
|
|
+ new String(zkDb.getData("/foo2", stat, null)));
|
|
|
+
|
|
|
+ QuorumPacket qp = new QuorumPacket();
|
|
|
+ readPacketSkippingPing(ia, qp);
|
|
|
+ Assert.assertEquals(Leader.OBSERVERINFO, qp.getType());
|
|
|
+ Assert.assertEquals(qp.getZxid(), 0);
|
|
|
+ LearnerInfo learnInfo = new LearnerInfo();
|
|
|
+ ByteBufferInputStream.byteBuffer2Record(
|
|
|
+ ByteBuffer.wrap(qp.getData()), learnInfo);
|
|
|
+ Assert.assertEquals(learnInfo.getProtocolVersion(), 0x10000);
|
|
|
+ Assert.assertEquals(learnInfo.getServerid(), 0);
|
|
|
+
|
|
|
+ // We are simulating an established leader, so the epoch is 1
|
|
|
+ qp.setType(Leader.LEADERINFO);
|
|
|
+ qp.setZxid(ZxidUtils.makeZxid(1, 0));
|
|
|
+ byte protoBytes[] = new byte[4];
|
|
|
+ ByteBuffer.wrap(protoBytes).putInt(0x10000);
|
|
|
+ qp.setData(protoBytes);
|
|
|
+ oa.writeRecord(qp, null);
|
|
|
+
|
|
|
+ readPacketSkippingPing(ia, qp);
|
|
|
+ Assert.assertEquals(Leader.ACKEPOCH, qp.getType());
|
|
|
+ Assert.assertEquals(0, qp.getZxid());
|
|
|
+ Assert.assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer
|
|
|
+ .wrap(qp.getData()).getInt());
|
|
|
+ Assert.assertEquals(1, o.self.getAcceptedEpoch());
|
|
|
+ Assert.assertEquals(0, o.self.getCurrentEpoch());
|
|
|
+
|
|
|
+ // Send the snapshot we created earlier
|
|
|
+ qp.setType(Leader.SNAP);
|
|
|
+ qp.setData(new byte[0]);
|
|
|
+ qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
|
|
|
+ oa.writeRecord(qp, null);
|
|
|
+ zkDb.serializeSnapshot(oa);
|
|
|
+ oa.writeString("BenWasHere", null);
|
|
|
+ qp.setType(Leader.NEWLEADER);
|
|
|
+ qp.setZxid(ZxidUtils.makeZxid(1, 0));
|
|
|
+ oa.writeRecord(qp, null);
|
|
|
+
|
|
|
+ // Get the ack of the new leader
|
|
|
+ readPacketSkippingPing(ia, qp);
|
|
|
+ Assert.assertEquals(Leader.ACK, qp.getType());
|
|
|
+ Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
|
|
|
+ Assert.assertEquals(1, o.self.getAcceptedEpoch());
|
|
|
+ Assert.assertEquals(1, o.self.getCurrentEpoch());
|
|
|
+
|
|
|
+ Assert.assertEquals(foo2Zxid, o.zk.getLastProcessedZxid());
|
|
|
+
|
|
|
+ // Make sure the data was recorded in the filesystem ok
|
|
|
+ ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(
|
|
|
+ logDir, snapDir));
|
|
|
+ long lastZxid = zkDb2.loadDataBase();
|
|
|
+ Assert.assertEquals("data1",
|
|
|
+ new String(zkDb2.getData("/foo1", stat, null)));
|
|
|
+ Assert.assertEquals(foo2Zxid, lastZxid);
|
|
|
+
|
|
|
+ // Register watch
|
|
|
+ TrackerWatcher watcher = new TrackerWatcher();
|
|
|
+ Assert.assertEquals("data1", new String(o.zk
|
|
|
+ .getZKDatabase().getData("/foo2", stat, watcher)));
|
|
|
+
|
|
|
+ // Propose /foo1 update
|
|
|
+ long proposalZxid = ZxidUtils.makeZxid(1, 1000);
|
|
|
+ proposeSetData(qp, "/foo1", proposalZxid, "data2", 2);
|
|
|
+ oa.writeRecord(qp, null);
|
|
|
+
|
|
|
+ // Commit /foo1 update
|
|
|
+ qp.setType(Leader.COMMIT);
|
|
|
+ qp.setZxid(proposalZxid);
|
|
|
+ oa.writeRecord(qp, null);
|
|
|
+
|
|
|
+ // Inform /foo2 update
|
|
|
+ long informZxid = ZxidUtils.makeZxid(1, 1001);
|
|
|
+ proposeSetData(qp, "/foo2", informZxid, "data2", 2);
|
|
|
+ qp.setType(Leader.INFORM);
|
|
|
+ oa.writeRecord(qp, null);
|
|
|
+
|
|
|
+ qp.setType(Leader.UPTODATE);
|
|
|
+ qp.setZxid(0);
|
|
|
+ oa.writeRecord(qp, null);
|
|
|
+
|
|
|
+ // Read the uptodate ack
|
|
|
+ readPacketSkippingPing(ia, qp);
|
|
|
+ Assert.assertEquals(Leader.ACK, qp.getType());
|
|
|
+ Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
|
|
|
+
|
|
|
+ // Data should get updated
|
|
|
+ watcher.waitForChange();
|
|
|
+ Assert.assertEquals("data2", new String(o.zk
|
|
|
+ .getZKDatabase().getData("/foo1", stat, null)));
|
|
|
+ Assert.assertEquals("data2", new String(o.zk
|
|
|
+ .getZKDatabase().getData("/foo2", stat, null)));
|
|
|
+
|
|
|
+ zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
|
|
|
+ lastZxid = zkDb2.loadDataBase();
|
|
|
+ Assert.assertEquals("data2", new String(zkDb2.getData("/foo1", stat, null)));
|
|
|
+ Assert.assertEquals("data2", new String(zkDb2.getData("/foo2", stat, null)));
|
|
|
+ Assert.assertEquals(informZxid, lastZxid);
|
|
|
+ } finally {
|
|
|
+ recursiveDelete(tmpDir);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private void proposeSetData(QuorumPacket qp, String path,
|
|
|
+ long zxid, String data, int version) throws IOException {
|
|
|
+ qp.setType(Leader.PROPOSAL);
|
|
|
+ qp.setZxid(zxid);
|
|
|
+ TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55,
|
|
|
+ ZooDefs.OpCode.setData);
|
|
|
+ SetDataTxn sdt = new SetDataTxn(path, data.getBytes(), version);
|
|
|
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
|
|
+ OutputArchive boa = BinaryOutputArchive.getArchive(baos);
|
|
|
+ boa.writeRecord(hdr, null);
|
|
|
+ boa.writeRecord(sdt, null);
|
|
|
+ qp.setData(baos.toByteArray());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testLeaderBehind() throws Exception {
|
|
|
testLeaderConversation(new LeaderConversation() {
|
|
@@ -1048,6 +1255,33 @@ public class Zab1_0Test {
|
|
|
return new ConversableFollower(peer, zk);
|
|
|
}
|
|
|
|
|
|
+ static class ConversableObserver extends Observer {
|
|
|
+
|
|
|
+ ConversableObserver(QuorumPeer self, ObserverZooKeeperServer zk) {
|
|
|
+ super(self, zk);
|
|
|
+ }
|
|
|
+
|
|
|
+ InetSocketAddress leaderAddr;
|
|
|
+ public void setLeaderSocketAddress(InetSocketAddress addr) {
|
|
|
+ leaderAddr = addr;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected InetSocketAddress findLeader() {
|
|
|
+ return leaderAddr;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private ConversableObserver createObserver(File tmpDir, QuorumPeer peer)
|
|
|
+ throws IOException {
|
|
|
+ FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
|
|
|
+ peer.setTxnFactory(logFactory);
|
|
|
+ ZKDatabase zkDb = new ZKDatabase(logFactory);
|
|
|
+ ObserverZooKeeperServer zk = new ObserverZooKeeperServer(logFactory, peer, zkDb);
|
|
|
+ peer.setZKDatabase(zkDb);
|
|
|
+ return new ConversableObserver(peer, zk);
|
|
|
+ }
|
|
|
+
|
|
|
private QuorumPeer createQuorumPeer(File tmpDir) throws IOException, FileNotFoundException {
|
|
|
HashMap<Long, QuorumServer> peers = new HashMap<Long, QuorumServer>();
|
|
|
QuorumPeer peer = new QuorumPeer();
|