浏览代码

ZOOKEEPER-4472: Remove persistent watches individually (#2006)

* ZOOKEEPER-4472: Remove persistent watches individually

ZOOKEEPER-4466 supports different watch modes one same path, but there
are no corresponding `WatcherType`s for persistent watches. Client has
to resort to `WatcherType.Any` to remove them. This could accidently
interrupt other watches.

This PR adds `WatcherType.Persistent` and `WatcherType.PersistentRecursive`
to remove persistent watches individually.

* Assert removing WatcherType::Data from AddWatchMode::Persistent is impossible

* fixup! Assert removing WatcherType::Data from AddWatchMode::Persistent is impossible
Kezhu Wang 1 年之前
父节点
当前提交
e8b2fbeb10

+ 6 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java

@@ -191,6 +191,8 @@ public interface Watcher {
     enum WatcherType {
         Children(1),
         Data(2),
+        Persistent(4),
+        PersistentRecursive(5),
         Any(3);
 
         // Integer representation of value
@@ -212,7 +214,10 @@ public interface Watcher {
                 return WatcherType.Data;
             case 3:
                 return WatcherType.Any;
-
+            case 4:
+                return Persistent;
+            case 5:
+                return PersistentRecursive;
             default:
                 throw new RuntimeException("Invalid integer value for conversion to WatcherType");
             }

+ 20 - 20
zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java

@@ -153,6 +153,18 @@ class ZKWatchManager implements ClientWatchManager {
             }
             break;
         }
+        case Persistent: {
+            synchronized (persistentWatches) {
+                removedWatcher = removeWatches(persistentWatches, watcher, clientPath, local, rc, persistentWatchersToRem);
+            }
+            break;
+        }
+        case PersistentRecursive: {
+            synchronized (persistentRecursiveWatches) {
+                removedWatcher = removeWatches(persistentRecursiveWatches, watcher, clientPath, local, rc, persistentWatchersToRem);
+            }
+            break;
+        }
         case Any: {
             synchronized (childWatches) {
                 removedWatcher = removeWatches(childWatches, watcher, clientPath, local, rc, childWatchersToRem);
@@ -225,18 +237,6 @@ class ZKWatchManager implements ClientWatchManager {
             synchronized (childWatches) {
                 containsWatcher = contains(path, watcher, childWatches);
             }
-
-            synchronized (persistentWatches) {
-                boolean contains_temp = contains(path, watcher,
-                        persistentWatches);
-                containsWatcher |= contains_temp;
-            }
-
-            synchronized (persistentRecursiveWatches) {
-                boolean contains_temp = contains(path, watcher,
-                        persistentRecursiveWatches);
-                containsWatcher |= contains_temp;
-            }
             break;
         }
         case Data: {
@@ -248,17 +248,17 @@ class ZKWatchManager implements ClientWatchManager {
                 boolean contains_temp = contains(path, watcher, existWatches);
                 containsWatcher |= contains_temp;
             }
-
+            break;
+        }
+        case Persistent: {
             synchronized (persistentWatches) {
-                boolean contains_temp = contains(path, watcher,
-                        persistentWatches);
-                containsWatcher |= contains_temp;
+                containsWatcher |= contains(path, watcher, persistentWatches);
             }
-
+            break;
+        }
+        case PersistentRecursive: {
             synchronized (persistentRecursiveWatches) {
-                boolean contains_temp = contains(path, watcher,
-                        persistentRecursiveWatches);
-                containsWatcher |= contains_temp;
+                containsWatcher |= contains(path, watcher, persistentRecursiveWatches);
             }
             break;
         }

+ 6 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/cli/RemoveWatchesCommand.java

@@ -37,6 +37,8 @@ public class RemoveWatchesCommand extends CliCommand {
     static {
         options.addOption("c", false, "child watcher type");
         options.addOption("d", false, "data watcher type");
+        options.addOption("p", false, "persistent watcher type");
+        options.addOption("r", false, "persistent recursive watcher type");
         options.addOption("a", false, "any watcher type");
         options.addOption("l", false, "remove locally when there is no server connection");
     }
@@ -70,6 +72,10 @@ public class RemoveWatchesCommand extends CliCommand {
             wtype = WatcherType.Children;
         } else if (cl.hasOption("d")) {
             wtype = WatcherType.Data;
+        } else if (cl.hasOption("p")) {
+            wtype = WatcherType.Persistent;
+        } else if (cl.hasOption("r")) {
+            wtype = WatcherType.PersistentRecursive;
         } else if (cl.hasOption("a")) {
             wtype = WatcherType.Any;
         }

+ 18 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java

@@ -1563,11 +1563,16 @@ public class DataTree {
         case Data:
             containsWatcher = this.dataWatches.containsWatcher(path, watcher, WatcherMode.STANDARD);
             break;
+        case Persistent:
+            containsWatcher = this.dataWatches.containsWatcher(path, watcher, WatcherMode.PERSISTENT);
+            break;
+        case PersistentRecursive:
+            containsWatcher = this.dataWatches.containsWatcher(path, watcher, WatcherMode.PERSISTENT_RECURSIVE);
+            break;
         case Any:
             if (this.childWatches.containsWatcher(path, watcher, null)) {
                 containsWatcher = true;
-            }
-            if (this.dataWatches.containsWatcher(path, watcher, null)) {
+            } else if (this.dataWatches.containsWatcher(path, watcher, null)) {
                 containsWatcher = true;
             }
             break;
@@ -1584,6 +1589,17 @@ public class DataTree {
         case Data:
             removed = this.dataWatches.removeWatcher(path, watcher, WatcherMode.STANDARD);
             break;
+        case Persistent:
+            if (this.childWatches.removeWatcher(path, watcher, WatcherMode.PERSISTENT)) {
+                removed = true;
+            }
+            if (this.dataWatches.removeWatcher(path, watcher, WatcherMode.PERSISTENT)) {
+                removed = true;
+            }
+            break;
+        case PersistentRecursive:
+            removed = this.dataWatches.removeWatcher(path, watcher, WatcherMode.PERSISTENT_RECURSIVE);
+            break;
         case Any:
             if (this.childWatches.removeWatcher(path, watcher, null)) {
                 removed = true;

+ 431 - 198
zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java

@@ -30,6 +30,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -45,6 +46,7 @@ import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.WatcherType;
 import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.test.ClientBase;
 import org.junit.jupiter.api.AfterEach;
@@ -97,9 +99,16 @@ public class RemoveWatchesTest extends ClientBase {
             MyCallback c1 = new MyCallback(rc.intValue(), path);
             zk.removeWatches(path, watcher, watcherType, local, c1, null);
             assertTrue(c1.matches(), "Didn't succeeds removeWatch operation");
-            if (KeeperException.Code.OK.intValue() != c1.rc) {
-                KeeperException ke = KeeperException.create(KeeperException.Code.get(c1.rc));
-                throw ke;
+            if (rc.intValue() != c1.rc) {
+                throw KeeperException.create(KeeperException.Code.get(c1.rc));
+            }
+        } else if (rc != Code.OK) {
+            try {
+                zk.removeWatches(path, watcher, watcherType, local);
+                fail("expect exception code " + rc);
+            } catch (KeeperException ex) {
+                assertEquals(rc, ex.code());
+                assertEquals(path, ex.getPath());
             }
         } else {
             zk.removeWatches(path, watcher, watcherType, local);
@@ -118,15 +127,50 @@ public class RemoveWatchesTest extends ClientBase {
             MyCallback c1 = new MyCallback(rc.intValue(), path);
             zk.removeAllWatches(path, watcherType, local, c1, null);
             assertTrue(c1.matches(), "Didn't succeeds removeWatch operation");
-            if (KeeperException.Code.OK.intValue() != c1.rc) {
-                KeeperException ke = KeeperException.create(KeeperException.Code.get(c1.rc));
-                throw ke;
+            if (rc.intValue() != c1.rc) {
+                throw KeeperException.create(KeeperException.Code.get(c1.rc));
+            }
+        } else if (rc != Code.OK) {
+            try {
+                zk.removeAllWatches(path, watcherType, local);
+                fail("expect exception code " + rc);
+            } catch (KeeperException ex) {
+                assertEquals(rc, ex.code());
+                assertEquals(path, ex.getPath());
             }
         } else {
             zk.removeAllWatches(path, watcherType, local);
         }
     }
 
+    private void assertWatchers(ZooKeeper zk, String path, WatcherType... watcherTypes) {
+        for (WatcherType watcherType : watcherTypes) {
+            String msg = String.format("expect watcher for path %s and type %s", path, watcherType);
+            assertTrue(isServerSessionWatcher(zk.getSessionId(), path, watcherType), msg);
+        }
+    }
+
+    private void assertNoWatchers(ZooKeeper zk, String path, WatcherType... watcherTypes) {
+        for (WatcherType watcherType : watcherTypes) {
+            String msg = String.format("expect no watcher for path %s and type %s", path, watcherType);
+            assertFalse(isServerSessionWatcher(zk.getSessionId(), path, watcherType), msg);
+        }
+    }
+
+    private void assertWatchersExcept(ZooKeeper zk, String path, WatcherType... watcherTypes) {
+        List<WatcherType> excludes = Arrays.asList(watcherTypes);
+        for (WatcherType watcherType : WatcherType.values()) {
+            if (watcherType == WatcherType.Any) {
+                continue;
+            }
+            if (excludes.contains(watcherType)) {
+                assertNoWatchers(zk, path, watcherType);
+            } else {
+                assertWatchers(zk, path, watcherType);
+            }
+        }
+    }
+
     /**
      * Test verifies removal of single watcher when there is server connection
      */
@@ -338,6 +382,96 @@ public class RemoveWatchesTest extends ClientBase {
         assertTrue(events.contains(EventType.NodeDataChanged), "Didn't get NodeDataChanged event");
     }
 
+    /**
+     * Test verifies removing all watcher with WatcherType.Persistent.
+     *
+     * <p>All other watchers shouldn't be removed.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    @Timeout(value = 90)
+    public void testRemoveAllPersistentWatchers(boolean useAsync) throws InterruptedException, KeeperException {
+        zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        BlockingDeque<WatchedEvent> persistentEvents1 = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> persistentEvents2 = new LinkedBlockingDeque<>();
+
+        Watcher persistentWatcher1 = persistentEvents1::add;
+        Watcher persistentWatcher2 = persistentEvents2::add;
+        zk2.addWatch("/node1", persistentWatcher1, AddWatchMode.PERSISTENT);
+        zk2.addWatch("/node1", persistentWatcher2, AddWatchMode.PERSISTENT);
+
+        BlockingDeque<WatchedEvent> dataEvents = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> childrenEvents = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> recursiveEvents = new LinkedBlockingDeque<>();
+        zk2.getData("/node1", dataEvents::add, null);
+        zk2.getChildren("/node1", childrenEvents::add);
+        zk2.addWatch("/node1", recursiveEvents::add, AddWatchMode.PERSISTENT_RECURSIVE);
+
+        removeWatches(zk2, "/node1", persistentWatcher1, WatcherType.Persistent, false, Code.OK, useAsync);
+        removeWatches(zk2, "/node1", persistentWatcher2, WatcherType.Persistent, false, Code.OK, useAsync);
+        removeWatches(zk2, "/node1", persistentWatcher1, WatcherType.Data, false, Code.NOWATCHER, useAsync);
+        removeWatches(zk2, "/node1", persistentWatcher2, WatcherType.Data, false, Code.NOWATCHER, useAsync);
+        removeWatches(zk2, "/node1", persistentWatcher1, WatcherType.Children, false, Code.NOWATCHER, useAsync);
+        removeWatches(zk2, "/node1", persistentWatcher2, WatcherType.Children, false, Code.NOWATCHER, useAsync);
+        removeWatches(zk2, "/node1", persistentWatcher1, WatcherType.PersistentRecursive, false, Code.NOWATCHER, useAsync);
+        removeWatches(zk2, "/node1", persistentWatcher2, WatcherType.PersistentRecursive, false, Code.NOWATCHER, useAsync);
+
+        zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk1.setData("/node1", null, -1);
+
+        assertEvent(persistentEvents1, EventType.PersistentWatchRemoved, "/node1");
+        assertEvent(persistentEvents2, EventType.PersistentWatchRemoved, "/node1");
+        assertEvent(dataEvents, EventType.NodeDataChanged, "/node1");
+        assertEvent(childrenEvents, EventType.NodeChildrenChanged, "/node1");
+        assertEvent(recursiveEvents, EventType.NodeCreated, "/node1/node2");
+        assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1");
+    }
+
+    /**
+     * Test verifies removing all watcher with WatcherType.PersistentRecursive.
+     *
+     * <p>All other watchers shouldn't be removed
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    @Timeout(value = 90)
+    public void testRemoveAllPersistentRecursiveWatchers(boolean useAsync) throws InterruptedException, KeeperException {
+        zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        BlockingDeque<WatchedEvent> recursiveEvents1 = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> recursiveEvents2 = new LinkedBlockingDeque<>();
+
+        Watcher recursiveWatcher1 = recursiveEvents1::add;
+        Watcher recursiveWatcher2 = recursiveEvents2::add;
+        zk2.addWatch("/node1", recursiveWatcher1, AddWatchMode.PERSISTENT_RECURSIVE);
+        zk2.addWatch("/node1", recursiveWatcher2, AddWatchMode.PERSISTENT_RECURSIVE);
+
+        BlockingDeque<WatchedEvent> dataEvents = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> childrenEvents = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> persistentEvents = new LinkedBlockingDeque<>();
+        zk2.getData("/node1", dataEvents::add, null);
+        zk2.getChildren("/node1", childrenEvents::add);
+        zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT);
+
+        removeWatches(zk2, "/node1", recursiveWatcher1, WatcherType.PersistentRecursive, false, Code.OK, useAsync);
+        removeWatches(zk2, "/node1", recursiveWatcher2, WatcherType.PersistentRecursive, false, Code.OK, useAsync);
+        removeWatches(zk2, "/node1", recursiveWatcher1, WatcherType.Data, false, Code.NOWATCHER, useAsync);
+        removeWatches(zk2, "/node1", recursiveWatcher2, WatcherType.Data, false, Code.NOWATCHER, useAsync);
+        removeWatches(zk2, "/node1", recursiveWatcher1, WatcherType.Children, false, Code.NOWATCHER, useAsync);
+        removeWatches(zk2, "/node1", recursiveWatcher2, WatcherType.Children, false, Code.NOWATCHER, useAsync);
+        removeWatches(zk2, "/node1", recursiveWatcher1, WatcherType.Persistent, false, Code.NOWATCHER, useAsync);
+        removeWatches(zk2, "/node1", recursiveWatcher2, WatcherType.Persistent, false, Code.NOWATCHER, useAsync);
+
+        assertEvent(recursiveEvents1, EventType.PersistentWatchRemoved, "/node1");
+        assertEvent(recursiveEvents2, EventType.PersistentWatchRemoved, "/node1");
+
+        zk1.create("/node1/child1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk1.setData("/node1", "test".getBytes(), -1);
+
+        assertEvent(dataEvents, EventType.NodeDataChanged, "/node1");
+        assertEvent(childrenEvents, EventType.NodeChildrenChanged, "/node1");
+        assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
+        assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1");
+    }
     /**
      * Test verifies given watcher doesn't exists!
      */
@@ -360,30 +494,10 @@ public class RemoveWatchesTest extends ClientBase {
         // New Watcher which will be used for removal
         MyWatcher w3 = new MyWatcher("/node1", 2);
 
-        try {
-            removeWatches(zk2, "/node1", w3, WatcherType.Any, false, Code.NOWATCHER, useAsync);
-            fail("Should throw exception as given watcher doesn't exists");
-        } catch (KeeperException.NoWatcherException nwe) {
-            // expected
-        }
-        try {
-            removeWatches(zk2, "/node1", w3, WatcherType.Children, false, Code.NOWATCHER, useAsync);
-            fail("Should throw exception as given watcher doesn't exists");
-        } catch (KeeperException.NoWatcherException nwe) {
-            // expected
-        }
-        try {
-            removeWatches(zk2, "/node1", w3, WatcherType.Data, false, Code.NOWATCHER, useAsync);
-            fail("Should throw exception as given watcher doesn't exists");
-        } catch (KeeperException.NoWatcherException nwe) {
-            // expected
-        }
-        try {
-            removeWatches(zk2, "/nonexists", w3, WatcherType.Data, false, Code.NOWATCHER, useAsync);
-            fail("Should throw exception as given watcher doesn't exists");
-        } catch (KeeperException.NoWatcherException nwe) {
-            // expected
-        }
+        removeWatches(zk2, "/node1", w3, WatcherType.Any, false, Code.NOWATCHER, useAsync);
+        removeWatches(zk2, "/node1", w3, WatcherType.Children, false, Code.NOWATCHER, useAsync);
+        removeWatches(zk2, "/node1", w3, WatcherType.Data, false, Code.NOWATCHER, useAsync);
+        removeWatches(zk2, "/nonexists", w3, WatcherType.Data, false, Code.NOWATCHER, useAsync);
     }
 
     /**
@@ -461,12 +575,7 @@ public class RemoveWatchesTest extends ClientBase {
         removeWatches(zk2, "/node1", w2, WatcherType.Any, true, Code.OK, useAsync);
         assertTrue(w2.matches(), "Didn't remove child watcher");
         assertFalse(w1.matches(), "Shouldn't remove data watcher");
-        try {
-            removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.CONNECTIONLOSS, useAsync);
-            fail("Should throw exception as last watch removal requires server connection");
-        } catch (KeeperException.ConnectionLossException nwe) {
-            // expected
-        }
+        removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.CONNECTIONLOSS, useAsync);
         assertFalse(w1.matches(), "Shouldn't remove data watcher");
 
         // when local=true, here if connection not available, simply removes
@@ -682,25 +791,17 @@ public class RemoveWatchesTest extends ClientBase {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     @Timeout(value = 90)
-    public void testNoWatcherServerException(boolean useAsync) throws InterruptedException, IOException, TimeoutException {
+    public void testNoWatcherServerException(boolean useAsync) throws KeeperException, InterruptedException, IOException, TimeoutException {
         CountdownWatcher watcher = new CountdownWatcher();
         ZooKeeper zk = spy(new ZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher));
         MyWatchManager watchManager = new MyWatchManager(false, watcher);
         doReturn(watchManager).when(zk).getWatchManager();
-        boolean nw = false;
 
         watcher.waitForConnected(CONNECTION_TIMEOUT);
 
-        try {
-            zk.removeWatches("/nowatchhere", watcher, WatcherType.Data, false);
-        } catch (KeeperException nwe) {
-            if (nwe.code().intValue() == Code.NOWATCHER.intValue()) {
-                nw = true;
-            }
-        }
+        removeWatches(zk, "/nowatchhere", watcher, WatcherType.Data, false, Code.NOWATCHER, useAsync);
 
         assertThat("Server didn't return NOWATCHER", watchManager.lastReturnCode, is(Code.NOWATCHER.intValue()));
-        assertThat("NoWatcherException didn't happen", nw, is(true));
     }
 
     /**
@@ -711,12 +812,7 @@ public class RemoveWatchesTest extends ClientBase {
     @Timeout(value = 90)
     public void testRemoveAllNoWatcherException(boolean useAsync) throws IOException, InterruptedException, KeeperException {
         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        try {
-            removeAllWatches(zk2, "/node1", WatcherType.Any, false, Code.NOWATCHER, useAsync);
-            fail("Should throw exception as given watcher doesn't exists");
-        } catch (KeeperException.NoWatcherException nwe) {
-            // expected
-        }
+        removeAllWatches(zk2, "/node1", WatcherType.Any, false, Code.NOWATCHER, useAsync);
     }
 
     /**
@@ -810,213 +906,342 @@ public class RemoveWatchesTest extends ClientBase {
     }
 
     /**
-     * Test verifies WatcherType.Data - removes only the configured data watcher
-     * function
+     * Test verifies {@link WatcherType#Persistent} - removes only the configured watcher function
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    @Timeout(value = 90)
+    public void testRemoveWhenMultiplePersistentWatchesOnAPath(boolean useAsync) throws Exception {
+        zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        BlockingDeque<WatchedEvent> persistentEvents1 = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> persistentEvents2 = new LinkedBlockingDeque<>();
+        Watcher w1 = persistentEvents1::add;
+        // Add multiple persistent watches
+        zk2.addWatch("/node1", w1, AddWatchMode.PERSISTENT);
+        zk2.addWatch("/node1", persistentEvents2::add, AddWatchMode.PERSISTENT);
+
+        removeWatches(zk2, "/node1", w1, WatcherType.Persistent, false, Code.OK, useAsync);
+        assertEvent(persistentEvents1, EventType.PersistentWatchRemoved, "/node1");
+
+        zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        assertEvent(persistentEvents2, EventType.NodeChildrenChanged, "/node1");
+        assertNoEvent(persistentEvents1);
+    }
+
+    /**
+     * Test verifies {@link WatcherType#PersistentRecursive} - removes only the configured watcher function
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    @Timeout(value = 90)
+    public void testRemoveWhenMultiplePersistentRecursiveWatchesOnAPath(boolean useAsync) throws Exception {
+        zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        BlockingDeque<WatchedEvent> recursiveEvents1 = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> recursiveEvents2 = new LinkedBlockingDeque<>();
+        Watcher w1 = recursiveEvents1::add;
+        // Add multiple persistent recursive watches
+        zk2.addWatch("/node1", w1, AddWatchMode.PERSISTENT_RECURSIVE);
+        zk2.addWatch("/node1", recursiveEvents2::add, AddWatchMode.PERSISTENT_RECURSIVE);
+
+        removeWatches(zk2, "/node1", w1, WatcherType.PersistentRecursive, false, Code.OK, useAsync);
+        assertEvent(recursiveEvents1, EventType.PersistentWatchRemoved, "/node1");
+
+        zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        assertEvent(recursiveEvents2, EventType.NodeCreated, "/node1/node2");
+        assertNoEvent(recursiveEvents1);
+    }
+
+    /**
+     * Test verifies {@link OpCode#checkWatches} {@link WatcherType#Persistent} using {@link WatcherType#Data}.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    @Timeout(value = 90)
+    public void testRemovePersistentWatchesOnAPathPartially(boolean useAsync) throws Exception {
+        zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        BlockingDeque<WatchedEvent> persistentEvents = new LinkedBlockingDeque<>();
+        Watcher persistentWatcher = persistentEvents::add;
+        zk2.addWatch("/node1", persistentWatcher, AddWatchMode.PERSISTENT);
+
+        assertWatchers(zk2, "/node1", WatcherType.Persistent);
+        assertNoWatchers(zk2, "/node1", WatcherType.Data);
+        removeWatches(zk2, "/node1", persistentWatcher, WatcherType.Data, false, Code.NOWATCHER, useAsync);
+        assertWatchers(zk2, "/node1", WatcherType.Persistent);
+        assertNoWatchers(zk2, "/node1", WatcherType.Data);
+
+        zk1.create("/node1/child1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk1.setData("/node1", null, -1);
+
+        assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
+        assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1");
+
+        assertNull(persistentEvents.poll(10, TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Data}.
+     *
+     * <p>All other watcher types shouldn't be removed.
      */
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     @Timeout(value = 90)
     public void testRemoveAllDataWatchesOnAPath(boolean useAsync) throws Exception {
         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        final CountDownLatch dWatchCount = new CountDownLatch(2);
-        final CountDownLatch rmWatchCount = new CountDownLatch(2);
-        Watcher w1 = event -> {
-            switch (event.getType()) {
-            case DataWatchRemoved:
-                rmWatchCount.countDown();
-                break;
-            case NodeDataChanged:
-                dWatchCount.countDown();
-                break;
-            default:
-                break;
-            }
-        };
-        Watcher w2 = event -> {
-            switch (event.getType()) {
-            case DataWatchRemoved:
-                rmWatchCount.countDown();
-                break;
-            case NodeDataChanged:
-                dWatchCount.countDown();
-                break;
-            default:
-                break;
-            }
-        };
+
+        BlockingDeque<WatchedEvent> dataEvents1 = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> dataEvents2 = new LinkedBlockingDeque<>();
         // Add multiple data watches
-        LOG.info("Adding data watcher {} on path {}", w1, "/node1");
-        assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
-        LOG.info("Adding data watcher {} on path {}", w2, "/node1");
-        assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches");
+        zk2.getData("/node1", dataEvents1::add, null);
+        zk2.getData("/node1", dataEvents2::add, null);
 
+        BlockingDeque<WatchedEvent> childrenEvents = new LinkedBlockingDeque<>();
         BlockingDeque<WatchedEvent> persistentEvents = new LinkedBlockingDeque<>();
         BlockingDeque<WatchedEvent> recursiveEvents = new LinkedBlockingDeque<>();
+        zk2.getChildren("/node1", childrenEvents::add);
         zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT);
         zk2.addWatch("/node1", recursiveEvents::add, AddWatchMode.PERSISTENT_RECURSIVE);
 
-        assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is not a watcher");
+        assertWatchers(zk2, "/node1", WatcherType.values());
         removeAllWatches(zk2, "/node1", WatcherType.Data, false, Code.OK, useAsync);
-        assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove data watcher");
-
-        assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is still a watcher after removal");
+        assertEvent(dataEvents1, EventType.DataWatchRemoved, "/node1");
+        assertEvent(dataEvents2, EventType.DataWatchRemoved, "/node1");
+        assertWatchersExcept(zk2, "/node1", WatcherType.Data);
 
         zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        zk1.setData("/node1/child", new byte[0], -1);
-        zk1.delete("/node1/child", -1);
-        zk1.setData("/node1", new byte[0], -1);
-        zk1.delete("/node1", -1);
+        zk1.setData("/node1", null, -1);
+
+        assertEvent(childrenEvents, EventType.NodeChildrenChanged, "/node1");
 
-        assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
         assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
         assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1");
-        assertEvent(persistentEvents, EventType.NodeDeleted, "/node1");
 
         assertEvent(recursiveEvents, EventType.NodeCreated, "/node1/child");
-        assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1/child");
-        assertEvent(recursiveEvents, EventType.NodeDeleted, "/node1/child");
         assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1");
-        assertEvent(recursiveEvents, EventType.NodeDeleted, "/node1");
 
-        assertEquals(2, dWatchCount.getCount(), "Received watch notification after removal!");
+        assertNoEvent(dataEvents1);
+        assertNoEvent(dataEvents2);
     }
 
     /**
-     * Test verifies WatcherType.Children - removes only the configured child
-     * watcher function
+     * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Children}.
+     *
+     * <p>All other watcher types shouldn't be removed.
      */
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     @Timeout(value = 90)
     public void testRemoveAllChildWatchesOnAPath(boolean useAsync) throws Exception {
         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        final CountDownLatch cWatchCount = new CountDownLatch(2);
-        final CountDownLatch rmWatchCount = new CountDownLatch(2);
-        Watcher w1 = event -> {
-            switch (event.getType()) {
-            case ChildWatchRemoved:
-                rmWatchCount.countDown();
-                break;
-            case NodeChildrenChanged:
-                cWatchCount.countDown();
-                break;
-            default:
-                break;
-            }
-        };
-        Watcher w2 = event -> {
-            switch (event.getType()) {
-            case ChildWatchRemoved:
-                rmWatchCount.countDown();
-                break;
-            case NodeChildrenChanged:
-                cWatchCount.countDown();
-                break;
-            default:
-                break;
-            }
-        };
+
+        BlockingDeque<WatchedEvent> childrenEvents1 = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> childrenEvents2 = new LinkedBlockingDeque<>();
         // Add multiple child watches
-        LOG.info("Adding child watcher {} on path {}", w1, "/node1");
-        assertEquals(0, zk2.getChildren("/node1", w1).size(), "Didn't set child watches");
-        LOG.info("Adding child watcher {} on path {}", w2, "/node1");
-        assertEquals(0, zk2.getChildren("/node1", w2).size(), "Didn't set child watches");
+        zk2.getChildren("/node1", childrenEvents1::add);
+        zk2.getChildren("/node1", childrenEvents2::add);
 
+        BlockingDeque<WatchedEvent> dataEvents = new LinkedBlockingDeque<>();
         BlockingDeque<WatchedEvent> persistentEvents = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> recursiveEvents = new LinkedBlockingDeque<>();
+        zk2.getData("/node1", dataEvents::add, null);
         zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT);
+        zk2.addWatch("/node1", recursiveEvents::add, AddWatchMode.PERSISTENT_RECURSIVE);
 
-        assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is not a watcher");
+        assertWatchers(zk2, "/node1", WatcherType.values());
         removeAllWatches(zk2, "/node1", WatcherType.Children, false, Code.OK, useAsync);
-        assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove child watcher");
-
-        assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is still a watcher after removal");
+        assertEvent(childrenEvents1, EventType.ChildWatchRemoved, "/node1");
+        assertEvent(childrenEvents2, EventType.ChildWatchRemoved, "/node1");
+        assertWatchersExcept(zk2, "/node1", WatcherType.Children);
 
         zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        zk1.setData("/node1/child", new byte[0], -1);
-        zk1.delete("/node1/child", -1);
-        zk1.setData("/node1", new byte[0], -1);
-        zk1.delete("/node1", -1);
+        zk1.setData("/node1", null, -1);
 
+        assertEvent(dataEvents, EventType.NodeDataChanged, "/node1");
         assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
+        assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1");
+        assertEvent(recursiveEvents, EventType.NodeCreated, "/node1/child");
+        assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1");
+
+        assertNull(childrenEvents1.poll(10, TimeUnit.MILLISECONDS));
+        assertNull(childrenEvents2.poll(10, TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Persistent}.
+     *
+     * <p>All other watcher types shouldn't be removed.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    @Timeout(value = 90)
+    public void testRemoveAllPersistentWatchesOnAPath(boolean useAsync) throws Exception {
+        zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        BlockingDeque<WatchedEvent> persistentEvents1 = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> persistentEvents2 = new LinkedBlockingDeque<>();
+        // Add multiple persistent watches
+        zk2.addWatch("/node1", persistentEvents1::add, AddWatchMode.PERSISTENT);
+        zk2.addWatch("/node1", persistentEvents2::add, AddWatchMode.PERSISTENT);
+
+        BlockingDeque<WatchedEvent> dataEvents = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> childrenEvents = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> recursiveEvents = new LinkedBlockingDeque<>();
+        zk2.getData("/node1", dataEvents::add, null);
+        zk2.getChildren("/node1", childrenEvents::add, null);
+        zk2.addWatch("/node1", recursiveEvents::add, AddWatchMode.PERSISTENT_RECURSIVE);
+
+        assertWatchers(zk2, "/node1", WatcherType.values());
+        removeAllWatches(zk2, "/node1", WatcherType.Persistent, false, Code.OK, useAsync);
+        assertEvent(persistentEvents1, EventType.PersistentWatchRemoved, "/node1");
+        assertEvent(persistentEvents2, EventType.PersistentWatchRemoved, "/node1");
+        assertWatchersExcept(zk2, "/node1", WatcherType.Persistent);
+
+        zk1.create("/node1/child1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk1.setData("/node1", null, -1);
+
+        assertEvent(dataEvents, EventType.NodeDataChanged, "/node1");
+        assertEvent(childrenEvents, EventType.NodeChildrenChanged, "/node1");
+        assertEvent(recursiveEvents, EventType.NodeCreated, "/node1/child1");
+        assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1");
+
+        assertNull(persistentEvents1.poll(10, TimeUnit.MILLISECONDS));
+        assertNull(persistentEvents2.poll(10, TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Persistent} using {@link WatcherType#Data}.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    @Timeout(value = 90)
+    public void testRemoveAllPersistentWatchesOnAPathPartially(boolean useAsync) throws Exception {
+        zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        BlockingDeque<WatchedEvent> persistentEvents = new LinkedBlockingDeque<>();
+        zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT);
+
+        assertWatchers(zk2, "/node1", WatcherType.Persistent);
+        assertNoWatchers(zk2, "/node1", WatcherType.Data);
+        removeAllWatches(zk2, "/node1", WatcherType.Data, false, Code.NOWATCHER, useAsync);
+        assertWatchers(zk2, "/node1", WatcherType.Persistent);
+        assertNoWatchers(zk2, "/node1", WatcherType.Data);
+
+        zk1.create("/node1/child1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk1.setData("/node1", null, -1);
+
         assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
         assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1");
-        assertEvent(persistentEvents, EventType.NodeDeleted, "/node1");
 
-        assertEquals(2, cWatchCount.getCount(), "Received watch notification after removal!");
+        assertNull(persistentEvents.poll(10, TimeUnit.MILLISECONDS));
     }
 
     /**
-     * Test verifies WatcherType.Any - removes all the configured child,data
-     * watcher functions
+     * Test verifies {@link OpCode#removeWatches} {@link WatcherType#PersistentRecursive}.
+     *
+     * <p>All other watcher types shouldn't be removed.
      */
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     @Timeout(value = 90)
-    public void testRemoveAllWatchesOnAPath(boolean useAsync) throws Exception {
+    public void testRemoveAllPersistentRecursiveWatchesOnAPath(boolean useAsync) throws Exception {
         zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        final CountDownLatch watchCount = new CountDownLatch(2);
-        final CountDownLatch rmWatchCount = new CountDownLatch(4);
-        Watcher w1 = event -> {
-            switch (event.getType()) {
-            case ChildWatchRemoved:
-            case DataWatchRemoved:
-                rmWatchCount.countDown();
-                break;
-            case NodeChildrenChanged:
-            case NodeDataChanged:
-                watchCount.countDown();
-                break;
-            default:
-                break;
-            }
-        };
-        Watcher w2 = event -> {
-            switch (event.getType()) {
-            case ChildWatchRemoved:
-            case DataWatchRemoved:
-                rmWatchCount.countDown();
-                break;
-            case NodeChildrenChanged:
-            case NodeDataChanged:
-                watchCount.countDown();
-                break;
-            default:
-                break;
-            }
-        };
-        // Add multiple child watches
-        LOG.info("Adding child watcher {} on path {}", w1, "/node1");
-        assertEquals(0, zk2.getChildren("/node1", w1).size(), "Didn't set child watches");
-        LOG.info("Adding child watcher {} on path {}", w2, "/node1");
-        assertEquals(0, zk2.getChildren("/node1", w2).size(), "Didn't set child watches");
 
-        // Add multiple data watches
-        LOG.info("Adding data watcher {} on path {}", w1, "/node1");
-        assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches");
-        LOG.info("Adding data watcher {} on path {}", w2, "/node1");
-        assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches");
+        BlockingDeque<WatchedEvent> recursiveEvents1 = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> recursiveEvents2 = new LinkedBlockingDeque<>();
+        // Add multiple persistent recursive watches
+        zk2.addWatch("/node1", recursiveEvents1::add, AddWatchMode.PERSISTENT_RECURSIVE);
+        zk2.addWatch("/node1", recursiveEvents2::add, AddWatchMode.PERSISTENT_RECURSIVE);
 
+        BlockingDeque<WatchedEvent> dataEvents = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> childrenEvents = new LinkedBlockingDeque<>();
         BlockingDeque<WatchedEvent> persistentEvents = new LinkedBlockingDeque<>();
-        BlockingDeque<WatchedEvent> recursiveEvents = new LinkedBlockingDeque<>();
+        zk2.getData("/node1", dataEvents::add, null);
+        zk2.getChildren("/node1", childrenEvents::add, null);
         zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT);
-        zk2.addWatch("/node1", recursiveEvents::add, AddWatchMode.PERSISTENT_RECURSIVE);
 
-        assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Any), "Server session is not a watcher");
-        assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is not a watcher");
-        assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is not a watcher");
+        assertWatchers(zk2, "/node1", WatcherType.values());
+        removeAllWatches(zk2, "/node1", WatcherType.PersistentRecursive, false, Code.OK, useAsync);
+        assertEvent(recursiveEvents1, EventType.PersistentWatchRemoved, "/node1");
+        assertEvent(recursiveEvents2, EventType.PersistentWatchRemoved, "/node1");
+        assertWatchersExcept(zk2, "/node1", WatcherType.PersistentRecursive);
+
+        zk1.create("/node1/child1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk1.setData("/node1", null, -1);
+
+        assertEvent(dataEvents, EventType.NodeDataChanged, "/node1");
+        assertEvent(childrenEvents, EventType.NodeChildrenChanged, "/node1");
+        assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
+        assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1");
+
+        assertNull(recursiveEvents1.poll(10, TimeUnit.MILLISECONDS));
+        assertNull(recursiveEvents2.poll(10, TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Any}.
+     *
+     * <p>All watcher types should be removed.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    @Timeout(value = 90)
+    public void testRemoveAllWatchesOnAPath(boolean useAsync) throws Exception {
+        zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        // Add multiple child watches
+        BlockingDeque<WatchedEvent> childEvents1 = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> childEvents2 = new LinkedBlockingDeque<>();
+        zk2.getChildren("/node1", childEvents1::add);
+        zk2.getChildren("/node1", childEvents2::add);
+
+        // Add multiple data watches
+        BlockingDeque<WatchedEvent> dataEvents1 = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> dataEvents2 = new LinkedBlockingDeque<>();
+        zk2.getData("/node1", dataEvents1::add, null);
+        zk2.exists("/node1", dataEvents2::add);
+
+        // Add multiple persistent watches
+        BlockingDeque<WatchedEvent> persistentEvents1 = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> persistentEvents2 = new LinkedBlockingDeque<>();
+        zk2.addWatch("/node1", persistentEvents1::add, AddWatchMode.PERSISTENT);
+        zk2.addWatch("/node1", persistentEvents2::add, AddWatchMode.PERSISTENT);
+
+        // Add multiple recursive watches
+        BlockingDeque<WatchedEvent> recursiveEvents1 = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> recursiveEvents2 = new LinkedBlockingDeque<>();
+        zk2.addWatch("/node1", recursiveEvents1::add, AddWatchMode.PERSISTENT_RECURSIVE);
+        zk2.addWatch("/node1", recursiveEvents2::add, AddWatchMode.PERSISTENT_RECURSIVE);
+
+        assertWatchers(zk2, "/node1", WatcherType.values());
         removeAllWatches(zk2, "/node1", WatcherType.Any, false, Code.OK, useAsync);
-        assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove data watcher");
-        assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Any), "Server session is still a watcher after removal");
-        assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is still a watcher after removal");
-        assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is still a watcher after removal");
 
-        assertEvent(persistentEvents, EventType.PersistentWatchRemoved, "/node1");
-        assertEvent(recursiveEvents, EventType.PersistentWatchRemoved, "/node1");
+        assertEvent(childEvents1, EventType.ChildWatchRemoved, "/node1");
+        assertEvent(childEvents2, EventType.ChildWatchRemoved, "/node1");
 
-        zk1.delete("/node1", -1);
-        assertNull(persistentEvents.poll(10, TimeUnit.MILLISECONDS));
-        assertNull(recursiveEvents.poll(10, TimeUnit.MILLISECONDS));
-        assertEquals(2, watchCount.getCount(), "Received watch notification after removal!");
+        assertEvent(dataEvents1, EventType.DataWatchRemoved, "/node1");
+        assertEvent(dataEvents2, EventType.DataWatchRemoved, "/node1");
+
+        assertEvent(persistentEvents1, EventType.PersistentWatchRemoved, "/node1");
+        assertEvent(persistentEvents2, EventType.PersistentWatchRemoved, "/node1");
+
+        assertEvent(recursiveEvents1, EventType.PersistentWatchRemoved, "/node1");
+        assertEvent(recursiveEvents2, EventType.PersistentWatchRemoved, "/node1");
+        assertNoWatchers(zk2, "/node1", WatcherType.values());
+
+        zk1.create("/node1/child1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk1.setData("/node1", null, -1);
+
+        assertNoEvent(childEvents1);
+        assertNoEvent(childEvents2);
+        assertNoEvent(dataEvents1);
+        assertNoEvent(dataEvents2);
+        assertNoEvent(persistentEvents1);
+        assertNoEvent(persistentEvents2);
+        assertNoEvent(recursiveEvents1);
+        assertNoEvent(recursiveEvents2);
     }
 
     private static class MyWatchManager extends ZKWatchManager {
@@ -1159,4 +1384,12 @@ public class RemoveWatchesTest extends ClientBase {
         assertEquals(eventType, event.getType());
         assertEquals(path, event.getPath());
     }
+
+    /**
+     * Asserts no event from queue in a short period.
+     */
+    private void assertNoEvent(BlockingQueue<WatchedEvent> events) throws InterruptedException {
+        // Short timeout so we don't hurt CI too much. It will fail finally given enough run if there are bugs.
+        assertNull(events.poll(10, TimeUnit.MILLISECONDS));
+    }
 }