|
@@ -18,22 +18,27 @@
|
|
|
|
|
|
package org.apache.zookeeper.server.quorum;
|
|
|
|
|
|
+import java.io.IOException;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import javax.security.sasl.SaslException;
|
|
|
|
|
|
+import org.apache.jute.OutputArchive;
|
|
|
+
|
|
|
import org.apache.zookeeper.CreateMode;
|
|
|
import org.apache.zookeeper.KeeperException.NoNodeException;
|
|
|
import org.apache.zookeeper.KeeperException.NodeExistsException;
|
|
|
import org.apache.zookeeper.Op;
|
|
|
+
|
|
|
import org.apache.zookeeper.PortAssignment;
|
|
|
import org.apache.zookeeper.ZooDefs.Ids;
|
|
|
import org.apache.zookeeper.ZooKeeper;
|
|
|
import org.apache.zookeeper.ZooKeeper.States;
|
|
|
import org.apache.zookeeper.data.ACL;
|
|
|
import org.apache.zookeeper.data.Stat;
|
|
|
+import org.apache.zookeeper.server.DataNode;
|
|
|
import org.apache.zookeeper.server.DataTree;
|
|
|
import org.apache.zookeeper.server.ZKDatabase;
|
|
|
import org.apache.zookeeper.server.ZooKeeperServer;
|
|
@@ -162,6 +167,98 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
|
|
|
new String(zk[followerA].getData(node2, null, null)));
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * It's possibel during SNAP sync, the parent is serialized before the
|
|
|
+ * child get deleted during sending the snapshot over.
|
|
|
+ *
|
|
|
+ * In which case, we need to make sure the pzxid get correctly updated
|
|
|
+ * when applying the txns received.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testPZxidUpdatedDuringSnapSyncing() throws Exception {
|
|
|
+ LOG.info("Enable force snapshot sync");
|
|
|
+ System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "true");
|
|
|
+
|
|
|
+ final String parent = "/testPZxidUpdatedWhenDeletingNonExistNode";
|
|
|
+ final String child = parent + "/child";
|
|
|
+ createEmptyNode(zk[leaderId], parent);
|
|
|
+ createEmptyNode(zk[leaderId], child);
|
|
|
+
|
|
|
+ LOG.info("shutdown follower {}", followerA);
|
|
|
+ mt[followerA].shutdown();
|
|
|
+ QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);
|
|
|
+
|
|
|
+ LOG.info("Set up ZKDatabase to catch the node serializing in DataTree");
|
|
|
+ addSerializeListener(leaderId, parent, child);
|
|
|
+
|
|
|
+ LOG.info("Restart follower A to trigger a SNAP sync with leader");
|
|
|
+ mt[followerA].start();
|
|
|
+ QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);
|
|
|
+
|
|
|
+ LOG.info("Check and make sure the pzxid of the parent is the same " +
|
|
|
+ "on leader and follower A");
|
|
|
+ compareStat(parent, leaderId, followerA);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * It's possible during taking fuzzy snapshot, the parent is serialized
|
|
|
+ * before the child get deleted in the fuzzy range.
|
|
|
+ *
|
|
|
+ * In which case, we need to make sure the pzxid get correctly updated
|
|
|
+ * when replaying the txns.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testPZxidUpdatedWhenLoadingSnapshot() throws Exception {
|
|
|
+
|
|
|
+ final String parent = "/testPZxidUpdatedDuringTakingSnapshot";
|
|
|
+ final String child = parent + "/child";
|
|
|
+ createEmptyNode(zk[followerA], parent);
|
|
|
+ createEmptyNode(zk[followerA], child);
|
|
|
+
|
|
|
+ LOG.info("Set up ZKDatabase to catch the node serializing in DataTree");
|
|
|
+ addSerializeListener(followerA, parent, child);
|
|
|
+
|
|
|
+ LOG.info("Take snapshot on follower A");
|
|
|
+ ZooKeeperServer zkServer = mt[followerA].main.quorumPeer.getActiveServer();
|
|
|
+ zkServer.takeSnapshot(true);
|
|
|
+
|
|
|
+ LOG.info("Restarting follower A to load snapshot");
|
|
|
+ mt[followerA].shutdown();
|
|
|
+ mt[followerA].start();
|
|
|
+ QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);
|
|
|
+
|
|
|
+ LOG.info("Check and make sure the pzxid of the parent is the same " +
|
|
|
+ "on leader and follower A");
|
|
|
+ compareStat(parent, leaderId, followerA);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void addSerializeListener(int sid, String parent, String child) {
|
|
|
+ final ZooKeeper zkClient = zk[followerA];
|
|
|
+ CustomDataTree dt =
|
|
|
+ (CustomDataTree) mt[sid].main.quorumPeer.getZkDb().getDataTree();
|
|
|
+ dt.addListener(parent, new NodeSerializeListener() {
|
|
|
+ @Override
|
|
|
+ public void nodeSerialized(String path) {
|
|
|
+ try {
|
|
|
+ zkClient.delete(child, -1);
|
|
|
+ LOG.info("Deleted the child node after the parent is serialized");
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Error when deleting node {}", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private void compareStat(String path, int sid, int compareWithSid) throws Exception{
|
|
|
+ Stat stat1 = new Stat();
|
|
|
+ zk[sid].getData(path, null, stat1);
|
|
|
+
|
|
|
+ Stat stat2 = new Stat();
|
|
|
+ zk[compareWithSid].getData(path, null, stat2);
|
|
|
+
|
|
|
+ Assert.assertEquals(stat1, stat2);
|
|
|
+ }
|
|
|
+
|
|
|
private void createEmptyNode(ZooKeeper zk, String path) throws Exception {
|
|
|
zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
|
|
|
}
|
|
@@ -174,6 +271,22 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
|
|
|
static class CustomDataTree extends DataTree {
|
|
|
Map<String, NodeCreateListener> nodeCreateListeners =
|
|
|
new HashMap<String, NodeCreateListener>();
|
|
|
+ Map<String, NodeSerializeListener> listeners =
|
|
|
+ new HashMap<String, NodeSerializeListener>();
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void serializeNodeData(OutputArchive oa, String path,
|
|
|
+ DataNode node) throws IOException {
|
|
|
+ super.serializeNodeData(oa, path, node);
|
|
|
+ NodeSerializeListener listener = listeners.get(path);
|
|
|
+ if (listener != null) {
|
|
|
+ listener.nodeSerialized(path);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void addListener(String path, NodeSerializeListener listener) {
|
|
|
+ listeners.put(path, listener);
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
|
public void createNode(final String path, byte data[], List<ACL> acl,
|
|
@@ -193,6 +306,10 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ static interface NodeSerializeListener {
|
|
|
+ public void nodeSerialized(String path);
|
|
|
+ }
|
|
|
+
|
|
|
static class CustomizedQPMain extends TestQPMain {
|
|
|
@Override
|
|
|
protected QuorumPeer getQuorumPeer() throws SaslException {
|