Ver código fonte

ZOOKEEPER-4785: Txn loss due to race condition in Learner.syncWithLeader() during DIFF sync (#2111)

Author: Li Wang <liwang@apple.com>

Co-authored-by: liwang <liwang@apple.com>
li4wang 1 ano atrás
pai
commit
315abde35e

+ 8 - 7
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FastLeaderElection.java

@@ -702,14 +702,14 @@ public class FastLeaderElection implements Election {
                 qv.toString().getBytes(UTF_8));
 
             LOG.debug(
-                "Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient),"
-                    + " {} (myid), 0x{} (n.peerEpoch) ",
+                "Sending Notification: {} (n.leader), 0x{} (n.peerEpoch), 0x{} (n.zxid), 0x{} (n.round), {} (recipient),"
+                    + " {} (myid) ",
                 proposedLeader,
                 Long.toHexString(proposedZxid),
+                Long.toHexString(proposedEpoch),
                 Long.toHexString(logicalclock.get()),
                 sid,
-                self.getMyId(),
-                Long.toHexString(proposedEpoch));
+                self.getMyId());
 
             sendqueue.offer(notmsg);
         }
@@ -722,12 +722,13 @@ public class FastLeaderElection implements Election {
      */
     protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
         LOG.debug(
-            "id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}",
+            "id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}, epoch: 0x{}, proposed epoch: 0x{}",
             newId,
             curId,
             Long.toHexString(newZxid),
-            Long.toHexString(curZxid));
-
+            Long.toHexString(curZxid),
+            Long.toHexString(newEpoch),
+            Long.toHexString(curEpoch));
         if (self.getQuorumVerifier().getWeight(newId) == 0) {
             return false;
         }

+ 29 - 5
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java

@@ -80,14 +80,23 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
     LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<>();
 
     public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {
-        Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
-        request.setTxnDigest(digest);
-        if ((request.zxid & 0xffffffffL) != 0) {
-            pendingTxns.add(request);
-        }
+        final Request request = buildRequestToProcess(hdr, txn, digest);
         syncProcessor.processRequest(request);
     }
 
+    /**
+     * Build a request for the txn and append it to the transaction log
+     * @param hdr the txn header
+     * @param txn the txn
+     * @param digest the digest of txn
+     * @return a request moving through a chain of RequestProcessors
+     */
+    public Request appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException {
+        final Request request = buildRequestToProcess(hdr, txn, digest);
+        getZKDatabase().append(request);
+        return request;
+    }
+
     /**
      * When a COMMIT message is received, eventually this method is called,
      * which matches up the zxid from the COMMIT with (hopefully) the head of
@@ -181,4 +190,19 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
 
     }
 
+    /**
+     * Build a request for the txn; it is also added to pendingTxns unless the low 32 bits of its zxid are zero
+     * @param hdr the txn header
+     * @param txn the txn
+     * @param digest the digest of txn
+     * @return a request moving through a chain of RequestProcessors
+     */
+    private Request buildRequestToProcess(final TxnHeader hdr, final Record txn, final TxnDigest digest) {
+        final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
+        request.setTxnDigest(digest);
+        if ((request.zxid & 0xffffffffL) != 0) {
+            pendingTxns.add(request);
+        }
+        return request;
+    }
 }

+ 29 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java

@@ -556,6 +556,8 @@ public class Learner {
         readPacket(qp);
         Deque<Long> packetsCommitted = new ArrayDeque<>();
         Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
+        Deque<Request> requestsToAck = new ArrayDeque<>();
+
         synchronized (zk) {
             if (qp.getType() == Leader.DIFF) {
                 LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
@@ -745,7 +747,7 @@ public class Learner {
                         zk.takeSnapshot(syncSnapshot);
                     }
 
-                    self.setCurrentEpoch(newEpoch);
+
                     writeToTxnLog = true;
                     //Anything after this needs to go to the transaction log, not applied directly in memory
                     isPreZAB1_0 = false;
@@ -755,14 +757,27 @@ public class Learner {
                     self.setSyncMode(QuorumPeer.SyncMode.NONE);
                     zk.startupWithoutServing();
                     if (zk instanceof FollowerZooKeeperServer) {
+                        long startTime = Time.currentElapsedTime();
                         FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
                         for (PacketInFlight p : packetsNotCommitted) {
-                            fzk.logRequest(p.hdr, p.rec, p.digest);
+                            final Request request = fzk.appendRequest(p.hdr, p.rec, p.digest);
+                            requestsToAck.add(request);
                         }
+
+                        // persist the txns to disk
+                        fzk.getZKDatabase().commit();
+                        LOG.info("{} txns have been persisted and it took {}ms",
+                        packetsNotCommitted.size(), Time.currentElapsedTime() - startTime);
                         packetsNotCommitted.clear();
                     }
 
+                    // set the current epoch after all the txns are persisted
+                    self.setCurrentEpoch(newEpoch);
+                    LOG.info("Set the current epoch to {}", newEpoch);
+
+                    // send NEWLEADER ack after all the txns are persisted
                     writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
+                    LOG.info("Sent NEWLEADER ack to leader with zxid {}", Long.toHexString(newLeaderZxid));
                     break;
                 }
             }
@@ -781,13 +796,25 @@ public class Learner {
 
         // We need to log the stuff that came in between the snapshot and the uptodate
         if (zk instanceof FollowerZooKeeperServer) {
+            // send the PROPOSAL ACKs after the NEWLEADER ACK so the leader does not shut down due to
+            // a timeout while waiting for a quorum of followers
+            for (final Request request : requestsToAck) {
+                final QuorumPacket ackPacket = new QuorumPacket(Leader.ACK, request.getHdr().getZxid(), null, null);
+                writePacket(ackPacket, false);
+            }
+            writePacket(null, true);
+            requestsToAck.clear();
+
             FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
             for (PacketInFlight p : packetsNotCommitted) {
                 fzk.logRequest(p.hdr, p.rec, p.digest);
             }
+            LOG.info("{} txns have been logged asynchronously", packetsNotCommitted.size());
+
             for (Long zxid : packetsCommitted) {
                 fzk.commit(zxid);
             }
+            LOG.info("{} txns have been committed", packetsCommitted.size());
         } else if (zk instanceof ObserverZooKeeperServer) {
             // Similar to follower, we need to log requests between the snapshot
             // and UPTODATE

+ 316 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java

@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.security.sasl.SaslException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.FinalRequestProcessor;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerListener;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+public class DIFFSyncTest extends QuorumPeerTestBase {
+    private static final int SERVER_COUNT = 3;
+    private static final String PATH_PREFIX = "/test_";
+
+    private int[] clientPorts;
+    private MainThread[] mt;
+    private ZooKeeper[] zkClients;
+
+    @BeforeEach
+    public void start() throws Exception {
+        clientPorts = new int[SERVER_COUNT];
+        mt = startQuorum(clientPorts);
+        zkClients = new ZooKeeper[SERVER_COUNT];
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception{
+        for (final ZooKeeper zk : zkClients) {
+            try {
+                if (zk != null) {
+                    zk.close();
+                }
+            } catch (final InterruptedException e) {
+                LOG.warn("ZooKeeper interrupted while shutting it down", e);
+            }
+        }
+
+        for (final MainThread mainThread : mt) {
+            try {
+                mainThread.shutdown();
+            } catch (final InterruptedException e) {
+                LOG.warn("Quorum Peer interrupted while shutting it down", e);
+            }
+        }
+    }
+
+    @Test
+    @Timeout(value = 120)
+    public void testTxnLoss_FailToPersistAndCommitTxns() throws Exception {
+        final List<String> paths = new ArrayList<>();
+        assertEquals(2, mt[2].getQuorumPeer().getLeaderId());
+
+        // create a ZK client to the leader (currentEpoch=1, lastLoggedZxid=<1, 1>)
+        createZKClient(2);
+
+        // create a znode (currentEpoch=1, lastLoggedZxid=<1, 2>)
+        paths.add(createNode(zkClients[2], PATH_PREFIX + "0"));
+
+        // shut down S0
+        mt[0].shutdown();
+        LOG.info("S0 shutdown.");
+
+        // create a znode (currentEpoch=1, lastLoggedZxid=<1, 3>), so S0 is 1 txn behind
+        paths.add(createNode(zkClients[2], PATH_PREFIX + "1"));
+        logEpochsAndLastLoggedTxnForAllServers();
+
+        // shut down S1
+        mt[1].shutdown();
+        LOG.info("S1 shutdown.");
+
+        // restart S0 and trigger a new leader election (currentEpoch=2)
+        // S0 starts with MockSyncRequestProcessor and MockCommitProcessor to simulate that it writes
+        // the currentEpoch and sends the NEWLEADER ACK but then fails to persist and commit txns
+        // during DIFF sync
+        mt[0].start(new MockTestQPMain());
+        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[0], CONNECTION_TIMEOUT),
+                "waiting for server 0 being up");
+        LOG.info("S0 restarted.");
+        logEpochsAndLastLoggedTxnForAllServers();
+
+        // validate S2 is still the leader
+        assertEquals(2, mt[2].getQuorumPeer().getLeaderId());
+
+        // shut down the leader (i.e. S2). This causes S0 to disconnect from the leader, perform
+        // a partial shutdown, fast-forward its database to the latest persisted txn (i.e. <1, 3>)
+        // and change its state to LOOKING
+        mt[2].shutdown();
+        LOG.info("S2 shutdown.");
+
+        // start S1 and trigger a leader election (currentEpoch=3)
+        mt[1].start();
+        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[1], CONNECTION_TIMEOUT),
+                "waiting for server 1 being up");
+        LOG.info("S1 restarted.");
+        logEpochsAndLastLoggedTxnForAllServers();
+
+        // validate S0 is the new leader because it has a higher epoch
+        assertEquals(0, mt[0].getQuorumPeer().getLeaderId());
+
+        // connect to the new leader (i.e. S0) (currentEpoch=3, lastLoggedZxid=<3, 1>)
+        createZKClient(0);
+
+        // create a znode (currentEpoch=3, lastLoggedZxid=<3, 2>)
+        paths.add(createNode(zkClients[0], PATH_PREFIX + "3"));
+
+        // start S2 which is the old leader
+        mt[2].start();
+        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[2], CONNECTION_TIMEOUT),
+                "waiting for server " + 2 + " being up");
+        LOG.info("S2 restarted.");
+        logEpochsAndLastLoggedTxnForAllServers();
+
+        // validate all the znodes exist from all the clients
+        validateDataFromAllClients(paths);
+    }
+
+    @Test
+    @Timeout(value = 120)
+    public void testLeaderShutdown_AckProposalBeforeAckNewLeader() throws Exception {
+        assertEquals(2, mt[2].getQuorumPeer().getLeaderId());
+
+        // create a ZK client to the leader (currentEpoch=1, lastLoggedZxid=<1, 1>)
+        createZKClient(2);
+
+        // create a znode (currentEpoch=1, lastLoggedZxid=<1, 2>)
+        createNode(zkClients[2], PATH_PREFIX + "0");
+
+        // shut down S0
+        mt[0].shutdown();
+        LOG.info("S0 shutdown.");
+
+        // create a znode (currentEpoch=1, lastLoggedZxid=<1, 3>), so S0 is 1 txn behind
+        createNode(zkClients[2], PATH_PREFIX + "1");
+        logEpochsAndLastLoggedTxnForAllServers();
+
+        // shut down S1
+        mt[1].shutdown();
+        LOG.info("S1 shutdown.");
+
+        // restart S0 and trigger a new leader election and DIFF sync (currentEpoch=2)
+        mt[0].start();
+        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[0], CONNECTION_TIMEOUT),
+                "waiting for server 0 being up");
+        LOG.info("S0 restarted.");
+
+        // create a znode (currentEpoch=2, lastLoggedZxid=<2, 1>)
+        createNode(zkClients[2], PATH_PREFIX + "2");
+
+        // validate quorum is up without an additional round of leader election
+        for (int  i = 0; i < SERVER_COUNT; i++) {
+            if (i != 1) {
+                final QuorumPeer qp = mt[i].getQuorumPeer();
+                assertNotNull(qp);
+                assertEquals(2, qp.getCurrentEpoch());
+                assertEquals(2, qp.getAcceptedEpoch());
+                assertEquals("200000001", Long.toHexString(qp.getLastLoggedZxid()));
+            }
+        }
+    }
+
+    private MainThread[] startQuorum(final int[] clientPorts) throws IOException {
+        final StringBuilder sb = new StringBuilder();
+        String server;
+
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
+                    + ":participant;127.0.0.1:" + clientPorts[i];
+            sb.append(server);
+            sb.append("\n");
+        }
+
+        final MainThread[] mt = new MainThread[SERVER_COUNT];
+
+        // start all the servers
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            mt[i] = new MainThread(i, clientPorts[i], sb.toString(), false);
+            mt[i].start();
+        }
+
+        // ensure all servers started
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            assertTrue(
+                    ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT),
+                    "waiting for server " + i + " being up");
+        }
+        return mt;
+    }
+
+    private void createZKClient(final int idx) throws Exception {
+        zkClients[idx] = null;
+        final ClientBase.CountdownWatcher watch = new ClientBase.CountdownWatcher();
+        zkClients[idx] = new ZooKeeper("127.0.0.1:" + clientPorts[idx], ClientBase.CONNECTION_TIMEOUT, watch);
+        watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+    }
+
+    private String createNode(final ZooKeeper zk, final String path) throws Exception {
+        final String fullPath = zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        assertNotNull(zk.exists(path, false));
+        return fullPath;
+    }
+
+    private static class MockTestQPMain extends TestQPMain {
+        @Override
+        protected QuorumPeer getQuorumPeer() throws SaslException {
+            return new TestQuorumPeer();
+        }
+    }
+
+    private static class TestQuorumPeer extends QuorumPeer {
+        public TestQuorumPeer() throws SaslException {
+        }
+
+        @Override
+        protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
+            final FollowerZooKeeperServer followerZookeeperServer = new FollowerZooKeeperServer(logFactory, this, this.getZkDb()) {
+                @Override
+                protected void setupRequestProcessors() {
+                    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
+                    commitProcessor = new MockCommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
+                    commitProcessor.start();
+
+                    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
+                    ((FollowerRequestProcessor) firstProcessor).start();
+                    syncProcessor = new MockSyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
+
+                    syncProcessor.start();
+                }
+            };
+            return new Follower(this, followerZookeeperServer);
+        }
+    }
+
+    private static class MockSyncRequestProcessor extends SyncRequestProcessor {
+        public MockSyncRequestProcessor(final ZooKeeperServer zks, final RequestProcessor nextProcessor) {
+            super(zks, nextProcessor);
+        }
+
+        @Override
+        public void processRequest(final Request request) {
+            LOG.info("Sync request for zxid {} is dropped", Long.toHexString(request.getHdr().getZxid()));
+        }
+    }
+
+    private static class MockCommitProcessor extends CommitProcessor {
+        public MockCommitProcessor(final RequestProcessor nextProcessor, final String id,
+                                   final boolean matchSyncs, final ZooKeeperServerListener listener) {
+
+            super(nextProcessor, id, matchSyncs, listener);
+        }
+
+        @Override
+        public void commit(final Request request) {
+            LOG.info("Commit request for zxid {} is dropped", Long.toHexString(request.getHdr().getZxid()));
+        }
+    }
+
+    private void logEpochsAndLastLoggedTxnForAllServers() throws Exception {
+        for (int  i = 0; i < SERVER_COUNT; i++) {
+            final QuorumPeer qp = mt[i].getQuorumPeer();
+            if (qp != null) {
+                LOG.info(String.format("server id=%d, acceptedEpoch=%d, currentEpoch=%d, lastLoggedTxn=%s",
+                        qp.getMyId(), qp.getAcceptedEpoch(),
+                        qp.getCurrentEpoch(), Long.toHexString(qp.getLastLoggedZxid())));
+            }
+        }
+    }
+
+    private void validateDataFromAllClients(final List<String> paths) throws Exception{
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            if (zkClients[i] == null) {
+                createZKClient(i);
+            }
+
+            for (final String path : paths) {
+                assertNotNull(zkClients[i].exists(path, false), "znode " + path + " is missing");
+            }
+            assertEquals(3, paths.size());
+        }
+    }
+}

+ 12 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java

@@ -318,6 +318,18 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
             currentThread.start();
         }
 
+        /**
+         * start the QuorumPeer with the passed TestQPMain
+         *
+         * @param testQPMain the TestQPMain to use
+         */
+
+        public synchronized void start(final TestQPMain testQPMain) {
+            main = testQPMain;
+            currentThread = new Thread(this);
+            currentThread.start();
+        }
+
         public TestQPMain getTestQPMain() {
             return new TestQPMain();
         }