
ZOOKEEPER-3642: Fix potential data inconsistency due to DIFF sync after partial SNAP sync.

Based on https://github.com/apache/zookeeper/pull/1224; fixed a unit test build issue.

Author: Fangmin Lyu <fangmin@apache.org>
Author: Michael Han <hanm@apache.org>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Originally developed by Fangmin Lyu <fangmin@apache.org>

Closes #1515 from hanm/ZOOKEEPER-3642
Michael Han committed 4 years ago
commit efbd660e1c
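
For context on the bug: during a SNAP sync the learner applies the leader's snapshot to its in-memory database first and only persists it to disk once the NEWLEADER packet has been processed. The sketch below is a hedged paraphrase of that window in Learner.syncWithLeader(); the identifiers follow the ZooKeeper Learner code from memory, but the structure is condensed and illustrative, not verbatim.

    // Hedged paraphrase of the SNAP sync window in Learner.syncWithLeader();
    // condensed and illustrative, not the actual method body.
    QuorumPacket qp = new QuorumPacket();
    readPacket(qp);
    if (qp.getType() == Leader.SNAP) {
        self.setSyncMode(QuorumPeer.SyncMode.SNAP);
        // (1) The snapshot is deserialized straight into the in-memory database.
        zk.getZKDatabase().deserializeSnapshot(leaderIs);
    }
    // ... proposals and commits are applied ...
    // (2) Only around NEWLEADER is the snapshot written to disk. If the learner
    // dies between (1) and (2), memory and disk disagree, and a later DIFF sync
    // on top of the stale in-memory state can lose or replay transactions. The
    // hunks below force that half-applied in-memory state to be cleared when
    // the learner shuts down mid SNAP sync.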

+ 3 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -802,6 +802,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      */
     public synchronized void shutdown(boolean fullyShutDown) {
         if (!canShutdown()) {
+            if (fullyShutDown && zkDb != null) {
+                zkDb.clear();
+            }
             LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
             return;
         }
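
The hunk above only touches the early-return branch. A hedged paraphrase of the rest of shutdown(boolean), recalled from the 3.6-era code rather than copied, shows why that is sufficient: the normal shutdown path already clears or fast-forwards the database based on the same flag, so the early return was the one path where a learner asked to shut down fully could keep a half-applied snapshot in memory.

    // Hedged paraphrase of ZooKeeperServer.shutdown(boolean) with this patch
    // applied; the normal-path details are recalled, not copied verbatim.
    public synchronized void shutdown(boolean fullyShutDown) {
        if (!canShutdown()) {
            if (fullyShutDown && zkDb != null) {
                zkDb.clear();   // new: wipe in-memory state even if the server never started serving
            }
            LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
            return;
        }
        setState(State.SHUTDOWN);
        // ... shut down the request processor chain ...
        if (zkDb != null) {
            if (fullyShutDown) {
                zkDb.clear();                   // drop the in-memory data tree and sessions
            } else {
                try {
                    zkDb.fastForwardDataBase(); // keep it, but roll forward from the txn log
                } catch (IOException e) {
                    zkDb.clear();
                }
            }
        }
    }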

+ 3 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java

@@ -859,7 +859,9 @@ public class Learner {
         closeSocket();
         // shutdown previous zookeeper
         if (zk != null) {
-            zk.shutdown();
+            // If we haven't finished SNAP sync, force fully shutdown
+            // to avoid potential inconsistency
+            zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP));
         }
     }
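
The new flag matters because Learner.shutdown() is reached from the follower's main loop whenever following the leader fails, which is exactly the fault the new test injects at NEWLEADER. Below is a hedged paraphrase of that caller, roughly following the FOLLOWING branch of QuorumPeer.run(); the structure is from memory, not verbatim.

    // Hedged paraphrase of QuorumPeer.run()'s FOLLOWING branch.
    case FOLLOWING:
        try {
            setFollower(makeFollower(logFactory));
            // followLeader() drives syncWithLeader(); an IOException here (e.g.
            // the injected "read timedout" at NEWLEADER) aborts a SNAP sync midway.
            follower.followLeader();
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            // Learner.shutdown() now passes fullyShutDown=true while the sync mode
            // is still SNAP, so ZooKeeperServer clears zkDb and the next election
            // round cannot DIFF sync on top of the half-applied snapshot.
            follower.shutdown();
            setFollower(null);
        }
        break;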
 

+ 140 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java

@@ -1615,11 +1615,139 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         }
     }
 
+    /**
+     * If a learner fails to finish SNAP sync with the leader before writing
+     * the snapshot to disk, it may later DIFF sync with a new leader or be
+     * elected leader itself.
+     *
+     * This test tries to guarantee there is no data inconsistency in that
+     * case.
+     */
+    @Test
+    public void testDiffSyncAfterSnap() throws Exception {
+        final int ENSEMBLE_SERVERS = 3;
+        MainThread[] mt = new MainThread[ENSEMBLE_SERVERS];
+        ZooKeeper[] zk = new ZooKeeper[ENSEMBLE_SERVERS];
+
+        try {
+            // 1. start a quorum
+            final int[] clientPorts = new int[ENSEMBLE_SERVERS];
+            StringBuilder sb = new StringBuilder();
+            String server;
+
+            for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+                clientPorts[i] = PortAssignment.unique();
+                server = "server." + i + "=127.0.0.1:" + PortAssignment.unique()
+                         + ":" + PortAssignment.unique()
+                         + ":participant;127.0.0.1:" + clientPorts[i];
+                sb.append(server + "\n");
+            }
+            String currentQuorumCfgSection = sb.toString();
+
+            // start servers
+            Context[] contexts = new Context[ENSEMBLE_SERVERS];
+            for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+                final Context context = new Context();
+                contexts[i] = context;
+                mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
+                    @Override
+                    public TestQPMain getTestQPMain() {
+                        return new CustomizedQPMain(context);
+                    }
+                };
+                mt[i].start();
+                zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+            }
+            waitForAll(zk, States.CONNECTED);
+            LOG.info("all servers started");
+
+            final String nodePath = "/testDiffSyncAfterSnap";
+
+            // 2. find leader and a follower
+            int leaderId = -1;
+            int followerA = -1;
+            for (int i = ENSEMBLE_SERVERS - 1; i >= 0; i--) {
+                if (mt[i].main.quorumPeer.leader != null) {
+                    leaderId = i;
+                } else if (followerA == -1) {
+                    followerA = i;
+                }
+            }
+
+            // 3. stop follower A
+            LOG.info("shutdown follower {}", followerA);
+            mt[followerA].shutdown();
+            waitForOne(zk[followerA], States.CONNECTING);
+
+            // 4. issue some traffic
+            int index = 0;
+            int numOfRequests = 10;
+            for (int i = 0; i < numOfRequests; i++) {
+                zk[leaderId].create(nodePath + index++,
+                   new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+
+            CustomQuorumPeer leaderQuorumPeer = (CustomQuorumPeer) mt[leaderId].main.quorumPeer;
+
+            // 5. inject fault to cause the follower exit when received NEWLEADER
+            contexts[followerA].newLeaderReceivedCallback = new NewLeaderReceivedCallback() {
+                boolean processed = false;
+                @Override
+                public void process() throws IOException {
+                    if (processed) {
+                        return;
+                    }
+                    processed = true;
+                    System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "false");
+                    throw new IOException("read timedout");
+                }
+            };
+
+            // 6. force snap sync once
+            LOG.info("force snapshot sync");
+            System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "true");
+
+            // 7. start follower A
+            mt[followerA].start();
+            waitForOne(zk[followerA], States.CONNECTED);
+            LOG.info("verify the nodes are exist in memory");
+            for (int i = 0; i < index; i++) {
+                assertNotNull(zk[followerA].exists(nodePath + i, false));
+            }
+
+            // 8. issue another request which will be persisted on disk
+            zk[leaderId].create(nodePath + index++,
+               new byte[1], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+            // wait some time to let this get written to disk
+            Thread.sleep(500);
+
+            // 9. reload data from disk and make sure it's still consistent
+            LOG.info("restarting follower {}", followerA);
+            mt[followerA].shutdown();
+            waitForOne(zk[followerA], States.CONNECTING);
+            mt[followerA].start();
+            waitForOne(zk[followerA], States.CONNECTED);
+
+            for (int i = 0; i < index; i++) {
+                assertNotNull(zk[followerA].exists(nodePath + i, false), "node " + i + " should exist");
+            }
+
+        } finally {
+            System.clearProperty(LearnerHandler.FORCE_SNAP_SYNC);
+            for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+                mt[i].shutdown();
+                zk[i].close();
+            }
+        }
+    }
+
     static class Context {
 
         boolean quitFollowing = false;
         boolean exitWhenAckNewLeader = false;
         NewLeaderAckCallback newLeaderAckCallback = null;
+        NewLeaderReceivedCallback newLeaderReceivedCallback = null;
 
     }
 
@@ -1629,6 +1757,10 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
 
     }
 
+    interface NewLeaderReceivedCallback {
+        void process() throws IOException;
+    }
+
     interface StartForwardingListener {
 
         void start();
@@ -1702,6 +1834,14 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
                     }
                     super.writePacket(pp, flush);
                 }
+
+                @Override
+                void readPacket(QuorumPacket qp) throws IOException {
+                    super.readPacket(qp);
+                    if (qp.getType() == Leader.NEWLEADER && context.newLeaderReceivedCallback != null) {
+                        context.newLeaderReceivedCallback.process();
+                    }
+                }
             };
         }