|
@@ -23,6 +23,7 @@ import java.util.Arrays;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import javax.security.sasl.SaslException;
|
|
|
|
|
|
import org.apache.jute.OutputArchive;
|
|
@@ -42,6 +43,7 @@ import org.apache.zookeeper.server.DataNode;
|
|
|
import org.apache.zookeeper.server.DataTree;
|
|
|
import org.apache.zookeeper.server.ZKDatabase;
|
|
|
import org.apache.zookeeper.server.ZooKeeperServer;
|
|
|
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
|
|
|
import org.apache.zookeeper.test.ClientBase;
|
|
|
|
|
|
import org.junit.Assert;
|
|
@@ -60,6 +62,7 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
|
|
|
|
|
|
MainThread[] mt = null;
|
|
|
ZooKeeper[] zk = null;
|
|
|
+ int[] clientPorts = null;
|
|
|
int leaderId;
|
|
|
int followerA;
|
|
|
|
|
@@ -67,7 +70,7 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
|
|
|
public void setup() throws Exception {
|
|
|
LOG.info("Start up a 3 server quorum");
|
|
|
final int ENSEMBLE_SERVERS = 3;
|
|
|
- final int clientPorts[] = new int[ENSEMBLE_SERVERS];
|
|
|
+ clientPorts = new int[ENSEMBLE_SERVERS];
|
|
|
StringBuilder sb = new StringBuilder();
|
|
|
String server;
|
|
|
|
|
@@ -259,6 +262,55 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
|
|
|
Assert.assertEquals(stat1, stat2);
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testGlobalSessionConsistency() throws Exception {
|
|
|
+ LOG.info("Hook to catch the commitSession event on followerA");
|
|
|
+ CustomizedQPMain followerAMain = (CustomizedQPMain) mt[followerA].main;
|
|
|
+ final ZooKeeperServer zkServer = followerAMain.quorumPeer.getActiveServer();
|
|
|
+
|
|
|
+ // only take snapshot for the next global session we're going to create
|
|
|
+ final AtomicBoolean shouldTakeSnapshot = new AtomicBoolean(true);
|
|
|
+ followerAMain.setCommitSessionListener(new CommitSessionListener() {
|
|
|
+ @Override
|
|
|
+ public void process(long sessionId) {
|
|
|
+ LOG.info("Take snapshot");
|
|
|
+ if (shouldTakeSnapshot.getAndSet(false)) {
|
|
|
+ zkServer.takeSnapshot(true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ LOG.info("Create a global session");
|
|
|
+ ZooKeeper globalClient = new ZooKeeper(
|
|
|
+ "127.0.0.1:" + clientPorts[followerA],
|
|
|
+ ClientBase.CONNECTION_TIMEOUT, this);
|
|
|
+ QuorumPeerMainTest.waitForOne(globalClient, States.CONNECTED);
|
|
|
+
|
|
|
+ LOG.info("Restart followerA to load the data from disk");
|
|
|
+ mt[followerA].shutdown();
|
|
|
+ QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);
|
|
|
+
|
|
|
+ mt[followerA].start();
|
|
|
+ QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);
|
|
|
+
|
|
|
+ LOG.info("Make sure the global sessions are consistent with leader");
|
|
|
+
|
|
|
+ Map<Long, Integer> globalSessionsOnLeader =
|
|
|
+ mt[leaderId].main.quorumPeer.getZkDb().getSessionWithTimeOuts();
|
|
|
+ if (mt[followerA].main.quorumPeer == null) {
|
|
|
+ LOG.info("quorumPeer is null");
|
|
|
+ }
|
|
|
+ if (mt[followerA].main.quorumPeer.getZkDb() == null) {
|
|
|
+ LOG.info("zkDb is null");
|
|
|
+ }
|
|
|
+ Map<Long, Integer> globalSessionsOnFollowerA =
|
|
|
+ mt[followerA].main.quorumPeer.getZkDb().getSessionWithTimeOuts();
|
|
|
+ LOG.info("sessions are {}, {}", globalSessionsOnLeader.keySet(),
|
|
|
+ globalSessionsOnFollowerA.keySet());
|
|
|
+ Assert.assertTrue(globalSessionsOnFollowerA.keySet().containsAll(
|
|
|
+ globalSessionsOnLeader.keySet()));
|
|
|
+ }
|
|
|
+
|
|
|
private void createEmptyNode(ZooKeeper zk, String path) throws Exception {
|
|
|
zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
}
|
|
@@ -310,7 +362,17 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
|
|
|
public void nodeSerialized(String path);
|
|
|
}
|
|
|
|
|
|
+ static interface CommitSessionListener {
|
|
|
+ public void process(long sessionId);
|
|
|
+ }
|
|
|
+
|
|
|
static class CustomizedQPMain extends TestQPMain {
|
|
|
+ CommitSessionListener commitSessionListener;
|
|
|
+
|
|
|
+ public void setCommitSessionListener(CommitSessionListener listener) {
|
|
|
+ this.commitSessionListener = listener;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
protected QuorumPeer getQuorumPeer() throws SaslException {
|
|
|
return new QuorumPeer() {
|
|
@@ -323,6 +385,31 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected Follower makeFollower(FileTxnSnapLog logFactory)
|
|
|
+ throws IOException {
|
|
|
+ return new Follower(this, new FollowerZooKeeperServer(
|
|
|
+ logFactory, this, this.getZkDb()) {
|
|
|
+ @Override
|
|
|
+ public void createSessionTracker() {
|
|
|
+ sessionTracker = new LearnerSessionTracker(
|
|
|
+ this, getZKDatabase().getSessionWithTimeOuts(),
|
|
|
+ this.tickTime, self.getId(),
|
|
|
+ self.areLocalSessionsEnabled(),
|
|
|
+ getZooKeeperServerListener()) {
|
|
|
+
|
|
|
+ public synchronized boolean commitSession(
|
|
|
+ long sessionId, int sessionTimeout) {
|
|
|
+ if (commitSessionListener != null) {
|
|
|
+ commitSessionListener.process(sessionId);
|
|
|
+ }
|
|
|
+ return super.commitSession(sessionId, sessionTimeout);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
};
|
|
|
}
|
|
|
}
|