Explorar o código

ZOOKEEPER-4712: Fix partial shutdown of ZooKeeperServer and its processors

Reviewers: anmolnar, kezhuw, kezhuw, kezhuw
Author: jonmv
Closes #2154 from jonmv/jonmv/ZOOKEEPER-4541-take-2
Jon Marius Venstad hai 7 meses
pai
achega
bc9afbf8ef

+ 1 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java

@@ -294,7 +294,7 @@ public class ZKDatabase {
     }
 
     /**
-     * Fast forward the database adding transactions from the committed log into memory.
+     * Fast-forward the database adding transactions from the committed log into memory.
      * @return the last valid zxid.
      * @throws IOException
      */

+ 36 - 34
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -900,7 +900,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      * @return true if the server is running or server hits an error, false
      *         otherwise.
      */
-    protected boolean canShutdown() {
+    private boolean canShutdown() {
         return state == State.RUNNING || state == State.ERROR;
     }
 
@@ -911,27 +911,49 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return state == State.RUNNING;
     }
 
-    public void shutdown() {
+    public final void shutdown() {
         shutdown(false);
     }
 
     /**
      * Shut down the server instance
-     * @param fullyShutDown true if another server using the same database will not replace this one in the same process
+     * @param fullyShutDown true when no other server will use the same database to replace this one
      */
-    public synchronized void shutdown(boolean fullyShutDown) {
-        if (!canShutdown()) {
-            if (fullyShutDown && zkDb != null) {
-                zkDb.clear();
+    public final synchronized void shutdown(boolean fullyShutDown) {
+        if (canShutdown()) {
+            LOG.info("Shutting down");
+
+            shutdownComponents();
+
+            if (zkDb != null && !fullyShutDown) {
+                // There is no need to clear the database if we are going to reuse it:
+                //  * When a new quorum is established we can still apply the diff
+                //    on top of the same zkDb data
+                //  * If we fetch a new snapshot from leader, the zkDb will be
+                //    cleared anyway before loading the snapshot
+                try {
+                    // This will fast-forward the database to the last recorded transaction
+                    zkDb.fastForwardDataBase();
+                } catch (IOException e) {
+                    LOG.error("Error updating DB", e);
+                    fullyShutDown = true;
+                }
             }
+            setState(State.SHUTDOWN);
+        } else {
             LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
-            return;
         }
-        LOG.info("shutting down");
-
-        // new RuntimeException("Calling shutdown").printStackTrace();
-        setState(State.SHUTDOWN);
+        if (zkDb != null && fullyShutDown) {
+            zkDb.clear();
+        }
+    }
 
+    /**
+     * @implNote
+     * Shuts down components owned by this class;
+     * remember to call super.shutdownComponents() when overriding!
+     */
+    protected void shutdownComponents() {
         // unregister all metrics that are keeping a strong reference to this object
         // subclasses will do their specific clean up
         unregisterMetrics();
@@ -940,9 +962,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             requestThrottler.shutdown();
         }
 
-        // Since sessionTracker and syncThreads poll we just have to
-        // set running to false and they will detect it during the poll
-        // interval.
+        // Since sessionTracker and syncThreads poll we just have to set running to false,
+        // and they will detect it during the poll interval.
         if (sessionTracker != null) {
             sessionTracker.shutdown();
         }
@@ -953,25 +974,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             jvmPauseMonitor.serviceStop();
         }
 
-        if (zkDb != null) {
-            if (fullyShutDown) {
-                zkDb.clear();
-            } else {
-                // else there is no need to clear the database
-                //  * When a new quorum is established we can still apply the diff
-                //    on top of the same zkDb data
-                //  * If we fetch a new snapshot from leader, the zkDb will be
-                //    cleared anyway before loading the snapshot
-                try {
-                    //This will fast forward the database to the latest recorded transactions
-                    zkDb.fastForwardDataBase();
-                } catch (IOException e) {
-                    LOG.error("Error updating DB", e);
-                    zkDb.clear();
-                }
-            }
-        }
-
         requestPathMetricsCollector.shutdown();
         unregisterJMX();
     }

+ 1 - 3
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java

@@ -192,9 +192,7 @@ public class ZooKeeperServerMain {
             if (secureCnxnFactory != null) {
                 secureCnxnFactory.join();
             }
-            if (zkServer.canShutdown()) {
-                zkServer.shutdown(true);
-            }
+            zkServer.shutdown(true);
         } catch (InterruptedException e) {
             // warn, but generally this is ok
             LOG.warn("Server interrupted", e);

+ 1 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java

@@ -313,7 +313,7 @@ public class FileTxnSnapLog {
     }
 
     /**
-     * This function will fast forward the server database to have the latest
+     * This function will fast-forward the server database to have the latest
      * transactions in it.  This is the same as restore, but only reads from
      * the transaction logs and not restores from a snapshot.
      * @param dt the datatree to write transactions to.

+ 2 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java

@@ -155,11 +155,11 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
     }
 
     @Override
-    public synchronized void shutdown() {
+    protected synchronized void shutdownComponents() {
         if (containerManager != null) {
             containerManager.stop();
         }
-        super.shutdown();
+        super.shutdownComponents();
     }
 
     @Override

+ 14 - 14
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java

@@ -916,26 +916,26 @@ public class Learner {
     }
 
     void closeSocket() {
-        if (sock != null) {
-            if (sockBeingClosed.compareAndSet(false, true)) {
-                if (closeSocketAsync) {
-                    final Thread closingThread = new Thread(() -> closeSockSync(), "CloseSocketThread(sid:" + zk.getServerId());
-                    closingThread.setDaemon(true);
-                    closingThread.start();
-                } else {
-                    closeSockSync();
-                }
+        if (sockBeingClosed.compareAndSet(false, true)) {
+            if (sock == null) { // Closing before establishing the connection is a noop
+                return;
+            }
+            Socket socket = sock;
+            sock = null;
+            if (closeSocketAsync) {
+                final Thread closingThread = new Thread(() -> closeSockSync(socket), "CloseSocketThread(sid:" + zk.getServerId());
+                closingThread.setDaemon(true);
+                closingThread.start();
+            } else {
+                closeSockSync(socket);
             }
         }
     }
 
-    void closeSockSync() {
+    private static void closeSockSync(Socket socket) {
         try {
             long startTime = Time.currentElapsedTime();
-            if (sock != null) {
-                sock.close();
-                sock = null;
-            }
+            socket.close();
             ServerMetrics.getMetrics().SOCKET_CLOSING_TIME.add(Time.currentElapsedTime() - startTime);
         } catch (IOException e) {
             LOG.warn("Ignoring error closing connection to leader", e);

+ 6 - 11
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java

@@ -152,17 +152,7 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
     }
 
     @Override
-    public synchronized void shutdown() {
-        if (!canShutdown()) {
-            LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
-            return;
-        }
-        LOG.info("Shutting down");
-        try {
-            super.shutdown();
-        } catch (Exception e) {
-            LOG.warn("Ignoring unexpected exception during shutdown", e);
-        }
+    protected void shutdownComponents() {
         try {
             if (syncProcessor != null) {
                 syncProcessor.shutdown();
@@ -170,6 +160,11 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
         } catch (Exception e) {
             LOG.warn("Ignoring unexpected exception in syncprocessor shutdown", e);
         }
+        try {
+            super.shutdownComponents();
+        } catch (Exception e) {
+            LOG.warn("Ignoring unexpected exception during shutdown", e);
+        }
     }
 
 }

+ 1 - 13
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverZooKeeperServer.java

@@ -44,7 +44,7 @@ public class ObserverZooKeeperServer extends LearnerZooKeeperServer {
      * take periodic snapshot. Default is ON.
      */
 
-    private boolean syncRequestProcessorEnabled = this.self.getSyncEnabled();
+    private final boolean syncRequestProcessorEnabled = this.self.getSyncEnabled();
 
     /*
      * Pending sync requests
@@ -127,18 +127,6 @@ public class ObserverZooKeeperServer extends LearnerZooKeeperServer {
         return "observer";
     }
 
-    @Override
-    public synchronized void shutdown() {
-        if (!canShutdown()) {
-            LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
-            return;
-        }
-        super.shutdown();
-        if (syncRequestProcessorEnabled && syncProcessor != null) {
-            syncProcessor.shutdown();
-        }
-    }
-
     @Override
     public void dumpMonitorValues(BiConsumer<String, Object> response) {
         super.dumpMonitorValues(response);

+ 2 - 6
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java

@@ -190,11 +190,7 @@ public class ReadOnlyZooKeeperServer extends ZooKeeperServer {
     }
 
     @Override
-    public synchronized void shutdown() {
-        if (!canShutdown()) {
-            LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
-            return;
-        }
+    protected void shutdownComponents() {
         shutdown = true;
         unregisterJMX(this);
 
@@ -206,7 +202,7 @@ public class ReadOnlyZooKeeperServer extends ZooKeeperServer {
         self.adminServer.setZooKeeperServer(null);
 
         // shutdown the server itself
-        super.shutdown();
+        super.shutdownComponents();
     }
 
     @Override

+ 2 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java

@@ -46,7 +46,7 @@ public class SendAckRequestProcessor implements RequestProcessor, Flushable {
                 learner.writePacket(qp, false);
             } catch (IOException e) {
                 LOG.warn("Closing connection to leader, exception during packet send", e);
-                learner.closeSockSync();
+                learner.closeSocket();
             }
         }
     }
@@ -56,7 +56,7 @@ public class SendAckRequestProcessor implements RequestProcessor, Flushable {
             learner.writePacket(null, true);
         } catch (IOException e) {
             LOG.warn("Closing connection to leader, exception during packet send", e);
-            learner.closeSockSync();
+            learner.closeSocket();
         }
     }
 

+ 85 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerShutdownTest.java

@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.Learner;
+import org.apache.zookeeper.server.quorum.LearnerZooKeeperServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class ZooKeeperServerShutdownTest extends ZKTestCase  {
+
+    static class ShutdownTrackRequestProcessor implements RequestProcessor {
+        boolean shutdown = false;
+
+        @Override
+        public void processRequest(Request request) throws RequestProcessorException {
+        }
+
+        @Override
+        public void shutdown() {
+            shutdown = true;
+        }
+    }
+
+    public static class ShutdownTrackLearnerZooKeeperServer extends LearnerZooKeeperServer {
+        public ShutdownTrackLearnerZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self) throws IOException {
+            super(logFactory, 2000, 2000, 2000, -1, new ZKDatabase(logFactory), self);
+        }
+
+        @Override
+        protected void setupRequestProcessors() {
+            firstProcessor = new ShutdownTrackRequestProcessor();
+            syncProcessor = new SyncRequestProcessor(this, null);
+            syncProcessor.start();
+        }
+
+        ShutdownTrackRequestProcessor getFirstProcessor() {
+            return (ShutdownTrackRequestProcessor) firstProcessor;
+        }
+
+        SyncRequestProcessor getSyncRequestProcessor() {
+            return syncProcessor;
+        }
+
+        @Override
+        public Learner getLearner() {
+            return null;
+        }
+    }
+
+    @Test
+    void testLearnerZooKeeperServerShutdown(@TempDir File tmpDir) throws Exception {
+        File tmpFile = File.createTempFile("test", ".dir", tmpDir);
+        tmpFile.delete();
+        FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpFile, tmpFile);
+        ShutdownTrackLearnerZooKeeperServer zooKeeperServer = new ShutdownTrackLearnerZooKeeperServer(logFactory, new QuorumPeer());
+        zooKeeperServer.startup();
+        zooKeeperServer.shutdown(false);
+        assertTrue(zooKeeperServer.getFirstProcessor().shutdown);
+        assertFalse(zooKeeperServer.getSyncRequestProcessor().isAlive());
+    }
+}

+ 1 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/WatchLeakTest.java

@@ -97,7 +97,7 @@ public class WatchLeakTest {
             }
         });
 
-        ZKDatabase database = new ZKDatabase(null);
+        ZKDatabase database = new ZKDatabase(mock(FileTxnSnapLog.class));
         database.setlastProcessedZxid(2L);
         QuorumPeer quorumPeer = mock(QuorumPeer.class);
         FileTxnSnapLog logfactory = mock(FileTxnSnapLog.class);