Browse Source

ZOOKEEPER-4925: Fix data loss due to propagation of discontinuous committedLog (#2254)

There are two variants of `ZooKeeperServer::processTxn`. Those two
variants diverge significantly since ZOOKEEPER-3484.
`processTxn(Request request)` pops outstanding change from
`outstandingChanges` and adds txn to `committedLog` for follower to sync
in addition to what `processTxn(TxnHeader hdr, Record txn)` does. The
`Learner` uses `processTxn(TxnHeader hdr, Record txn)` to commit txn to
memory after ZOOKEEPER-4394, which means it leaves `committedLog`
untouched in `SYNCHRONIZATION` phase.

This way, a stale follower will have hole in its `committedLog` after
joining cluster. The stale follower will propagate the in memory hole
to other stale nodes after becoming leader. This causes data loss.

The test case fails on master and 3.9.3, and passes on 3.9.2. So only
3.9.3 is affected.

This commit drops `processTxn(TxnHeader hdr, Record txn)` as
`processTxn(Request request)` is capable in `SYNCHRONIZATION` phase too.

Also, this commit rejects discontinuous proposals in `syncWithLeader`
and `committedLog`, so to avoid possible data loss.

Refs: ZOOKEEPER-4925, ZOOKEEPER-4394, ZOOKEEPER-3484

Reviewers: li4wang
Author: kezhuw
Closes #2254 from kezhuw/ZOOKEEPER-4925-fix-data-loss
Kezhu Wang 1 month ago
parent
commit
e5dd60bf05

+ 13 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java

@@ -78,6 +78,19 @@ public class Request {
         this.authInfo = null;
     }
 
+    public Request(TxnHeader hdr, Record txn, TxnDigest digest) {
+        this.sessionId = hdr.getClientId();
+        this.cxid = hdr.getCxid();
+        this.type = hdr.getType();
+        this.hdr = hdr;
+        this.txn = txn;
+        this.zxid = hdr.getZxid();
+        this.request = null;
+        this.cnxn = null;
+        this.authInfo = null;
+        this.txnDigest = digest;
+    }
+
     public final long sessionId;
 
     public final int cxid;

+ 4 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/TxnLogEntry.java

@@ -47,4 +47,8 @@ public final class TxnLogEntry {
     public TxnDigest getDigest() {
         return digest;
     }
+
+    public Request toRequest() {
+        return new Request(header, txn, digest);
+    }
 }

+ 22 - 6
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java

@@ -58,6 +58,7 @@ import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.server.quorum.Leader.PureRequestProposal;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.slf4j.Logger;
@@ -82,6 +83,8 @@ public class ZKDatabase {
     protected FileTxnSnapLog snapLog;
     protected long minCommittedLog, maxCommittedLog;
 
+    private final boolean allowDiscontinuousProposals = Boolean.getBoolean("zookeeper.test.allowDiscontinuousProposals");
+
     /**
      * Default value is to use snapshot if txnlog size exceeds 1/3 the size of snapshot
      */
@@ -170,8 +173,6 @@ public class ZKDatabase {
      * data structures in zkdatabase.
      */
     public void clear() {
-        minCommittedLog = 0;
-        maxCommittedLog = 0;
         /* to be safe we just create a new
          * datatree.
          */
@@ -182,6 +183,8 @@ public class ZKDatabase {
         try {
             lock.lock();
             committedLog.clear();
+            minCommittedLog = 0;
+            maxCommittedLog = 0;
         } finally {
             lock.unlock();
         }
@@ -320,17 +323,30 @@ public class ZKDatabase {
         WriteLock wl = logLock.writeLock();
         try {
             wl.lock();
-            if (committedLog.size() > commitLogCount) {
-                committedLog.remove();
-                minCommittedLog = committedLog.peek().getZxid();
-            }
             if (committedLog.isEmpty()) {
                 minCommittedLog = request.zxid;
                 maxCommittedLog = request.zxid;
+            } else if (request.zxid <= maxCommittedLog) {
+                // This could happen if lastProcessedZxid is rewinded and database is re-synced.
+                // Currently, it only happens in test codes, but it should also be safe for production path.
+                return;
+            } else if (!allowDiscontinuousProposals
+                    && request.zxid != maxCommittedLog + 1
+                    && ZxidUtils.getEpochFromZxid(request.zxid) <= ZxidUtils.getEpochFromZxid(maxCommittedLog)) {
+                String msg = String.format(
+                    "Committed proposal cached out of order: 0x%s is not the next proposal of 0x%s",
+                    ZxidUtils.zxidToString(request.zxid),
+                    ZxidUtils.zxidToString(maxCommittedLog));
+                LOG.error(msg);
+                throw new IllegalStateException(msg);
             }
             PureRequestProposal p = new PureRequestProposal(request);
             committedLog.add(p);
             maxCommittedLog = p.getZxid();
+            if (committedLog.size() > commitLogCount) {
+                committedLog.remove();
+                minCommittedLog = committedLog.peek().getZxid();
+            }
         } finally {
             wl.unlock();
         }

+ 8 - 13
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -1846,13 +1846,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         cnxn.sendResponse(replyHeader, record, "response");
     }
 
-    // entry point for quorum/Learner.java
-    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
-        processTxnForSessionEvents(null, hdr, txn);
-        return processTxnInDB(hdr, txn, null);
-    }
-
-    // entry point for FinalRequestProcessor.java
     public ProcessTxnResult processTxn(Request request) {
         TxnHeader hdr = request.getHdr();
         processTxnForSessionEvents(request, hdr, request.getTxn());
@@ -1864,8 +1857,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         if (!writeRequest && !quorumRequest) {
             return new ProcessTxnResult();
         }
+
+        ProcessTxnResult rc;
         synchronized (outstandingChanges) {
-            ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
+            rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
 
             // request.hdr is set for write requests, which are the only ones
             // that add to outstandingChanges.
@@ -1886,13 +1881,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
                     }
                 }
             }
+        }
 
-            // do not add non quorum packets to the queue.
-            if (quorumRequest) {
-                getZKDatabase().addCommittedProposal(request);
-            }
-            return rc;
+        // do not add non quorum packets to the queue.
+        if (quorumRequest) {
+            getZKDatabase().addCommittedProposal(request);
         }
+        return rc;
     }
 
     private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) {

+ 1 - 3
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java

@@ -35,7 +35,6 @@ import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.SerializeUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.txn.SetDataTxn;
-import org.apache.zookeeper.txn.TxnDigest;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -164,7 +163,6 @@ public class Follower extends Learner {
             TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
             TxnHeader hdr = logEntry.getHeader();
             Record txn = logEntry.getTxn();
-            TxnDigest digest = logEntry.getDigest();
             if (hdr.getZxid() != lastQueued + 1) {
                 LOG.warn(
                     "Got zxid 0x{} expected 0x{}",
@@ -179,7 +177,7 @@ public class Follower extends Learner {
                 self.setLastSeenQuorumVerifier(qv, true);
             }
 
-            fzk.logRequest(hdr, txn, digest);
+            fzk.logRequest(logEntry.toRequest());
             if (hdr != null) {
                 /*
                  * Request header is created only by the leader, so this is only set

+ 6 - 28
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java

@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import javax.management.JMException;
-import org.apache.jute.Record;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.metrics.MetricsContext;
 import org.apache.zookeeper.server.ExitCode;
@@ -33,8 +32,6 @@ import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.SyncRequestProcessor;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.txn.TxnDigest;
-import org.apache.zookeeper.txn.TxnHeader;
 import org.apache.zookeeper.util.ServiceUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,20 +76,17 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
 
     LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<>();
 
-    public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {
-        final Request request = buildRequestToProcess(hdr, txn, digest);
+    public void logRequest(Request request) {
+        if ((request.zxid & 0xffffffffL) != 0) {
+            pendingTxns.add(request);
+        }
         syncProcessor.processRequest(request);
     }
 
     /**
-     * Build a request for the txn and append it to the transaction log
-     * @param hdr the txn header
-     * @param txn the txn
-     * @param digest the digest of txn
+     * Append txn request to the transaction log directly without go through request processors.
      */
-    public void appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException {
-        final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
-        request.setTxnDigest(digest);
+    public void appendRequest(Request request) throws IOException {
         getZKDatabase().append(request);
     }
 
@@ -188,20 +182,4 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
         rootContext.unregisterGauge("synced_observers");
 
     }
-
-    /**
-     * Build a request for the txn
-     * @param hdr the txn header
-     * @param txn the txn
-     * @param digest the digest of txn
-     * @return a request moving through a chain of RequestProcessors
-     */
-    private Request buildRequestToProcess(final TxnHeader hdr, final Record txn, final TxnDigest digest) {
-        final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
-        request.setTxnDigest(digest);
-        if ((request.zxid & 0xffffffffL) != 0) {
-            pendingTxns.add(request);
-        }
-        return request;
-    }
 }

+ 36 - 22
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java

@@ -82,6 +82,10 @@ public class Learner {
         Record rec;
         TxnDigest digest;
 
+        Request toRequest() {
+            return new Request(hdr, rec, digest);
+        }
+
     }
 
     QuorumPeer self;
@@ -535,6 +539,27 @@ public class Learner {
         }
     }
 
+    long enforceContinuousProposal(long lastQueued, PacketInFlight pif) throws Exception {
+        if (lastQueued == 0) {
+            LOG.info("DIFF sync got first proposal 0x{}", Long.toHexString(pif.hdr.getZxid()));
+        } else if (pif.hdr.getZxid() != lastQueued + 1) {
+            if (ZxidUtils.getEpochFromZxid(pif.hdr.getZxid()) <= ZxidUtils.getEpochFromZxid(lastQueued)) {
+                String msg = String.format(
+                    "DIFF sync got proposal 0x%s, last queued 0x%s, expected 0x%s",
+                    Long.toHexString(pif.hdr.getZxid()), Long.toHexString(lastQueued),
+                    Long.toHexString(lastQueued + 1));
+                LOG.error(msg);
+                throw new Exception(msg);
+            }
+            // We can't tell whether it is a data loss. Given that new epoch is rare,
+            // log at warn should not be too verbose.
+            LOG.warn("DIFF sync got new epoch proposal 0x{}, last queued 0x{}, expected 0x{}",
+                Long.toHexString(pif.hdr.getZxid()), Long.toHexString(lastQueued),
+                Long.toHexString(lastQueued + 1));
+        }
+        return pif.hdr.getZxid();
+    }
+
     /**
      * Finally, synchronize our history with the Leader (if Follower)
      * or the LearnerMaster (if Observer).
@@ -609,6 +634,8 @@ public class Learner {
             zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
             zk.createSessionTracker();
 
+            // TODO: Ideally, this should be lastProcessZxid(a.k.a. QuorumPacket::zxid from above), but currently
+            // LearnerHandler does not guarantee this. So, let's be conservative and keep it unchange for now.
             long lastQueued = 0;
 
             // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
@@ -630,13 +657,7 @@ public class Learner {
                     pif.hdr = logEntry.getHeader();
                     pif.rec = logEntry.getTxn();
                     pif.digest = logEntry.getDigest();
-                    if (pif.hdr.getZxid() != lastQueued + 1) {
-                        LOG.warn(
-                            "Got zxid 0x{} expected 0x{}",
-                            Long.toHexString(pif.hdr.getZxid()),
-                            Long.toHexString(lastQueued + 1));
-                    }
-                    lastQueued = pif.hdr.getZxid();
+                    lastQueued = enforceContinuousProposal(lastQueued, pif);
 
                     if (pif.hdr.getType() == OpCode.reconfig) {
                         SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
@@ -666,7 +687,7 @@ public class Learner {
                                 Long.toHexString(qp.getZxid()),
                                 Long.toHexString(pif.hdr.getZxid()));
                         } else {
-                            zk.processTxn(pif.hdr, pif.rec);
+                            zk.processTxn(pif.toRequest());
                             packetsNotLogged.remove();
                         }
                     } else {
@@ -696,18 +717,11 @@ public class Learner {
                         packet.rec = logEntry.getTxn();
                         packet.hdr = logEntry.getHeader();
                         packet.digest = logEntry.getDigest();
-                        // Log warning message if txn comes out-of-order
-                        if (packet.hdr.getZxid() != lastQueued + 1) {
-                            LOG.warn(
-                                "Got zxid 0x{} expected 0x{}",
-                                Long.toHexString(packet.hdr.getZxid()),
-                                Long.toHexString(lastQueued + 1));
-                        }
-                        lastQueued = packet.hdr.getZxid();
+                        lastQueued = enforceContinuousProposal(lastQueued, packet);
                     }
                     if (!writeToTxnLog) {
                         // Apply to db directly if we haven't taken the snapshot
-                        zk.processTxn(packet.hdr, packet.rec);
+                        zk.processTxn(packet.toRequest());
                     } else {
                         packetsNotLogged.add(packet);
                         packetsCommitted.add(qp.getZxid());
@@ -780,8 +794,9 @@ public class Learner {
                                 continue;
                             }
                             packetsNotLogged.removeFirst();
-                            fzk.appendRequest(pif.hdr, pif.rec, pif.digest);
-                            fzk.processTxn(pif.hdr, pif.rec);
+                            Request request = pif.toRequest();
+                            fzk.appendRequest(request);
+                            fzk.processTxn(request);
                         }
 
                         // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4646
@@ -823,7 +838,7 @@ public class Learner {
         if (zk instanceof FollowerZooKeeperServer) {
             FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
             for (PacketInFlight p : packetsNotLogged) {
-                fzk.logRequest(p.hdr, p.rec, p.digest);
+                fzk.logRequest(p.toRequest());
             }
             LOG.info("{} txns have been logged asynchronously", packetsNotLogged.size());
 
@@ -847,8 +862,7 @@ public class Learner {
                     continue;
                 }
                 packetsCommitted.remove();
-                Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1);
-                request.setTxnDigest(p.digest);
+                Request request = p.toRequest();
                 ozk.commitRequest(request);
             }
         } else {

+ 2 - 9
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java

@@ -202,12 +202,8 @@ public class Observer extends Learner {
         case Leader.INFORM:
             ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
             logEntry = SerializeUtils.deserializeTxn(qp.getData());
-            hdr = logEntry.getHeader();
-            txn = logEntry.getTxn();
-            digest = logEntry.getDigest();
-            Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
+            Request request = logEntry.toRequest();
             request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
-            request.setTxnDigest(digest);
             ObserverZooKeeperServer obs = (ObserverZooKeeperServer) zk;
             obs.commitRequest(request);
             break;
@@ -219,13 +215,10 @@ public class Observer extends Learner {
             byte[] remainingdata = new byte[buffer.remaining()];
             buffer.get(remainingdata);
             logEntry = SerializeUtils.deserializeTxn(remainingdata);
-            hdr = logEntry.getHeader();
             txn = logEntry.getTxn();
-            digest = logEntry.getDigest();
             QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) txn).getData(), UTF_8));
 
-            request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
-            request.setTxnDigest(digest);
+            request = logEntry.toRequest();
             obs = (ObserverZooKeeperServer) zk;
 
             boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);

+ 2 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/TxnLogDigestTest.java

@@ -60,6 +60,7 @@ public class TxnLogDigestTest extends ClientBase {
 
     @BeforeEach
     public void setUp() throws Exception {
+        System.setProperty("zookeeper.test.allowDiscontinuousProposals", "true");
         super.setUp();
         server = serverFactory.getZooKeeperServer();
         zk = createClient();
@@ -67,6 +68,7 @@ public class TxnLogDigestTest extends ClientBase {
 
     @AfterEach
     public void tearDown() throws Exception {
+        System.clearProperty("zookeeper.test.allowDiscontinuousProposals");
         // server will be closed in super.tearDown
         super.tearDown();
 

+ 2 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java

@@ -60,6 +60,7 @@ public class ZxidRolloverTest extends ZKTestCase {
     @BeforeEach
     public void setUp() throws Exception {
         System.setProperty("zookeeper.admin.enableServer", "false");
+        System.setProperty("zookeeper.test.allowDiscontinuousProposals", "true");
 
         // set the snap count to something low so that we force log rollover
         // and verify that is working as part of the epoch rollover.
@@ -215,6 +216,7 @@ public class ZxidRolloverTest extends ZKTestCase {
 
     @AfterEach
     public void tearDown() throws Exception {
+        System.clearProperty("zookeeper.test.allowDiscontinuousProposals");
         LOG.info("tearDown starting");
         for (int i = 0; i < zkClients.length; i++) {
             zkClients[i].close();

+ 100 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSyncTest.java

@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import java.util.Comparator;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.QuorumUtil;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+public class QuorumSyncTest extends ZKTestCase {
+    private QuorumUtil qu;
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        if (qu != null) {
+            qu.shutdownAll();
+        }
+    }
+
+    @Test
+    public void testStaleDiffSync() throws Exception {
+        qu = new QuorumUtil(2);
+        qu.startAll();
+
+        int[] followerIds = qu.getFollowerQuorumPeers()
+            .stream()
+            .sorted(Comparator.comparingLong(QuorumPeer::getMyId).reversed())
+            .mapToInt(peer -> (int) peer.getMyId()).toArray();
+
+        int follower1 = followerIds[0];
+        int follower2 = followerIds[1];
+
+        String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer());
+        try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) {
+            qu.shutdown(follower2);
+
+            for (int i = 0; i < 10; i++) {
+                zk.create("/foo" + i, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+
+            qu.shutdown(follower1);
+
+            for (int i = 0; i < 10; i++) {
+                zk.create("/bar" + i, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+
+            qu.restart(follower1);
+        }
+
+        try (ZooKeeper zk = ClientBase.createZKClient(qu.getConnectionStringForServer(follower1))) {
+            for (int i = 0; i < 10; i++) {
+                String path = "/foo" + i;
+                assertNotNull(zk.exists(path, false), path + " not found");
+            }
+
+            for (int i = 0; i < 10; i++) {
+                String path = "/bar" + i;
+                assertNotNull(zk.exists(path, false), path + " not found");
+            }
+        }
+
+        qu.shutdown(qu.getLeaderServer());
+
+        qu.restart(follower2);
+
+        try (ZooKeeper zk = ClientBase.createZKClient(qu.getConnectionStringForServer(follower2))) {
+            for (int i = 0; i < 10; i++) {
+                String path = "/foo" + i;
+                assertNotNull(zk.exists(path, false), path + " not found");
+            }
+
+            for (int i = 0; i < 10; i++) {
+                String path = "/bar" + i;
+                assertNotNull(zk.exists(path, false), path + " not found");
+            }
+        }
+    }
+}