Jelajahi Sumber

ZOOKEEPER-2678: Discovery and Sync can take a very long time on large DBs (master)

This is the master version of #157

Author: Robert (Bobby) Evans <evans@yahoo-inc.com>

Reviewers: Flavio Junqueira <fpj@apache.org>, Edward Ribeiro <edward.ribeiro@gmail.com>, Abraham Fine <afine@apache.org>, Michael Han <hanm@apache.org>

Closes #159 from revans2/ZOOKEEPER-2678-master and squashes the following commits:

69fbe19 [Robert (Bobby) Evans] ZOOKEEPER-2678: Addressed review comments
a432642 [Robert (Bobby) Evans] ZOOKEEPER-2678:  Improved test to verify snapshot times
742367e [Robert (Bobby) Evans] Addressed review comments
f4c5b0e [Robert (Bobby) Evans] ZOOKEEPER-2678: Discovery and Sync can take a very long time on large DBs
Robert (Bobby) Evans 8 tahun lalu
induk
melakukan
bbfd016941

+ 16 - 2
src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -526,7 +526,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return state == State.RUNNING;
     }
 
-    public synchronized void shutdown() {
+    public void shutdown() {
+        shutdown(false);
+    }
+
+    /**
+     * Shut down the server instance
+     * @param fullyShutDown true if another server using the same database will not replace this one in the same process
+     */
+    public synchronized void shutdown(boolean fullyShutDown) {
         if (!canShutdown()) {
             LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
             return;
@@ -544,9 +552,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         if (firstProcessor != null) {
             firstProcessor.shutdown();
         }
-        if (zkDb != null) {
+
+        if (fullyShutDown && zkDb != null) {
             zkDb.clear();
         }
+        // else there is no need to clear the database
+        //  * When a new quorum is established we can still apply the diff
+        //    on top of the same zkDb data
+        //  * If we fetch a new snapshot from leader, the zkDb will be
+        //    cleared anyway before loading the snapshot
 
         unregisterJMX();
     }

+ 1 - 1
src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java

@@ -167,7 +167,7 @@ public class ZooKeeperServerMain {
                 secureCnxnFactory.join();
             }
             if (zkServer.canShutdown()) {
-                zkServer.shutdown();
+                zkServer.shutdown(true);
             }
         } catch (InterruptedException e) {
             // warn, but generally this is ok

+ 24 - 13
src/java/main/org/apache/zookeeper/server/quorum/Learner.java

@@ -358,12 +358,16 @@ public class Learner {
         
         QuorumVerifier newLeaderQV = null;
         
-        readPacket(qp);   
+        // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
+        // For SNAP and TRUNC the snapshot is needed to save that history
+        boolean snapshotNeeded = true;
+        readPacket(qp);
         LinkedList<Long> packetsCommitted = new LinkedList<Long>();
         LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
         synchronized (zk) {
             if (qp.getType() == Leader.DIFF) {
-                LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));                
+                LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
+                snapshotNeeded = false;
             }
             else if (qp.getType() == Leader.SNAP) {
                 LOG.info("Getting a snapshot from leader");
@@ -400,10 +404,13 @@ public class Learner {
             
             long lastQueued = 0;
 
-            // in V1.0 we take a snapshot when we get the NEWLEADER message, but in pre V1.0
-            // we take the snapshot at the UPDATE, since V1.0 also gets the UPDATE (after the NEWLEADER)
+            // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
+            // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
             // we need to make sure that we don't take the snapshot twice.
-            boolean snapshotTaken = false;
+            boolean isPreZAB1_0 = true;
+            //If we are not going to take the snapshot be sure the transactions are not applied in memory
+            // but written out to the transaction log
+            boolean writeToTxnLog = !snapshotNeeded;
             // we are now going to start getting transactions to apply followed by an UPTODATE
             outerLoop:
             while (self.isRunning()) {
@@ -440,7 +447,7 @@ public class Learner {
                             throw new Exception("changes proposed in reconfig");
                         }
                     }
-                    if (!snapshotTaken) {
+                    if (!writeToTxnLog) {
                         if (pif.hdr.getZxid() != qp.getZxid()) {
                             LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                         } else {
@@ -479,8 +486,7 @@ public class Learner {
                         }
                         lastQueued = packet.hdr.getZxid();
                     }
-
-                    if (!snapshotTaken) {
+                    if (!writeToTxnLog) {
                         // Apply to db directly if we haven't taken the snapshot
                         zk.processTxn(packet.hdr, packet.rec);
                     } else {
@@ -498,14 +504,15 @@ public class Learner {
                            throw new Exception("changes proposed in reconfig");
                        }
                     }
-                    if (!snapshotTaken) { // true for the pre v1.0 case
-                       zk.takeSnapshot();
+                    if (isPreZAB1_0) {
+                        zk.takeSnapshot();
                         self.setCurrentEpoch(newEpoch);
                     }
                     self.setZooKeeperServer(zk);
                     self.adminServer.setZooKeeperServer(zk);
                     break outerLoop;
-                case Leader.NEWLEADER: // it will be NEWLEADER in v1.0        
+                case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery 
+                    // means this is Zab 1.0
                    LOG.info("Learner received NEWLEADER message");
                    if (qp.getData()!=null && qp.getData().length > 1) {
                        try {                       
@@ -516,10 +523,14 @@ public class Learner {
                            e.printStackTrace();
                        }
                    }
+
+                   if (snapshotNeeded) {
+                       zk.takeSnapshot();
+                   }
                    
-                    zk.takeSnapshot();
                     self.setCurrentEpoch(newEpoch);
-                    snapshotTaken = true;
+                    writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
+                    isPreZAB1_0 = false;
                     writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                     break;
                 }

+ 28 - 1
src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java

@@ -19,6 +19,9 @@
 package org.apache.zookeeper.server.quorum;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
@@ -652,6 +655,8 @@ public class Zab1_0Test extends ZKTestCase {
                 tmpDir.mkdir();
                 File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile();
                 File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
+                //Spy on ZK so we can check if a snapshot happened or not.
+                f.zk = spy(f.zk);
                 try {
                     Assert.assertEquals(0, f.self.getAcceptedEpoch());
                     Assert.assertEquals(0, f.self.getCurrentEpoch());
@@ -694,6 +699,10 @@ public class Zab1_0Test extends ZKTestCase {
                     oa.writeRecord(qp, null);
                     zkDb.serializeSnapshot(oa);
                     oa.writeString("BenWasHere", null);
+                    Thread.sleep(10); //Give it some time to process the snap
+                    //No Snapshot taken yet, the SNAP was applied in memory
+                    verify(f.zk, never()).takeSnapshot();
+
                     qp.setType(Leader.NEWLEADER);
                     qp.setZxid(ZxidUtils.makeZxid(1, 0));
                     oa.writeRecord(qp, null);
@@ -704,7 +713,8 @@ public class Zab1_0Test extends ZKTestCase {
                     Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
                     Assert.assertEquals(1, f.self.getAcceptedEpoch());
                     Assert.assertEquals(1, f.self.getCurrentEpoch());
-                    
+                    //Make sure that we did take the snapshot now
+                    verify(f.zk).takeSnapshot();
                     Assert.assertEquals(firstZxid, f.fzk.getLastProcessedZxid());
                     
                     // Make sure the data was recorded in the filesystem ok
@@ -780,6 +790,8 @@ public class Zab1_0Test extends ZKTestCase {
                 tmpDir.mkdir();
                 File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile();
                 File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
+                //Spy on ZK so we can check if a snapshot happened or not.
+                f.zk = spy(f.zk);
                 try {
                     Assert.assertEquals(0, f.self.getAcceptedEpoch());
                     Assert.assertEquals(0, f.self.getCurrentEpoch());
@@ -847,13 +859,28 @@ public class Zab1_0Test extends ZKTestCase {
                     Assert.assertEquals(1, f.self.getAcceptedEpoch());
                     Assert.assertEquals(1, f.self.getCurrentEpoch());
                     
+                    //Wait for the transactions to be written out. The thread that writes them out
+                    // does not send anything back when it is done.
+                    long start = System.currentTimeMillis();
+                    while (createSessionZxid != f.fzk.getLastProcessedZxid() && (System.currentTimeMillis() - start) < 50) {
+                        Thread.sleep(1);
+                    }
+                    
                     Assert.assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid());
                     
                     // Make sure the data was recorded in the filesystem ok
                     ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
+                    start = System.currentTimeMillis();
                     zkDb2.loadDataBase();
+                    while (zkDb2.getSessionWithTimeOuts().isEmpty() && (System.currentTimeMillis() - start) < 50) {
+                        Thread.sleep(1);
+                        zkDb2.loadDataBase();
+                    }
                     LOG.info("zkdb2 sessions:" + zkDb2.getSessions());
+                    LOG.info("zkdb2 with timeouts:" + zkDb2.getSessionWithTimeOuts());
                     Assert.assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L));
+                    //Snapshot was never taken during very simple sync
+                    verify(f.zk, never()).takeSnapshot();
                 } finally {
                     TestUtils.deleteFileRecursively(tmpDir);
                 }