Pārlūkot izejas kodu

ZOOKEEPER-3145: Fix potential watch missing issue due to stale pzxid when replaying CloseSession txn with fuzzy snapshot

Currently, the CloseSession txn is not idempotent, executing the CloseSession twice won't get the same result, which could cause pzxid inconsistent, which in turn cause watches missing.

For more details, please check the description in ZOOKEEPER-3145.

Author: Fangmin Lyu <allenlyu@fb.com>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Andor Molnár <andor@apache.org>

Closes #622 from lvfangmin/ZOOKEEPER-3145
Fangmin Lyu 5 gadi atpakaļ
vecāks
revīzija
42ea26b751

+ 7 - 4
zookeeper-jute/src/main/resources/zookeeper.jute

@@ -72,7 +72,7 @@ module org.apache.zookeeper.proto {
         vector<ustring>dataWatches;
         vector<ustring>existWatches;
         vector<ustring>childWatches;
-    }        
+    }
     class RequestHeader {
         int xid;
         int type;
@@ -92,12 +92,12 @@ module org.apache.zookeeper.proto {
         long zxid;
         int err;
     }
-    
-    class GetDataRequest {       
+
+    class GetDataRequest {
         ustring path;
         boolean watch;
     }
-    
+
     class SetDataRequest {
         ustring path;
         buffer data;
@@ -323,6 +323,9 @@ module org.apache.zookeeper.txn {
     class CreateSessionTxn {
         int timeOut;
     }
+    class CloseSessionTxn {
+        vector<ustring> paths2Delete;
+    }
     class ErrorTxn {
         int err;
     }

+ 47 - 14
zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java

@@ -63,6 +63,7 @@ import org.apache.zookeeper.server.watch.WatchesPathReport;
 import org.apache.zookeeper.server.watch.WatchesReport;
 import org.apache.zookeeper.server.watch.WatchesSummary;
 import org.apache.zookeeper.txn.CheckVersionTxn;
+import org.apache.zookeeper.txn.CloseSessionTxn;
 import org.apache.zookeeper.txn.CreateContainerTxn;
 import org.apache.zookeeper.txn.CreateTTLTxn;
 import org.apache.zookeeper.txn.CreateTxn;
@@ -947,7 +948,14 @@ public class DataTree {
                 rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion());
                 break;
             case OpCode.closeSession:
-                killSession(header.getClientId(), header.getZxid());
+                long sessionId = header.getClientId();
+                if (txn != null) {
+                    killSession(sessionId, header.getZxid(),
+                            ephemerals.remove(sessionId),
+                            ((CloseSessionTxn) txn).getPaths2Delete());
+                } else {
+                    killSession(sessionId, header.getZxid());
+                }
                 break;
             case OpCode.error:
                 ErrorTxn errTxn = (ErrorTxn) txn;
@@ -1119,20 +1127,45 @@ public class DataTree {
         // so there is no need for synchronization. The list is not
         // changed here. Only create and delete change the list which
         // are again called from FinalRequestProcessor in sequence.
-        Set<String> list = ephemerals.remove(session);
-        if (list != null) {
-            for (String path : list) {
-                try {
-                    deleteNode(path, zxid);
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Deleting ephemeral node " + path + " for session 0x" + Long.toHexString(session));
-                    }
-                } catch (NoNodeException e) {
-                    LOG.warn("Ignoring NoNodeException for path "
-                             + path
-                             + " while removing ephemeral for dead session 0x"
-                             + Long.toHexString(session));
+        killSession(session, zxid, ephemerals.remove(session), null);
+    }
+
+    void killSession(long session, long zxid, Set<String> paths2DeleteLocal,
+            List<String> paths2DeleteInTxn) {
+        if (paths2DeleteInTxn != null) {
+            deleteNodes(session, zxid, paths2DeleteInTxn);
+        }
+
+        if (paths2DeleteLocal == null) {
+            return;
+        }
+
+        if (paths2DeleteInTxn != null) {
+            // explicitly check and remove to avoid potential performance
+            // issue when using removeAll
+            for (String path: paths2DeleteInTxn) {
+                paths2DeleteLocal.remove(path);
+            }
+            if (!paths2DeleteLocal.isEmpty()) {
+                LOG.warn("Unexpected extra paths under session {} which "
+                        + "are not in txn 0x{}", paths2DeleteLocal,
+                        Long.toHexString(zxid));
+            }
+        }
+
+        deleteNodes(session, zxid, paths2DeleteLocal);
+    }
+
+    void deleteNodes(long session, long zxid, Iterable<String> paths2Delete) {
+        for (String path : paths2Delete) {
+            try {
+                deleteNode(path, zxid);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Deleting ephemeral node {} for session 0x{}", path, Long.toHexString(session));
                 }
+            } catch (NoNodeException e) {
+                LOG.warn("Ignoring NoNodeException for path {} while removing ephemeral for dead session 0x{}",
+                        path, Long.toHexString(session));
             }
         }
     }

+ 9 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java

@@ -66,6 +66,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.txn.CheckVersionTxn;
+import org.apache.zookeeper.txn.CloseSessionTxn;
 import org.apache.zookeeper.txn.CreateContainerTxn;
 import org.apache.zookeeper.txn.CreateSessionTxn;
 import org.apache.zookeeper.txn.CreateTTLTxn;
@@ -532,8 +533,12 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             // this request is the last of the session so it should be ok
             //zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
             long startTime = Time.currentElapsedTime();
-            Set<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
             synchronized (zks.outstandingChanges) {
+                // need to move getEphemerals into zks.outstandingChanges
+                // synchronized block, otherwise there will be a race
+                // condition with the on flying deleteNode txn, and we'll
+                // delete the node again here, which is not correct
+                Set<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
                 for (ChangeRecord c : zks.outstandingChanges) {
                     if (c.stat == null) {
                         // Doing a delete
@@ -545,7 +550,9 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
                 for (String path2Delete : es) {
                     addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path2Delete, null, 0, null));
                 }
-
+                if (ZooKeeperServer.isCloseSessionTxnEnabled()) {
+                    request.setTxn(new CloseSessionTxn(new ArrayList<String>(es)));
+                }
                 zks.sessionTracker.setSessionClosing(request.sessionId);
             }
             ServerMetrics.getMetrics().CLOSE_SESSION_PREP_TIME.add(Time.currentElapsedTime() - startTime);

+ 19 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -110,6 +110,11 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled";
     private static boolean digestEnabled;
 
+    // Add a enable/disable option for now, we should remove this one when
+    // this feature is confirmed to be stable
+    public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
+    private static boolean closeSessionTxnEnabled = true;
+
     static {
         LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
 
@@ -127,6 +132,20 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
 
         digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true"));
         LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled);
+
+        closeSessionTxnEnabled = Boolean.parseBoolean(
+                System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true"));
+        LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled);
+    }
+
+    public static boolean isCloseSessionTxnEnabled() {
+        return closeSessionTxnEnabled;
+    }
+
+    public static void setCloseSessionTxnEnabled(boolean enabled) {
+        ZooKeeperServer.closeSessionTxnEnabled = enabled;
+        LOG.info("Update {} to {}", CLOSE_SESSION_TXN_ENABLED,
+                ZooKeeperServer.closeSessionTxnEnabled);
     }
 
     protected ZooKeeperServerBean jmxServerBean;

+ 9 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/util/SerializeUtils.java

@@ -34,7 +34,9 @@ import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.common.IOUtils;
 import org.apache.zookeeper.server.DataTree;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.txn.CloseSessionTxn;
 import org.apache.zookeeper.txn.CreateContainerTxn;
 import org.apache.zookeeper.txn.CreateSessionTxn;
 import org.apache.zookeeper.txn.CreateTTLTxn;
@@ -67,7 +69,9 @@ public class SerializeUtils {
             txn = new CreateSessionTxn();
             break;
         case OpCode.closeSession:
-            return null;
+            txn = ZooKeeperServer.isCloseSessionTxnEnabled()
+                    ?  new CloseSessionTxn() : null;
+            break;
         case OpCode.create:
         case OpCode.create2:
             txn = new CreateTxn();
@@ -115,6 +119,10 @@ public class SerializeUtils {
                     create.setAcl(createv0.getAcl());
                     create.setEphemeral(createv0.getEphemeral());
                     create.setParentCVersion(-1);
+                } else if (hdr.getType() == OpCode.closeSession) {
+                    // perhaps this is before CloseSessionTxn was added,
+                    // ignore it and reset txn to null
+                    txn = null;
                 } else {
                     throw e;
                 }

+ 52 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java

@@ -45,6 +45,7 @@ import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.proto.RequestHeader;
 import org.apache.zookeeper.proto.SetDataRequest;
 import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
 import org.apache.zookeeper.test.ClientBase;
@@ -103,6 +104,10 @@ public class PrepRequestProcessorTest extends ClientBase {
     }
 
     private Request createRequest(Record record, int opCode) throws IOException {
+        return createRequest(record, opCode, 1L);
+    }
+
+    private Request createRequest(Record record, int opCode, long sessionId) throws IOException {
         // encoding
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
@@ -110,7 +115,7 @@ public class PrepRequestProcessorTest extends ClientBase {
         baos.close();
         // Id
         List<Id> ids = Arrays.asList(Ids.ANYONE_ID_UNSAFE);
-        return new Request(null, 1L, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), ids);
+        return new Request(null, sessionId, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), ids);
     }
 
     private void process(List<Op> ops) throws Exception {
@@ -173,6 +178,52 @@ public class PrepRequestProcessorTest extends ClientBase {
         assertNull(zks.outstandingChangesForPath.get("/foo"));
     }
 
+    /**
+     * Test ephemerals are deleted when the session is closed with
+     * the newly added CloseSessionTxn in ZOOKEEPER-3145.
+     */
+    @Test
+    public void testCloseSessionTxn() throws Exception {
+        boolean before = ZooKeeperServer.isCloseSessionTxnEnabled();
+
+        ZooKeeperServer.setCloseSessionTxnEnabled(true);
+        try {
+            // create a few ephemerals
+            long ephemeralOwner = 1;
+            DataTree dt = zks.getZKDatabase().dataTree;
+            dt.createNode("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE, ephemeralOwner, 0, 0, 0);
+            dt.createNode("/bar", new byte[0], Ids.OPEN_ACL_UNSAFE, ephemeralOwner, 0, 0, 0);
+
+            // close session
+            RequestHeader header = new RequestHeader();
+            header.setType(OpCode.closeSession);
+
+            final FinalRequestProcessor frq = new FinalRequestProcessor(zks);
+            final CountDownLatch latch = new CountDownLatch(1);
+            processor = new PrepRequestProcessor(zks, new RequestProcessor() {
+                @Override
+                public void processRequest(Request request) {
+                    frq.processRequest(request);
+                    latch.countDown();
+                }
+
+                @Override
+                public void shutdown() {
+                    // TODO Auto-generated method stub
+                }
+            });
+            processor.pRequest(createRequest(header, OpCode.closeSession, ephemeralOwner));
+
+            assertTrue(latch.await(3, TimeUnit.SECONDS));
+
+            // assert ephemerals are deleted
+            assertEquals(null, dt.getNode("/foo"));
+            assertEquals(null, dt.getNode("/bar"));
+        } finally {
+            ZooKeeperServer.setCloseSessionTxnEnabled(before);
+        }
+    }
+
     /**
      * It tests that PrepRequestProcessor will return BadArgument KeeperException
      * if the request path (if it exists) is not valid, e.g. empty string.

+ 102 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CloseSessionTxnTest.java

@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CloseSessionTxnTest extends QuorumPeerTestBase {
+
+    /**
+     * Test leader/leader compatibility with/without CloseSessionTxn, so that
+     * we can gradually rollout this code and rollback if there is problem.
+     */
+    @Test
+    public void testCloseSessionTxnCompatile() throws Exception {
+        // Test 4 cases:
+        // 1. leader disabled, follower disabled
+        testCloseSessionWithDifferentConfig(false, false);
+
+        // 2. leader disabled, follower enabled
+        testCloseSessionWithDifferentConfig(false, true);
+
+        // 3. leader enabled, follower disabled
+        testCloseSessionWithDifferentConfig(true, false);
+
+        // 4. leader enabled, follower enabled
+        testCloseSessionWithDifferentConfig(true, true);
+    }
+
+    private void testCloseSessionWithDifferentConfig(
+            boolean closeSessionEnabledOnLeader,
+            boolean closeSessionEnabledOnFollower) throws Exception {
+        // 1. set up an ensemble with 3 servers
+        final int numServers = 3;
+        servers = LaunchServers(numServers);
+        int leaderId = servers.findLeader();
+        ZooKeeperServer.setCloseSessionTxnEnabled(closeSessionEnabledOnLeader);
+
+        // 2. shutdown one of the follower, start it later to pick up the
+        // CloseSessionTxnEnabled config change
+        //
+        // We cannot use different static config in the same JVM, so have to
+        // use this tricky
+        int followerA = (leaderId + 1) % numServers;
+        servers.mt[followerA].shutdown();
+        waitForOne(servers.zk[followerA], States.CONNECTING);
+
+        // 3. create an ephemeral node
+        String path = "/testCloseSessionTxnCompatile";
+        servers.zk[leaderId].create(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL);
+
+        // 3. close the client
+        servers.restartClient(leaderId, this);
+        waitForOne(servers.zk[leaderId], States.CONNECTED);
+
+        // 4. update the CloseSessionTxnEnabled config before follower A
+        // started
+        System.setProperty("zookeeper.retainZKDatabase", "true");
+        ZooKeeperServer.setCloseSessionTxnEnabled(closeSessionEnabledOnFollower);
+
+        // 5. restart follower A
+        servers.mt[followerA].start();
+        waitForOne(servers.zk[followerA], States.CONNECTED);
+
+        // 4. verify the ephemeral node is gone
+        for (int i = 0; i < numServers; i++) {
+            final CountDownLatch syncedLatch = new CountDownLatch(1);
+            servers.zk[i].sync(path, new AsyncCallback.VoidCallback() {
+                @Override
+                public void processResult(int rc, String path, Object ctx) {
+                    syncedLatch.countDown();
+                }
+            }, null);
+            Assert.assertTrue(syncedLatch.await(3, TimeUnit.SECONDS));
+            Assert.assertNull(servers.zk[i].exists(path, false));
+        }
+    }
+ }

+ 38 - 22
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FuzzySnapshotRelatedTest.java

@@ -125,7 +125,7 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
     public void testMultiOpConsistency() throws Exception {
         LOG.info("Create a parent node");
         final String path = "/testMultiOpConsistency";
-        createEmptyNode(zk[followerA], path);
+        createEmptyNode(zk[followerA], path, CreateMode.PERSISTENT);
 
         LOG.info("Hook to catch the 2nd sub create node txn in multi-op");
         CustomDataTree dt = (CustomDataTree) mt[followerA].main.quorumPeer.getZkDb().getDataTree();
@@ -175,8 +175,10 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
 
         final String parent = "/testPZxidUpdatedWhenDeletingNonExistNode";
         final String child = parent + "/child";
-        createEmptyNode(zk[leaderId], parent);
-        createEmptyNode(zk[leaderId], child);
+        createEmptyNode(zk[leaderId], parent, CreateMode.PERSISTENT);
+        createEmptyNode(zk[leaderId], child, CreateMode.EPHEMERAL);
+        // create another child to test closeSession
+        createEmptyNode(zk[leaderId], child + "1", CreateMode.EPHEMERAL);
 
         LOG.info("shutdown follower {}", followerA);
         mt[followerA].shutdown();
@@ -205,8 +207,10 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
 
         final String parent = "/testPZxidUpdatedDuringTakingSnapshot";
         final String child = parent + "/child";
-        createEmptyNode(zk[followerA], parent);
-        createEmptyNode(zk[followerA], child);
+        createEmptyNode(zk[followerA], parent, CreateMode.PERSISTENT);
+        createEmptyNode(zk[followerA], child, CreateMode.EPHEMERAL);
+        // create another child to test closeSession
+        createEmptyNode(zk[leaderId], child + "1", CreateMode.EPHEMERAL);
 
         LOG.info("Set up ZKDatabase to catch the node serializing in DataTree");
         addSerializeListener(followerA, parent, child);
@@ -217,8 +221,12 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
 
         LOG.info("Restarting follower A to load snapshot");
         mt[followerA].shutdown();
-        QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTING);
+        QuorumPeerMainTest.waitForOne(zk[followerA], States.CLOSED);
         mt[followerA].start();
+        // zk[followerA] will be closed in addSerializeListener, re-create it
+        zk[followerA] = new ZooKeeper("127.0.0.1:" + clientPorts[followerA],
+                ClientBase.CONNECTION_TIMEOUT, this);
+
         QuorumPeerMainTest.waitForOne(zk[followerA], States.CONNECTED);
 
         LOG.info("Check and make sure the pzxid of the parent is the same " + "on leader and follower A");
@@ -226,13 +234,14 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
     }
 
     private void addSerializeListener(int sid, String parent, String child) {
-        final ZooKeeper zkClient = zk[followerA];
+        final ZooKeeper zkClient = zk[sid];
         CustomDataTree dt = (CustomDataTree) mt[sid].main.quorumPeer.getZkDb().getDataTree();
         dt.addListener(parent, new NodeSerializeListener() {
             @Override
             public void nodeSerialized(String path) {
                 try {
                     zkClient.delete(child, -1);
+                    zkClient.close();
                     LOG.info("Deleted the child node after the parent is serialized");
                 } catch (Exception e) {
                     LOG.error("Error when deleting node {}", e);
@@ -242,13 +251,26 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
     }
 
     private void compareStat(String path, int sid, int compareWithSid) throws Exception {
-        Stat stat1 = new Stat();
-        zk[sid].getData(path, null, stat1);
-
-        Stat stat2 = new Stat();
-        zk[compareWithSid].getData(path, null, stat2);
-
-        assertEquals(stat1, stat2);
+        ZooKeeper[] compareZk = new ZooKeeper[2];
+        compareZk[0] = new ZooKeeper("127.0.0.1:" + clientPorts[sid],
+                ClientBase.CONNECTION_TIMEOUT, this);
+        compareZk[1] = new ZooKeeper("127.0.0.1:" + clientPorts[compareWithSid],
+                ClientBase.CONNECTION_TIMEOUT, this);
+        QuorumPeerMainTest.waitForAll(compareZk, States.CONNECTED);
+
+        try {
+            Stat stat1 = new Stat();
+            compareZk[0].getData(path, null, stat1);
+
+            Stat stat2 = new Stat();
+            compareZk[1].getData(path, null, stat2);
+
+            assertEquals(stat1, stat2);
+        } finally {
+            for (ZooKeeper z: compareZk) {
+                z.close();
+            }
+        }
     }
 
     @Test
@@ -286,19 +308,13 @@ public class FuzzySnapshotRelatedTest extends QuorumPeerTestBase {
         LOG.info("Make sure the global sessions are consistent with leader");
 
         Map<Long, Integer> globalSessionsOnLeader = mt[leaderId].main.quorumPeer.getZkDb().getSessionWithTimeOuts();
-        if (mt[followerA].main.quorumPeer == null) {
-            LOG.info("quorumPeer is null");
-        }
-        if (mt[followerA].main.quorumPeer.getZkDb() == null) {
-            LOG.info("zkDb is null");
-        }
         Map<Long, Integer> globalSessionsOnFollowerA = mt[followerA].main.quorumPeer.getZkDb().getSessionWithTimeOuts();
         LOG.info("sessions are {}, {}", globalSessionsOnLeader.keySet(), globalSessionsOnFollowerA.keySet());
         assertTrue(globalSessionsOnFollowerA.keySet().containsAll(globalSessionsOnLeader.keySet()));
     }
 
-    private void createEmptyNode(ZooKeeper zk, String path) throws Exception {
-        zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    private void createEmptyNode(ZooKeeper zk, String path, CreateMode mode) throws Exception {
+        zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, mode);
     }
 
     interface NodeCreateListener {