Browse Source

ZOOKEEPER-4394: Apply only committed requests in sync with leader before NEWLEADER ACK

Reviewers: kezhuw, anmolnar, kezhuw
Author: AlphaCanisMajoris
Closes #2152 from AlphaCanisMajoris/ZK-4643
AlphaCanisMajoris 7 months ago
parent
commit
9fcb805fa1

+ 1 - 14
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -793,19 +793,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
     }
 
 
     public synchronized void startup() {
     public synchronized void startup() {
-        startupWithServerState(State.RUNNING);
-    }
-
-    public synchronized void startupWithoutServing() {
-        startupWithServerState(State.INITIAL);
-    }
-
-    public synchronized void startServing() {
-        setState(State.RUNNING);
-        notifyAll();
-    }
-
-    private void startupWithServerState(State state) {
         if (sessionTracker == null) {
         if (sessionTracker == null) {
             createSessionTracker();
             createSessionTracker();
         }
         }
@@ -820,7 +807,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
 
 
         registerMetrics();
         registerMetrics();
 
 
-        setState(state);
+        setState(State.RUNNING);
 
 
         requestPathMetricsCollector.start();
         requestPathMetricsCollector.start();
 
 

+ 3 - 4
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java

@@ -89,12 +89,11 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
      * @param hdr the txn header
      * @param hdr the txn header
      * @param txn the txn
      * @param txn the txn
      * @param digest the digest of txn
      * @param digest the digest of txn
-     * @return a request moving through a chain of RequestProcessors
      */
      */
-    public Request appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException {
-        final Request request = buildRequestToProcess(hdr, txn, digest);
+    public void appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException {
+        final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
+        request.setTxnDigest(digest);
         getZKDatabase().append(request);
         getZKDatabase().append(request);
-        return request;
     }
     }
 
 
     /**
     /**

+ 48 - 32
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java

@@ -555,8 +555,7 @@ public class Learner {
         boolean syncSnapshot = false;
         boolean syncSnapshot = false;
         readPacket(qp);
         readPacket(qp);
         Deque<Long> packetsCommitted = new ArrayDeque<>();
         Deque<Long> packetsCommitted = new ArrayDeque<>();
-        Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
-        Deque<Request> requestsToAck = new ArrayDeque<>();
+        Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>();
 
 
         synchronized (zk) {
         synchronized (zk) {
             if (qp.getType() == Leader.DIFF) {
             if (qp.getType() == Leader.DIFF) {
@@ -645,11 +644,11 @@ public class Learner {
                         self.setLastSeenQuorumVerifier(qv, true);
                         self.setLastSeenQuorumVerifier(qv, true);
                     }
                     }
 
 
-                    packetsNotCommitted.add(pif);
+                    packetsNotLogged.add(pif);
                     break;
                     break;
                 case Leader.COMMIT:
                 case Leader.COMMIT:
                 case Leader.COMMITANDACTIVATE:
                 case Leader.COMMITANDACTIVATE:
-                    pif = packetsNotCommitted.peekFirst();
+                    pif = packetsNotLogged.peekFirst();
                     if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {
                     if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {
                         QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
                         QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
                         boolean majorChange = self.processReconfig(
                         boolean majorChange = self.processReconfig(
@@ -668,7 +667,7 @@ public class Learner {
                                 Long.toHexString(pif.hdr.getZxid()));
                                 Long.toHexString(pif.hdr.getZxid()));
                         } else {
                         } else {
                             zk.processTxn(pif.hdr, pif.rec);
                             zk.processTxn(pif.hdr, pif.rec);
-                            packetsNotCommitted.remove();
+                            packetsNotLogged.remove();
                         }
                         }
                     } else {
                     } else {
                         packetsCommitted.add(qp.getZxid());
                         packetsCommitted.add(qp.getZxid());
@@ -710,7 +709,7 @@ public class Learner {
                         // Apply to db directly if we haven't taken the snapshot
                         // Apply to db directly if we haven't taken the snapshot
                         zk.processTxn(packet.hdr, packet.rec);
                         zk.processTxn(packet.hdr, packet.rec);
                     } else {
                     } else {
-                        packetsNotCommitted.add(packet);
+                        packetsNotLogged.add(packet);
                         packetsCommitted.add(qp.getZxid());
                         packetsCommitted.add(qp.getZxid());
                     }
                     }
 
 
@@ -753,29 +752,55 @@ public class Learner {
                     isPreZAB1_0 = false;
                     isPreZAB1_0 = false;
 
 
                     // ZOOKEEPER-3911: make sure to sync the uncommitted logs before committing them (ACK NEWLEADER).
                     // ZOOKEEPER-3911: make sure to sync the uncommitted logs before committing them (ACK NEWLEADER).
-                    sock.setSoTimeout(self.tickTime * self.syncLimit);
-                    self.setSyncMode(QuorumPeer.SyncMode.NONE);
-                    zk.startupWithoutServing();
-                    if (zk instanceof FollowerZooKeeperServer) {
+                    if (zk instanceof FollowerZooKeeperServer && !packetsCommitted.isEmpty()) {
                         long startTime = Time.currentElapsedTime();
                         long startTime = Time.currentElapsedTime();
                         FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
                         FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
-                        for (PacketInFlight p : packetsNotCommitted) {
-                            final Request request = fzk.appendRequest(p.hdr, p.rec, p.digest);
-                            requestsToAck.add(request);
+
+                        /*
+                         * @see https://github.com/apache/zookeeper/pull/1848
+                         * Persist and process the committed txns in "packetsNotLogged"
+                         * according to "packetsCommitted", which have been committed by
+                         * the leader. For these committed proposals, there is no need to
+                         * reply ack.
+                         *
+                         * @see https://issues.apache.org/jira/browse/ZOOKEEPER-4394
+                         * Keep the outstanding proposals in "packetsNotLogged" to avoid
+                         * NullPointerException when the follower receives COMMIT packet(s)
+                         * right after replying NEWLEADER ack.
+                         */
+                        while (!packetsCommitted.isEmpty()) {
+                            long zxid = packetsCommitted.removeFirst();
+                            pif = packetsNotLogged.peekFirst();
+                            if (pif == null) {
+                                LOG.warn("Committing 0x{}, but got no proposal", Long.toHexString(zxid));
+                                continue;
+                            } else if (pif.hdr.getZxid() != zxid) {
+                                LOG.warn("Committing 0x{}, but next proposal is 0x{}",
+                                        Long.toHexString(zxid), Long.toHexString(pif.hdr.getZxid()));
+                                continue;
+                            }
+                            packetsNotLogged.removeFirst();
+                            fzk.appendRequest(pif.hdr, pif.rec, pif.digest);
+                            fzk.processTxn(pif.hdr, pif.rec);
                         }
                         }
 
 
-                        // persist the txns to disk
+                        // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4646
+                        // Make sure to persist the txns to disk before replying NEWLEADER ack.
                         fzk.getZKDatabase().commit();
                         fzk.getZKDatabase().commit();
-                        LOG.info("{} txns have been persisted and it took {}ms",
-                        packetsNotCommitted.size(), Time.currentElapsedTime() - startTime);
-                        packetsNotCommitted.clear();
+                        LOG.info("It took {}ms to persist and commit txns in packetsCommitted. "
+                                        + "{} outstanding txns left in packetsNotLogged",
+                                Time.currentElapsedTime() - startTime, packetsNotLogged.size());
                     }
                     }
 
 
-                    // set the current epoch after all the tnxs are persisted
+                    // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4643
+                    // @see https://issues.apache.org/jira/browse/ZOOKEEPER-4785
+                    // Update current epoch after the committed txns are persisted
                     self.setCurrentEpoch(newEpoch);
                     self.setCurrentEpoch(newEpoch);
                     LOG.info("Set the current epoch to {}", newEpoch);
                     LOG.info("Set the current epoch to {}", newEpoch);
+                    sock.setSoTimeout(self.tickTime * self.syncLimit);
+                    self.setSyncMode(QuorumPeer.SyncMode.NONE);
 
 
-                    // send NEWLEADER ack after all the tnxs are persisted
+                    // send NEWLEADER ack after the committed txns are persisted
                     writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                     writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                     LOG.info("Sent NEWLEADER ack to leader with zxid {}", Long.toHexString(newLeaderZxid));
                     LOG.info("Sent NEWLEADER ack to leader with zxid {}", Long.toHexString(newLeaderZxid));
                     break;
                     break;
@@ -784,7 +809,7 @@ public class Learner {
         }
         }
         ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
         ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
         writePacket(ack, true);
         writePacket(ack, true);
-        zk.startServing();
+        zk.startup();
         /*
         /*
          * Update the election vote here to ensure that all members of the
          * Update the election vote here to ensure that all members of the
          * ensemble report the same vote to new servers that start up and
          * ensemble report the same vote to new servers that start up and
@@ -796,20 +821,11 @@ public class Learner {
 
 
         // We need to log the stuff that came in between the snapshot and the uptodate
         // We need to log the stuff that came in between the snapshot and the uptodate
         if (zk instanceof FollowerZooKeeperServer) {
         if (zk instanceof FollowerZooKeeperServer) {
-            // reply ACK of PROPOSAL after ACK of NEWLEADER to avoid leader shutdown due to timeout
-            // on waiting for a quorum of followers
-            for (final Request request : requestsToAck) {
-                final QuorumPacket ackPacket = new QuorumPacket(Leader.ACK, request.getHdr().getZxid(), null, null);
-                writePacket(ackPacket, false);
-            }
-            writePacket(null, true);
-            requestsToAck.clear();
-
             FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
             FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
-            for (PacketInFlight p : packetsNotCommitted) {
+            for (PacketInFlight p : packetsNotLogged) {
                 fzk.logRequest(p.hdr, p.rec, p.digest);
                 fzk.logRequest(p.hdr, p.rec, p.digest);
             }
             }
-            LOG.info("{} txns have been logged asynchronously", packetsNotCommitted.size());
+            LOG.info("{} txns have been logged asynchronously", packetsNotLogged.size());
 
 
             for (Long zxid : packetsCommitted) {
             for (Long zxid : packetsCommitted) {
                 fzk.commit(zxid);
                 fzk.commit(zxid);
@@ -819,7 +835,7 @@ public class Learner {
             // Similar to follower, we need to log requests between the snapshot
             // Similar to follower, we need to log requests between the snapshot
             // and UPTODATE
             // and UPTODATE
             ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
             ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
-            for (PacketInFlight p : packetsNotCommitted) {
+            for (PacketInFlight p : packetsNotLogged) {
                 Long zxid = packetsCommitted.peekFirst();
                 Long zxid = packetsCommitted.peekFirst();
                 if (p.hdr.getZxid() != zxid) {
                 if (p.hdr.getZxid() != zxid) {
                     // log warning message if there is no matching commit
                     // log warning message if there is no matching commit

+ 144 - 21
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java

@@ -741,8 +741,8 @@ public class Zab1_0Test extends ZKTestCase {
 
 
                     readPacketSkippingPing(ia, qp);
                     readPacketSkippingPing(ia, qp);
                     assertEquals(Leader.ACKEPOCH, qp.getType());
                     assertEquals(Leader.ACKEPOCH, qp.getType());
-                    assertEquals(0, qp.getZxid());
-                    assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt());
+                    assertEquals(ZxidUtils.makeZxid(0, 0), qp.getZxid());
+                    assertEquals(0, ByteBuffer.wrap(qp.getData()).getInt());
                     assertEquals(1, f.self.getAcceptedEpoch());
                     assertEquals(1, f.self.getAcceptedEpoch());
                     assertEquals(0, f.self.getCurrentEpoch());
                     assertEquals(0, f.self.getCurrentEpoch());
 
 
@@ -765,36 +765,22 @@ public class Zab1_0Test extends ZKTestCase {
                     qp.setZxid(0);
                     qp.setZxid(0);
                     oa.writeRecord(qp, null);
                     oa.writeRecord(qp, null);
 
 
-                    // Read the uptodate ack
-                    readPacketSkippingPing(ia, qp);
-                    assertEquals(Leader.ACK, qp.getType());
-                    assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
-
                     // Get the ack of the new leader
                     // Get the ack of the new leader
                     readPacketSkippingPing(ia, qp);
                     readPacketSkippingPing(ia, qp);
                     assertEquals(Leader.ACK, qp.getType());
                     assertEquals(Leader.ACK, qp.getType());
                     assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
                     assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
                     assertEquals(1, f.self.getAcceptedEpoch());
                     assertEquals(1, f.self.getAcceptedEpoch());
                     assertEquals(1, f.self.getCurrentEpoch());
                     assertEquals(1, f.self.getCurrentEpoch());
-
-                    //Wait for the transactions to be written out. The thread that writes them out
-                    // does not send anything back when it is done.
-                    long start = System.currentTimeMillis();
-                    while (createSessionZxid != f.fzk.getLastProcessedZxid()
-                                   && (System.currentTimeMillis() - start) < 50) {
-                        Thread.sleep(1);
-                    }
-
                     assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid());
                     assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid());
 
 
+                    // Read the uptodate ack
+                    readPacketSkippingPing(ia, qp);
+                    assertEquals(Leader.ACK, qp.getType());
+                    assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+
                     // Make sure the data was recorded in the filesystem ok
                     // Make sure the data was recorded in the filesystem ok
                     ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
                     ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
-                    start = System.currentTimeMillis();
                     zkDb2.loadDataBase();
                     zkDb2.loadDataBase();
-                    while (zkDb2.getSessionWithTimeOuts().isEmpty() && (System.currentTimeMillis() - start) < 50) {
-                        Thread.sleep(1);
-                        zkDb2.loadDataBase();
-                    }
                     LOG.info("zkdb2 sessions:{}", zkDb2.getSessions());
                     LOG.info("zkdb2 sessions:{}", zkDb2.getSessions());
                     LOG.info("zkdb2 with timeouts:{}", zkDb2.getSessionWithTimeOuts());
                     LOG.info("zkdb2 with timeouts:{}", zkDb2.getSessionWithTimeOuts());
                     assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L));
                     assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L));
@@ -820,6 +806,143 @@ public class Zab1_0Test extends ZKTestCase {
         }, testData);
         }, testData);
     }
     }
 
 
+    /**
+     * Plays the leader side of a follower sync in which an outstanding PROPOSAL is sent
+     * before NEWLEADER and its COMMIT only arrives after the follower has acked NEWLEADER.
+     * Checks that the proposal is neither applied nor persisted when the NEWLEADER ack is
+     * sent, and that it is acked, applied and persisted once COMMIT and UPTODATE are received.
+     */
+    @Test
+    public void testNormalFollowerRun_ProcessCommitInSyncAfterAckNewLeader(@TempDir File testData) throws Exception {
+        testFollowerConversation(new FollowerConversation() {
+            @Override
+            public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception {
+                File tmpDir = File.createTempFile("test", "dir", testData);
+                tmpDir.delete();
+                tmpDir.mkdir();
+                File logDir = f.fzk.getTxnLogFactory().getDataLogDir().getParentFile();
+                File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
+                // Spy on ZK so we can check whether a snapshot was taken.
+                f.zk = spy(f.zk);
+                try {
+                    assertEquals(0, f.self.getAcceptedEpoch());
+                    assertEquals(0, f.self.getCurrentEpoch());
+
+                    // Setup a database with a single /foo node
+                    ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
+                    final long firstZxid = ZxidUtils.makeZxid(1, 1);
+                    zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
+                    Stat stat = new Stat();
+                    assertEquals("data1", new String(zkDb.getData("/foo", stat, null)));
+
+                    QuorumPacket qp = new QuorumPacket();
+                    readPacketSkippingPing(ia, qp);
+                    assertEquals(Leader.FOLLOWERINFO, qp.getType());
+                    assertEquals(0, qp.getZxid());
+                    LearnerInfo learnInfo = new LearnerInfo();
+                    ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
+                    assertEquals(0x10000, learnInfo.getProtocolVersion());
+                    assertEquals(0, learnInfo.getServerid());
+
+                    // We are simulating an established leader, so the epoch is 1
+                    qp.setType(Leader.LEADERINFO);
+                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
+                    byte[] protoBytes = new byte[4];
+                    ByteBuffer.wrap(protoBytes).putInt(0x10000);
+                    qp.setData(protoBytes);
+                    oa.writeRecord(qp, null);
+
+                    // Same expectations as the ACKEPOCH check in the sibling follower test:
+                    // the packet zxid is compared as a zxid, the int payload as a plain 0.
+                    readPacketSkippingPing(ia, qp);
+                    assertEquals(Leader.ACKEPOCH, qp.getType());
+                    assertEquals(ZxidUtils.makeZxid(0, 0), qp.getZxid());
+                    assertEquals(0, ByteBuffer.wrap(qp.getData()).getInt());
+                    assertEquals(1, f.self.getAcceptedEpoch());
+                    assertEquals(0, f.self.getCurrentEpoch());
+
+                    // Send the snapshot we created earlier
+                    qp.setType(Leader.SNAP);
+                    qp.setData(new byte[0]);
+                    qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
+                    oa.writeRecord(qp, null);
+                    zkDb.serializeSnapshot(oa);
+                    oa.writeString("BenWasHere", null);
+                    Thread.sleep(10); //Give it some time to process the snap
+                    // No snapshot taken yet; the SNAP was only applied in memory
+                    verify(f.zk, never()).takeSnapshot();
+
+                    // Leader sends an outstanding proposal
+                    long proposalZxid = ZxidUtils.makeZxid(1, 1001);
+                    proposeSetData(qp, proposalZxid, "data2", 2);
+                    oa.writeRecord(qp, null);
+
+                    qp.setType(Leader.NEWLEADER);
+                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
+                    qp.setData(null);
+                    oa.writeRecord(qp, null);
+
+                    // Get the ack of the new leader
+                    readPacketSkippingPing(ia, qp);
+                    assertEquals(Leader.ACK, qp.getType());
+                    assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+                    assertEquals(1, f.self.getAcceptedEpoch());
+                    assertEquals(1, f.self.getCurrentEpoch());
+                    // Make sure that we did take the snapshot now
+                    verify(f.zk).takeSnapshot(true);
+                    assertEquals(firstZxid, f.fzk.getLastProcessedZxid());
+
+                    // The outstanding proposal has not been persisted yet
+                    ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
+                    long lastZxid = zkDb2.loadDataBase();
+                    assertEquals("data1", new String(zkDb2.getData("/foo", stat, null)));
+                    assertEquals(firstZxid, lastZxid);
+
+                    TrackerWatcher watcher = new TrackerWatcher();
+
+                    // The change should not have happened yet
+                    assertEquals("data1", new String(f.fzk.getZKDatabase().getData("/foo", stat, watcher)));
+
+                    // Leader commits proposalZxid right after it sends NEWLEADER to follower
+                    qp.setType(Leader.COMMIT);
+                    qp.setZxid(proposalZxid);
+                    qp.setData(null);
+                    oa.writeRecord(qp, null);
+
+                    qp.setType(Leader.UPTODATE);
+                    qp.setZxid(0);
+                    qp.setData(null);
+                    oa.writeRecord(qp, null);
+
+                    // Read the uptodate ack
+                    readPacketSkippingPing(ia, qp);
+                    assertEquals(Leader.ACK, qp.getType());
+                    assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+
+                    // Read the ack of the outstanding proposal
+                    readPacketSkippingPing(ia, qp);
+                    assertEquals(Leader.ACK, qp.getType());
+                    assertEquals(proposalZxid, qp.getZxid());
+
+                    // The change should happen now
+                    watcher.waitForChange();
+                    assertEquals("data2", new String(f.fzk.getZKDatabase().getData("/foo", stat, null)));
+
+                    // check and make sure the change is persisted
+                    zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
+                    lastZxid = zkDb2.loadDataBase();
+                    assertEquals("data2", new String(zkDb2.getData("/foo", stat, null)));
+                    assertEquals(proposalZxid, lastZxid);
+                } finally {
+                    TestUtils.deleteFileRecursively(tmpDir);
+                }
+            }
+
+            // Fills qp with a PROPOSAL packet carrying a setData txn on /foo at the given zxid.
+            private void proposeSetData(QuorumPacket qp, long zxid, String data, int version) throws IOException {
+                qp.setType(Leader.PROPOSAL);
+                qp.setZxid(zxid);
+                TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55, ZooDefs.OpCode.setData);
+                SetDataTxn sdt = new SetDataTxn("/foo", data.getBytes(), version);
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                OutputArchive boa = BinaryOutputArchive.getArchive(baos);
+                boa.writeRecord(hdr, null);
+                boa.writeRecord(sdt, null);
+                qp.setData(baos.toByteArray());
+            }
+        }, testData);
+    }
+
     @Test
     @Test
     public void testNormalRun(@TempDir File testData) throws Exception {
     public void testNormalRun(@TempDir File testData) throws Exception {
         testLeaderConversation(new LeaderConversation() {
         testLeaderConversation(new LeaderConversation() {