Kaynağa Gözat

ZOOKEEPER-3829: fix rolling restart when dynamic reconfig is disabled

In four different Jira tickets (ZOOKEEPER-3829, ZOOKEEPER-3830, ZOOKEEPER-3814, ZOOKEEPER-3842) we saw different errors when dynamic reconfig was disabled and we used rolling restart to change the quorum membership configuration. These rolling restart sequences was working fine on 3.4 but caused errors in 3.5 or 3.6.

In worst case the rolling restart leads to the scenario that we had an elected leader which was up but unable to commit any changes. This happens, when the quorum is extended with a new member in the following sequence:
* start server.1, server.2, server.3 (with config: 1,2,3)
* start server.4 (with config 1,2,3,4)
* stop server.1, then restart it with config 1,2,3,4
* stop server.2, then restart it with config 1,2,3,4
* stop server.3, then restart it with config 1,2,3,4
* at this point leader is server.4, but it can not commit any transaction

An other error we saw was when we changed a host name of an existing member (removing server.5 and add a new host as server.6). In this case we found in the logs of the new server (server.6) that it is still tried to connect to the old invalid server (server.5), although it was missing from it's config. The same problem remained even after making a full rolling-restart on all the nodes.

In this patch I try to fix these issues without breaking anything. The patch contains the following changes:
* We are making sure that neither the committed, nor the last seen config gets updated if dynamic reconfig is disabled.
* It is not possible now to start the leader without the ability of committing transaction, when dynamic reconfig is disabled (this is only needed to avoid a reconfig edge-case).
* I added a testcase simulating the enablement of dynamic reconfig using rolling restart
* I added a few more unit tests to cover rolling restart scenarios. (the tests are failing without the patch but succeeding after applying it).
* The enablement / disablement of reconfig is getting initialized now in the QuorumPeer and gets propagated to the other classes. This was needed for the rolling restart tests to be able to enable/disable reconfig only for the newly created servers without affecting the servers running already in the same JVM.

I also tested the changes with docker, using: https://github.com/symat/zookeeper-docker-test

target branches: 3.5, 3.6, master

Author: Mate Szalay-Beko <symat@apache.org>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Norbert Kalmar <nkalmar@apache.org>

Closes #1356 from symat/ZOOKEEPER-3829
Mate Szalay-Beko 5 yıl önce
ebeveyn
işleme
e91455c1e3

+ 2 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java

@@ -133,6 +133,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
     }
     @Override
     public void run() {
+        LOG.info(String.format("PrepRequestProcessor (sid:%d) started, reconfigEnabled=%s", zks.getServerId(), zks.reconfigEnabled));
         try {
             while (true) {
                 ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
@@ -405,7 +406,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             addChangeRecord(nodeRecord);
             break;
         case OpCode.reconfig:
-            if (!QuorumPeerConfig.isReconfigEnabled()) {
+            if (!zks.isReconfigEnabled()) {
                 LOG.error("Reconfig operation requested but reconfig feature is disabled.");
                 throw new KeeperException.ReconfigDisabledException();
             }

+ 12 - 5
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -74,6 +74,7 @@ import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
 import org.apache.zookeeper.server.auth.ProviderRegistry;
 import org.apache.zookeeper.server.auth.ServerAuthenticationProvider;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
 import org.apache.zookeeper.server.util.JvmPauseMonitor;
 import org.apache.zookeeper.server.util.OSMXBean;
@@ -177,6 +178,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     private boolean isResponseCachingEnabled = true;
     /* contains the configuration file content read at startup */
     protected String initialConfig;
+    protected boolean reconfigEnabled;
     private final RequestPathMetricsCollector requestPathMetricsCollector;
 
     private boolean localSessionEnabled = false;
@@ -303,7 +305,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      * actually start listening for clients until run() is invoked.
      *
      */
-    public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) {
+    public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig, boolean reconfigEnabled) {
         serverStats = new ServerStats(this);
         this.txnLogFactory = txnLogFactory;
         this.txnLogFactory.setServerStats(this.serverStats);
@@ -312,6 +314,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         setMinSessionTimeout(minSessionTimeout);
         setMaxSessionTimeout(maxSessionTimeout);
         this.listenBacklog = clientPortListenBacklog;
+        this.reconfigEnabled = reconfigEnabled;
 
         listener = new ZooKeeperServerListenerImpl(this);
 
@@ -355,7 +358,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      *
      */
     public ZooKeeperServer(JvmPauseMonitor jvmPauseMonitor, FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) {
-        this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig);
+        this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig, QuorumPeerConfig.isReconfigEnabled());
         this.jvmPauseMonitor = jvmPauseMonitor;
         if (jvmPauseMonitor != null) {
             LOG.info("Added JvmPauseMonitor to server");
@@ -368,8 +371,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      * @param tickTime the ticktime for the server
      * @throws IOException
      */
-    public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, String initialConfig) throws IOException {
-        this(txnLogFactory, tickTime, -1, -1, -1, new ZKDatabase(txnLogFactory), initialConfig);
+    public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, String initialConfig) {
+        this(txnLogFactory, tickTime, -1, -1, -1, new ZKDatabase(txnLogFactory), initialConfig, QuorumPeerConfig.isReconfigEnabled());
     }
 
     public ServerStats serverStats() {
@@ -440,7 +443,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      * @throws IOException
      */
     public ZooKeeperServer(FileTxnSnapLog txnLogFactory) throws IOException {
-        this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, -1, new ZKDatabase(txnLogFactory), "");
+        this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, -1, new ZKDatabase(txnLogFactory), "", QuorumPeerConfig.isReconfigEnabled());
     }
 
     /**
@@ -2114,4 +2117,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             return 0;
         }
     }
+
+    public boolean isReconfigEnabled() {
+        return this.reconfigEnabled;
+    }
 }

+ 20 - 12
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java

@@ -635,6 +635,7 @@ public class Leader extends LearnerMaster {
                 // hence before they construct the NEWLEADER message containing
                 // the last-seen-quorumverifier of the leader, which we change below
                 try {
+                    LOG.debug(String.format("set lastSeenQuorumVerifier to currentQuorumVerifier (%s)", curQV.toString()));
                     QuorumVerifier newQV = self.configFromString(curQV.toString());
                     newQV.setVersion(zk.getZxid());
                     self.setLastSeenQuorumVerifier(newQV, true);
@@ -943,6 +944,8 @@ public class Leader extends LearnerMaster {
             self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
 
             if (designatedLeader != self.getId()) {
+                LOG.info(String.format("Committing a reconfiguration (reconfigEnabled=%s); this leader is not the designated "
+                        + "leader anymore, setting allowedToCommit=false", self.isReconfigEnabled()));
                 allowedToCommit = false;
             }
 
@@ -1508,20 +1511,25 @@ public class Leader extends LearnerMaster {
                  newLeaderProposal.ackSetsToString(),
                  Long.toHexString(zk.getZxid()));
 
-        /*
-         * ZOOKEEPER-1324. the leader sends the new config it must complete
-         *  to others inside a NEWLEADER message (see LearnerHandler where
-         *  the NEWLEADER message is constructed), and once it has enough
-         *  acks we must execute the following code so that it applies the
-         *  config to itself.
-         */
-        QuorumVerifier newQV = self.getLastSeenQuorumVerifier();
+        if (self.isReconfigEnabled()) {
+            /*
+             * ZOOKEEPER-1324. the leader sends the new config it must complete
+             *  to others inside a NEWLEADER message (see LearnerHandler where
+             *  the NEWLEADER message is constructed), and once it has enough
+             *  acks we must execute the following code so that it applies the
+             *  config to itself.
+             */
+            QuorumVerifier newQV = self.getLastSeenQuorumVerifier();
 
-        Long designatedLeader = getDesignatedLeader(newLeaderProposal, zk.getZxid());
+            Long designatedLeader = getDesignatedLeader(newLeaderProposal, zk.getZxid());
 
-        self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
-        if (designatedLeader != self.getId()) {
-            allowedToCommit = false;
+            self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
+            if (designatedLeader != self.getId()) {
+                LOG.warn("This leader is not the designated leader, it will be initialized with allowedToCommit = false");
+                allowedToCommit = false;
+            }
+        } else {
+            LOG.info("Dynamic reconfig feature is disabled, skip designatedLeader calculation and reconfig processing.");
         }
 
         leaderStartTime = Time.currentElapsedTime();

+ 1 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java

@@ -571,7 +571,7 @@ public class Learner {
                 // ZOOKEEPER-2819: overwrite config node content extracted
                 // from leader snapshot with local config, to avoid potential
                 // inconsistency of config node content during rolling restart.
-                if (!QuorumPeerConfig.isReconfigEnabled()) {
+                if (!self.isReconfigEnabled()) {
                     LOG.debug("Reset config node content from local config after deserialization of snapshot.");
                     zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
                 }

+ 4 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

@@ -1476,4 +1476,8 @@ public class QuorumCnxManager {
         return senderWorkerMap.get(peerSid) != null;
     }
 
+    public boolean isReconfigEnabled() {
+        return self.isReconfigEnabled();
+    }
+
 }

+ 13 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java

@@ -1003,6 +1003,8 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
 
     AdminServer adminServer;
 
+    private final boolean reconfigEnabled;
+
     public static QuorumPeer testingQuorumPeer() throws SaslException {
         return new QuorumPeer();
     }
@@ -1014,6 +1016,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         adminServer = AdminServerFactory.createAdminServer();
         x509Util = createX509Util();
         initialize();
+        reconfigEnabled = QuorumPeerConfig.isReconfigEnabled();
     }
 
     // VisibleForTesting
@@ -1804,6 +1807,11 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
     }
 
     public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) {
+        if (!isReconfigEnabled()) {
+            LOG.info("Dynamic reconfig is disabled, we don't store the last seen config.");
+            return;
+        }
+
         // If qcm is non-null, we may call qcm.connectOne(), which will take the lock on qcm
         // and then take QV_LOCK.  Take the locks in the same order to ensure that we don't
         // deadlock against other callers of connectOne().  If qcmRef gets set in another
@@ -2153,7 +2161,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
     }
 
     public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long zxid, boolean restartLE) {
-        if (!QuorumPeerConfig.isReconfigEnabled()) {
+        if (!isReconfigEnabled()) {
             LOG.debug("Reconfig feature is disabled, skip reconfig processing.");
             return false;
         }
@@ -2519,6 +2527,10 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         return vote != null && id == vote.getId();
     }
 
+    public boolean isReconfigEnabled() {
+        return reconfigEnabled;
+    }
+
     @InterfaceAudience.Private
     /**
      * This is a metric that depends on the status of the peer.

+ 1 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java

@@ -148,7 +148,7 @@ public class QuorumPeerMain {
             LOG.warn("Unable to register log4j JMX control", e);
         }
 
-        LOG.info("Starting quorum peer");
+        LOG.info("Starting quorum peer, myid=" + config.getServerId());
         MetricsProvider metricsProvider;
         try {
             metricsProvider = MetricsProviderBootstrap.startMetricsProvider(

+ 2 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java

@@ -49,7 +49,8 @@ public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
 
     protected QuorumZooKeeperServer(FileTxnSnapLog logFactory, int tickTime, int minSessionTimeout,
                                     int maxSessionTimeout, int listenBacklog, ZKDatabase zkDb, QuorumPeer self) {
-        super(logFactory, tickTime, minSessionTimeout, maxSessionTimeout, listenBacklog, zkDb, self.getInitialConfig());
+        super(logFactory, tickTime, minSessionTimeout, maxSessionTimeout, listenBacklog, zkDb, self.getInitialConfig(),
+              self.isReconfigEnabled());
         this.self = self;
     }
 

+ 2 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java

@@ -52,7 +52,8 @@ public class ReadOnlyZooKeeperServer extends ZooKeeperServer {
             self.maxSessionTimeout,
             self.clientPortListenBacklog,
             zkDb,
-            self.getInitialConfig());
+            self.getInitialConfig(),
+            self.isReconfigEnabled());
         this.self = self;
     }
 

+ 19 - 16
zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java

@@ -81,6 +81,9 @@ public class PrepRequestProcessorTest extends ClientBase {
     private PrepRequestProcessor processor;
     private Request outcome;
 
+    private boolean isReconfigEnabledPreviously;
+    private boolean isStandaloneEnabledPreviously;
+
     @Before
     public void setup() throws Exception {
         File tmpDir = ClientBase.createTmpDir();
@@ -93,6 +96,9 @@ public class PrepRequestProcessorTest extends ClientBase {
         servcnxnf.startup(zks);
         assertTrue("waiting for server being up ", ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
         zks.sessionTracker = new MySessionTracker();
+
+        isReconfigEnabledPreviously = QuorumPeerConfig.isReconfigEnabled();
+        isStandaloneEnabledPreviously = QuorumPeerConfig.isStandaloneEnabled();
     }
 
     @After
@@ -103,6 +109,10 @@ public class PrepRequestProcessorTest extends ClientBase {
         if (zks != null) {
             zks.shutdown();
         }
+
+        // reset the reconfig option
+        QuorumPeerConfig.setReconfigEnabled(isReconfigEnabledPreviously);
+        QuorumPeerConfig.setStandaloneEnabled(isStandaloneEnabledPreviously);
     }
 
     @Test
@@ -179,6 +189,9 @@ public class PrepRequestProcessorTest extends ClientBase {
 
     @Test
     public void testReconfigWithAnotherOutstandingChange() throws Exception {
+        QuorumPeerConfig.setReconfigEnabled(true);
+        QuorumPeerConfig.setStandaloneEnabled(false);
+
         QuorumPeer qp = new QuorumPeer();
         QuorumVerifier quorumVerifierMock = mock(QuorumVerifier.class);
         when(quorumVerifierMock.getAllMembers()).thenReturn(LeaderBeanTest.getMockedPeerViews(qp.getId()));
@@ -196,22 +209,12 @@ public class PrepRequestProcessorTest extends ClientBase {
         processor.pRequest(createRequest(record, OpCode.create, false));
         assertTrue("request hasn't been processed in chain", pLatch.await(5, TimeUnit.SECONDS));
 
-        boolean isReconfigEnabledPreviously = QuorumPeerConfig.isReconfigEnabled();
-        boolean isStandaloneEnabledPreviously = QuorumPeerConfig.isStandaloneEnabled();
-        QuorumPeerConfig.setReconfigEnabled(true);
-        QuorumPeerConfig.setStandaloneEnabled(false);
-        try {
-            String newMember = "server.0=localhost:" + PortAssignment.unique()  + ":" + PortAssignment.unique() + ":participant";
-            record = new ReconfigRequest(null, null, newMember, 0);
-            pLatch = new CountDownLatch(1);
-            processor.pRequest(createRequest(record, OpCode.reconfig, true));
-            assertTrue("request hasn't been processed in chain", pLatch.await(5, TimeUnit.SECONDS));
-            assertEquals(outcome.getHdr().getType(), OpCode.reconfig);   // Verifies that there was no error.
-        } finally {
-            // reset the reconfig option
-            QuorumPeerConfig.setReconfigEnabled(isReconfigEnabledPreviously);
-            QuorumPeerConfig.setStandaloneEnabled(isStandaloneEnabledPreviously);
-        }
+        String newMember = "server.0=localhost:" + PortAssignment.unique()  + ":" + PortAssignment.unique() + ":participant";
+        record = new ReconfigRequest(null, null, newMember, 0);
+        pLatch = new CountDownLatch(1);
+        processor.pRequest(createRequest(record, OpCode.reconfig, true));
+        assertTrue("request hasn't been processed in chain", pLatch.await(5, TimeUnit.SECONDS));
+        assertEquals(outcome.getHdr().getType(), OpCode.reconfig);   // Verifies that there was no error.
     }
 
     /**

+ 136 - 4
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigRollingRestartCompatibilityTest.java

@@ -148,11 +148,11 @@ public class ReconfigRollingRestartCompatibilityTest extends QuorumPeerTestBase
     }
 
     @Test(timeout = 90000)
-    // This test simulate the use case of change of membership through rolling
-    // restart. For a 3 node ensemble we expand it to a 5 node ensemble, verify
+    // This test simulate the use case of change of membership by starting new servers
+    // without dynamic reconfig. For a 3 node ensemble we expand it to a 5 node ensemble, verify
     // during the process each node has the expected configuration setting pushed
     // via updating local zoo.cfg file.
-    public void testRollingRestartWithMembershipChange() throws Exception {
+    public void testExtendingQuorumWithNewMembers() throws Exception {
         int serverCount = 3;
         String config = generateNewQuorumConfig(serverCount);
         QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[serverCount];
@@ -174,7 +174,7 @@ public class ReconfigRollingRestartCompatibilityTest extends QuorumPeerTestBase
 
         Map<Integer, String> oldServerAddress = new HashMap<>(serverAddress);
         List<String> newServers = new ArrayList<>(joiningServers);
-        config = updateExistingQuorumConfig(Arrays.asList(3, 4), new ArrayList<Integer>());
+        config = updateExistingQuorumConfig(Arrays.asList(3, 4), new ArrayList<>());
         newServers.add(serverAddress.get(3));
         newServers.add(serverAddress.get(4));
         serverCount = serverAddress.size();
@@ -209,6 +209,138 @@ public class ReconfigRollingRestartCompatibilityTest extends QuorumPeerTestBase
         }
     }
 
+    @Test
+    public void testRollingRestartWithExtendedMembershipConfig() throws Exception {
+        // in this test we are performing rolling restart with extended quorum config, see ZOOKEEPER-3829
+
+        // Start a quorum with 3 members
+        int serverCount = 3;
+        String config = generateNewQuorumConfig(serverCount);
+        QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[serverCount];
+        List<String> joiningServers = new ArrayList<>();
+        for (int i = 0; i < serverCount; i++) {
+            mt[i] = new QuorumPeerTestBase.MainThread(i, clientPorts.get(i), config, false);
+            mt[i].start();
+            joiningServers.add(serverAddress.get(i));
+        }
+        for (int i = 0; i < serverCount; i++) {
+            assertTrue("waiting for server " + i + " being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPorts.get(i), CONNECTION_TIMEOUT));
+        }
+        for (int i = 0; i < serverCount; i++) {
+            verifyQuorumConfig(i, joiningServers, null);
+            verifyQuorumMembers(mt[i]);
+        }
+
+        // Create updated config with 4 members
+        List<String> newServers = new ArrayList<>(joiningServers);
+        config = updateExistingQuorumConfig(Arrays.asList(3), new ArrayList<>());
+        newServers.add(serverAddress.get(3));
+        serverCount = serverAddress.size();
+        assertEquals("Server count should be 4 after config update.", serverCount, 4);
+
+        // We are adding one new server to the ensemble. The new server should be started with the new config
+        mt = Arrays.copyOf(mt, mt.length + 1);
+        mt[3] = new QuorumPeerTestBase.MainThread(3, clientPorts.get(3), config, false);
+        mt[3].start();
+        assertTrue("waiting for server 3 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPorts.get(3), CONNECTION_TIMEOUT));
+        verifyQuorumConfig(3, newServers, null);
+        verifyQuorumMembers(mt[3]);
+
+        // Now we restart the first 3 servers, one-by-one with the new config
+        for (int i = 0; i < 3; i++) {
+            mt[i].shutdown();
+
+            assertTrue(String.format("Timeout during waiting for server %d to go down", i),
+                       ClientBase.waitForServerDown("127.0.0.1:" + clientPorts.get(i), ClientBase.CONNECTION_TIMEOUT));
+
+            mt[i] = new QuorumPeerTestBase.MainThread(i, clientPorts.get(i), config, false);
+            mt[i].start();
+            assertTrue("waiting for server " + i + " being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPorts.get(i), CONNECTION_TIMEOUT));
+            verifyQuorumConfig(i, newServers, null);
+            verifyQuorumMembers(mt[i]);
+        }
+
+        // now verify that all nodes can handle traffic
+        for (int i = 0; i < 4; ++i) {
+            ZooKeeper zk = ClientBase.createZKClient("127.0.0.1:" + clientPorts.get(i));
+            ReconfigTest.testNormalOperation(zk, zk, false);
+        }
+
+        for (int i = 0; i < 4; ++i) {
+            mt[i].shutdown();
+        }
+    }
+
+    @Test
+    public void testRollingRestartWithHostAddedAndRemoved() throws Exception {
+        // in this test we are performing rolling restart with a new quorum config,
+        // contains a deleted node and a new node
+
+        // Start a quorum with 3 members
+        int serverCount = 3;
+        String config = generateNewQuorumConfig(serverCount);
+        QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[serverCount];
+        List<String> originalServers = new ArrayList<>();
+        for (int i = 0; i < serverCount; i++) {
+            mt[i] = new QuorumPeerTestBase.MainThread(i, clientPorts.get(i), config, false);
+            mt[i].start();
+            originalServers.add(serverAddress.get(i));
+        }
+        for (int i = 0; i < serverCount; i++) {
+            assertTrue("waiting for server " + i + " being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPorts.get(i), CONNECTION_TIMEOUT));
+        }
+        for (int i = 0; i < serverCount; i++) {
+            verifyQuorumConfig(i, originalServers, null);
+            verifyQuorumMembers(mt[i]);
+        }
+
+        // we are stopping the third server (myid=2)
+        mt[2].shutdown();
+        assertTrue(String.format("Timeout during waiting for server %d to go down", 2),
+                   ClientBase.waitForServerDown("127.0.0.1:" + clientPorts.get(2), ClientBase.CONNECTION_TIMEOUT));
+        String leavingServer = originalServers.get(2);
+
+        // Create updated config with the first 2 existing members, but we remove 3rd and add one with different myid
+        config = updateExistingQuorumConfig(Arrays.asList(3), Arrays.asList(2));
+        List<String> newServers = new ArrayList<>(serverAddress.values());
+        serverCount = serverAddress.size();
+        assertEquals("Server count should be 3 after config update.", serverCount, 3);
+
+
+        // We are adding one new server to the ensemble. The new server should be started with the new config
+        mt = Arrays.copyOf(mt, mt.length + 1);
+        mt[3] = new QuorumPeerTestBase.MainThread(3, clientPorts.get(3), config, false);
+        mt[3].start();
+        assertTrue("waiting for server 3 being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPorts.get(3), CONNECTION_TIMEOUT));
+        verifyQuorumConfig(3, newServers, Arrays.asList(leavingServer));
+        verifyQuorumMembers(mt[3]);
+
+        // Now we restart the first 2 servers, one-by-one with the new config
+        for (int i = 0; i < 2; i++) {
+            mt[i].shutdown();
+
+            assertTrue(String.format("Timeout during waiting for server %d to go down", i),
+                       ClientBase.waitForServerDown("127.0.0.1:" + clientPorts.get(i), ClientBase.CONNECTION_TIMEOUT));
+
+            mt[i] = new QuorumPeerTestBase.MainThread(i, clientPorts.get(i), config, false);
+            mt[i].start();
+            assertTrue("waiting for server " + i + " being up", ClientBase.waitForServerUp("127.0.0.1:" + clientPorts.get(i), CONNECTION_TIMEOUT));
+            verifyQuorumConfig(i, newServers, null);
+            verifyQuorumMembers(mt[i]);
+        }
+
+        // now verify that all three nodes can handle traffic
+        for (int i : serverAddress.keySet()) {
+            ZooKeeper zk = ClientBase.createZKClient("127.0.0.1:" + clientPorts.get(i));
+            ReconfigTest.testNormalOperation(zk, zk, false);
+        }
+
+        for (int i : serverAddress.keySet()) {
+            mt[i].shutdown();
+        }
+    }
+
+
     // Verify each quorum peer has expected config in its config zNode.
     private void verifyQuorumConfig(int sid, List<String> joiningServers, List<String> leavingServers) throws Exception {
         ZooKeeper zk = ClientBase.createZKClient("127.0.0.1:" + clientPorts.get(sid));

+ 10 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigExceptionTest.java

@@ -90,6 +90,16 @@ public class ReconfigExceptionTest extends ZKTestCase {
     @Test(timeout = 10000)
     public void testReconfigDisabled() throws InterruptedException {
         QuorumPeerConfig.setReconfigEnabled(false);
+
+        // for this test we need to restart the quorum peers to get the config change,
+        // as in the setup() we started the quorum with reconfigEnabled=true
+        qu.shutdownAll();
+        try {
+            qu.startAll();
+        } catch (IOException e) {
+            fail("Fail to start quorum servers.");
+        }
+
         try {
             reconfigPort();
             fail("Reconfig should be disabled.");

+ 65 - 11
zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java

@@ -179,22 +179,18 @@ public class ReconfigTest extends ZKTestCase implements DataCallback {
         return configStr;
     }
 
-    public static void testNormalOperation(
-        ZooKeeper writer,
-        ZooKeeper reader) throws KeeperException, InterruptedException {
-        boolean testReaderNodeExists = false;
-        boolean testWriterNodeExists = false;
+    public static void testNormalOperation(ZooKeeper writer, ZooKeeper reader) throws KeeperException, InterruptedException {
+        testNormalOperation(writer, reader, true);
+    }
 
+    public static void testNormalOperation(ZooKeeper writer, ZooKeeper reader, boolean initTestNodes) throws KeeperException, InterruptedException {
+        boolean createNodes = initTestNodes;
         for (int j = 0; j < 30; j++) {
             try {
-                if (!testWriterNodeExists) {
+                if (createNodes) {
                     createZNode(writer, "/test", "test");
-                    testWriterNodeExists = true;
-                }
-
-                if (!testReaderNodeExists) {
                     createZNode(reader, "/dummy", "dummy");
-                    testReaderNodeExists = true;
+                    createNodes = false;
                 }
 
                 String data = "test" + j;
@@ -1106,6 +1102,64 @@ public class ReconfigTest extends ZKTestCase implements DataCallback {
         assertRemotePeerMXBeanAttributes(changingQS3, remotePeerBean3);
     }
 
+
+    @Test
+    public void testReconfigEnablemntWithRollingRestart() throws Exception {
+
+        // make sure dynamic reconfig is disabled
+        QuorumPeerConfig.setReconfigEnabled(false);
+
+        // start a 3 node cluster
+        qu = new QuorumUtil(1);
+        qu.disableJMXTest = true;
+        qu.startAll();
+        zkArr = createHandles(qu);
+        testNormalOperation(zkArr[1], zkArr[1], true);
+
+
+        // enable dynamic reconfig (new servers created after this time will be initialized with reconfigEnabled=true)
+        QuorumPeerConfig.setReconfigEnabled(true);
+
+        // restart the three servers, one-by-one, now with reconfig enabled
+        // test if we can write / read in the cluster after each rolling restart step
+        for (int i = 1; i < 4; i++) {
+            assertFalse("dynamic reconfig was not disabled before stopping server " + i, qu.getPeer(i).peer.isReconfigEnabled());
+            qu.shutdown(i);
+            qu.restart(i);
+            assertTrue("dynamic reconfig is not enabled for the restarted server " + i, qu.getPeer(i).peer.isReconfigEnabled());
+            testNormalOperation(zkArr[i], zkArr[(i % 3) + 1], false);
+        }
+
+        // now we will test dynamic reconfig by remove server 2, then add it back later
+        List<String> leavingServers = new ArrayList<>();
+        List<String> joiningServers = new ArrayList<>();
+        leavingServers.add("2");
+
+        // remember this server so we can add it back later
+        joiningServers.add(String.format("server.2=localhost:%d:%d:participant;localhost:%d",
+                qu.getPeer(2).peer.getQuorumAddress().getAllPorts().get(0),
+                qu.getPeer(2).peer.getElectionAddress().getAllPorts().get(0),
+                qu.getPeer(2).peer.getClientPort()));
+
+        // here we remove server 2
+        zkAdminArr = createAdminHandles(qu);
+        String configStr = reconfig(zkAdminArr[1], null, leavingServers, null, -1);
+        testServerHasConfig(zkArr[3], null, leavingServers);
+        testNormalOperation(zkArr[1], zkArr[3], false);
+
+
+        // here we add back server 2
+        QuorumVerifier qv = qu.getPeer(1).peer.configFromString(configStr);
+        long version = qv.getVersion();
+        reconfig(zkAdminArr[3], joiningServers, null, null, version);
+
+        testServerHasConfig(zkArr[1], joiningServers, null);
+        testServerHasConfig(zkArr[2], joiningServers, null);
+        testServerHasConfig(zkArr[3], joiningServers, null);
+        testNormalOperation(zkArr[3], zkArr[1], false);
+    }
+
+
     private void assertLocalPeerMXBeanAttributes(
         QuorumPeer qp,
         String beanName,