Browse Source

ZOOKEEPER-4471: Match removing WatcherType to standard, persistent modes (#1998)

* ZOOKEEPER-4471: Match removing WatcherType to standard, persistent modes

Before ZOOKEEPER-1416, `WatcherType.Children` was used to remove
watchers attached through `ZooKeeper.getChildren`. `WatcherType.Data`
was used to remove watchers attached through `ZooKeeper.getData` and
`ZooKeeper.exists`.

ZOOKEEPER-1416 adds `AddWatchMode.PERSISTENT`. This watcher could be
completed remove using `WatcherType.Any`. But when removing through
`WatcherType.Data` or `WatcherType.Children`, part of
`AddWatchMode.PERSISTENT` will be left behind. And we get persistent
children or data watchers.

We are never claiming to support these type of watchers. So fix it.

In rare chance, we are going to support persistent data or children
watchers in future, I think we probably don't want to do such "magic"
thing in ZooKeeper. So fix it.

This is a step towards ZOOKEEPER-4472 which proposed to support
`WatcherType.Persistent` and `WatcherType.PersistentRecursive` to remove
persistent watchers.

* Refactor newly added tests in WatchManagerTest

I found it somewhat hard to follow in self-review. Add given-when-then
comments from my best hope for reviewing and maintenance.
Kezhu Wang 2 years ago
parent
commit
b8d458f5c4

+ 8 - 8
zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java

@@ -1558,16 +1558,16 @@ public class DataTree {
         boolean containsWatcher = false;
         switch (type) {
         case Children:
-            containsWatcher = this.childWatches.containsWatcher(path, watcher);
+            containsWatcher = this.childWatches.containsWatcher(path, watcher, WatcherMode.STANDARD);
             break;
         case Data:
-            containsWatcher = this.dataWatches.containsWatcher(path, watcher);
+            containsWatcher = this.dataWatches.containsWatcher(path, watcher, WatcherMode.STANDARD);
             break;
         case Any:
-            if (this.childWatches.containsWatcher(path, watcher)) {
+            if (this.childWatches.containsWatcher(path, watcher, null)) {
                 containsWatcher = true;
             }
-            if (this.dataWatches.containsWatcher(path, watcher)) {
+            if (this.dataWatches.containsWatcher(path, watcher, null)) {
                 containsWatcher = true;
             }
             break;
@@ -1579,16 +1579,16 @@ public class DataTree {
         boolean removed = false;
         switch (type) {
         case Children:
-            removed = this.childWatches.removeWatcher(path, watcher);
+            removed = this.childWatches.removeWatcher(path, watcher, WatcherMode.STANDARD);
             break;
         case Data:
-            removed = this.dataWatches.removeWatcher(path, watcher);
+            removed = this.dataWatches.removeWatcher(path, watcher, WatcherMode.STANDARD);
             break;
         case Any:
-            if (this.childWatches.removeWatcher(path, watcher)) {
+            if (this.childWatches.removeWatcher(path, watcher, null)) {
                 removed = true;
             }
-            if (this.dataWatches.removeWatcher(path, watcher)) {
+            if (this.dataWatches.removeWatcher(path, watcher, null)) {
                 removed = true;
             }
             break;

+ 31 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java

@@ -19,6 +19,7 @@
 package org.apache.zookeeper.server.watch;
 
 import java.io.PrintWriter;
+import javax.annotation.Nullable;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 
@@ -60,6 +61,21 @@ public interface IWatchManager {
      */
     boolean containsWatcher(String path, Watcher watcher);
 
+    /**
+     * Checks the specified watcher exists for the given path and mode.
+     *
+     * @param path znode path
+     * @param watcher watcher object reference
+     * @param watcherMode watcher mode, null for any mode
+     * @return true if the watcher exists, false otherwise
+     */
+    default boolean containsWatcher(String path, Watcher watcher, @Nullable WatcherMode watcherMode) {
+        if (watcherMode == null || watcherMode == WatcherMode.DEFAULT_WATCHER_MODE) {
+            return containsWatcher(path, watcher);
+        }
+        throw new UnsupportedOperationException("persistent watch");
+    }
+
     /**
      * Removes the specified watcher for the given path.
      *
@@ -70,6 +86,21 @@ public interface IWatchManager {
      */
     boolean removeWatcher(String path, Watcher watcher);
 
+    /**
+     * Removes the specified watcher for the given path and mode.
+     *
+     * @param path znode path
+     * @param watcher watcher object reference
+     * @param watcherMode watcher mode, null to remove all modes
+     * @return true if the watcher successfully removed, false otherwise
+     */
+    default boolean removeWatcher(String path, Watcher watcher, WatcherMode watcherMode) {
+        if (watcherMode == null || watcherMode == WatcherMode.DEFAULT_WATCHER_MODE) {
+            return removeWatcher(path, watcher);
+        }
+        throw new UnsupportedOperationException("persistent watch");
+    }
+
     /**
      * The entry to remove the watcher when the cnxn is closed.
      *

+ 45 - 12
zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java

@@ -249,36 +249,69 @@ public class WatchManager implements IWatchManager {
 
     @Override
     public synchronized boolean containsWatcher(String path, Watcher watcher) {
-        Set<Watcher> list = watchTable.get(path);
-        return list != null && list.contains(watcher);
+        return containsWatcher(path, watcher, null);
     }
 
     @Override
-    public synchronized boolean removeWatcher(String path, Watcher watcher) {
+    public synchronized boolean containsWatcher(String path, Watcher watcher, WatcherMode watcherMode) {
         Map<String, WatchStats> paths = watch2Paths.get(watcher);
         if (paths == null) {
             return false;
         }
+        WatchStats stats = paths.get(path);
+        return stats != null && (watcherMode == null || stats.hasMode(watcherMode));
+    }
 
+    private WatchStats unwatch(String path, Watcher watcher, Map<String, WatchStats> paths, Set<Watcher> watchers) {
         WatchStats stats = paths.remove(path);
         if (stats == null) {
-            return false;
+            return WatchStats.NONE;
         }
-        if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
-            --recursiveWatchQty;
+        if (paths.isEmpty()) {
+            watch2Paths.remove(watcher);
+        }
+        watchers.remove(watcher);
+        if (watchers.isEmpty()) {
+            watchTable.remove(path);
         }
+        return stats;
+    }
 
-        Set<Watcher> list = watchTable.get(path);
-        if (list == null || !list.remove(watcher)) {
-            LOG.warn("inconsistent watch table for path {}, {} not in watcher list", path, watcher);
+    @Override
+    public synchronized boolean removeWatcher(String path, Watcher watcher, WatcherMode watcherMode) {
+        Map<String, WatchStats> paths = watch2Paths.get(watcher);
+        Set<Watcher> watchers = watchTable.get(path);
+        if (paths == null || watchers == null) {
             return false;
         }
 
-        if (list.isEmpty()) {
-            watchTable.remove(path);
+        WatchStats oldStats;
+        WatchStats newStats;
+        if (watcherMode != null) {
+            oldStats = paths.getOrDefault(path, WatchStats.NONE);
+            newStats = oldStats.removeMode(watcherMode);
+            if (newStats != WatchStats.NONE) {
+                if (newStats != oldStats) {
+                    paths.put(path, newStats);
+                }
+            } else if (oldStats != WatchStats.NONE) {
+                unwatch(path, watcher, paths, watchers);
+            }
+        } else {
+            oldStats = unwatch(path, watcher, paths, watchers);
+            newStats = WatchStats.NONE;
         }
 
-        return true;
+        if (oldStats.hasMode(WatcherMode.PERSISTENT_RECURSIVE) && !newStats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
+            --recursiveWatchQty;
+        }
+
+        return oldStats != newStats;
+    }
+
+    @Override
+    public synchronized boolean removeWatcher(String path, Watcher watcher) {
+        return removeWatcher(path, watcher, null);
     }
 
     // VisibleForTesting

+ 69 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java

@@ -34,7 +34,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.commons.collections4.CollectionUtils;
@@ -847,11 +850,35 @@ public class RemoveWatchesTest extends ClientBase {
         LOG.info("Adding data watcher {} on path {}", w2, "/node1");
         assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches");
 
+        BlockingDeque<WatchedEvent> persistentEvents = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> recursiveEvents = new LinkedBlockingDeque<>();
+        zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT);
+        zk2.addWatch("/node1", recursiveEvents::add, AddWatchMode.PERSISTENT_RECURSIVE);
+
         assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is not a watcher");
         removeAllWatches(zk2, "/node1", WatcherType.Data, false, Code.OK, useAsync);
         assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove data watcher");
 
         assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is still a watcher after removal");
+
+        zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk1.setData("/node1/child", new byte[0], -1);
+        zk1.delete("/node1/child", -1);
+        zk1.setData("/node1", new byte[0], -1);
+        zk1.delete("/node1", -1);
+
+        assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
+        assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
+        assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1");
+        assertEvent(persistentEvents, EventType.NodeDeleted, "/node1");
+
+        assertEvent(recursiveEvents, EventType.NodeCreated, "/node1/child");
+        assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1/child");
+        assertEvent(recursiveEvents, EventType.NodeDeleted, "/node1/child");
+        assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1");
+        assertEvent(recursiveEvents, EventType.NodeDeleted, "/node1");
+
+        assertEquals(2, dWatchCount.getCount(), "Received watch notification after removal!");
     }
 
     /**
@@ -895,11 +922,27 @@ public class RemoveWatchesTest extends ClientBase {
         LOG.info("Adding child watcher {} on path {}", w2, "/node1");
         assertEquals(0, zk2.getChildren("/node1", w2).size(), "Didn't set child watches");
 
+        BlockingDeque<WatchedEvent> persistentEvents = new LinkedBlockingDeque<>();
+        zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT);
+
         assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is not a watcher");
         removeAllWatches(zk2, "/node1", WatcherType.Children, false, Code.OK, useAsync);
         assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove child watcher");
 
         assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is still a watcher after removal");
+
+        zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk1.setData("/node1/child", new byte[0], -1);
+        zk1.delete("/node1/child", -1);
+        zk1.setData("/node1", new byte[0], -1);
+        zk1.delete("/node1", -1);
+
+        assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
+        assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
+        assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1");
+        assertEvent(persistentEvents, EventType.NodeDeleted, "/node1");
+
+        assertEquals(2, cWatchCount.getCount(), "Received watch notification after removal!");
     }
 
     /**
@@ -953,10 +996,26 @@ public class RemoveWatchesTest extends ClientBase {
         LOG.info("Adding data watcher {} on path {}", w2, "/node1");
         assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches");
 
+        BlockingDeque<WatchedEvent> persistentEvents = new LinkedBlockingDeque<>();
+        BlockingDeque<WatchedEvent> recursiveEvents = new LinkedBlockingDeque<>();
+        zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT);
+        zk2.addWatch("/node1", recursiveEvents::add, AddWatchMode.PERSISTENT_RECURSIVE);
+
+        assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Any), "Server session is not a watcher");
         assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is not a watcher");
+        assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is not a watcher");
         removeAllWatches(zk2, "/node1", WatcherType.Any, false, Code.OK, useAsync);
         assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove data watcher");
+        assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Any), "Server session is still a watcher after removal");
         assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is still a watcher after removal");
+        assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is still a watcher after removal");
+
+        assertEvent(persistentEvents, EventType.PersistentWatchRemoved, "/node1");
+        assertEvent(recursiveEvents, EventType.PersistentWatchRemoved, "/node1");
+
+        zk1.delete("/node1", -1);
+        assertNull(persistentEvents.poll(10, TimeUnit.MILLISECONDS));
+        assertNull(recursiveEvents.poll(10, TimeUnit.MILLISECONDS));
         assertEquals(2, watchCount.getCount(), "Received watch notification after removal!");
     }
 
@@ -1090,4 +1149,14 @@ public class RemoveWatchesTest extends ClientBase {
         return false;
     }
 
+    /**
+     * Asserts next event from queue has given event type and path.
+     */
+    private void assertEvent(BlockingQueue<WatchedEvent> events, Watcher.Event.EventType eventType, String path)
+            throws InterruptedException {
+        WatchedEvent event = events.poll(5, TimeUnit.SECONDS);
+        assertNotNull(event);
+        assertEquals(eventType, event.getType());
+        assertEquals(path, event.getPath());
+    }
 }

+ 313 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java

@@ -19,6 +19,7 @@ package org.apache.zookeeper.server.watch;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -38,6 +39,7 @@ import org.apache.zookeeper.server.DumbWatcher;
 import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ServerMetrics;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -342,6 +344,317 @@ public class WatchManagerTest extends ZKTestCase {
         assertEquals(watchesAdded.get(), watchesRemoved.get() + manager.size());
     }
 
+    /**
+     * Test add, contains and remove on generic watch manager.
+     */
+    @ParameterizedTest
+    @MethodSource("data")
+    public void testAddRemoveWatcher(String className) throws IOException {
+        IWatchManager manager = getWatchManager(className);
+        Watcher watcher1 = new DumbWatcher();
+        Watcher watcher2 = new DumbWatcher();
+
+        // given: add watcher1 to "/node1"
+        manager.addWatch("/node1", watcher1);
+
+        // then: contains or remove should fail on mismatch path and watcher pair
+        assertFalse(manager.containsWatcher("/node1", watcher2));
+        assertFalse(manager.containsWatcher("/node2", watcher1));
+        assertFalse(manager.removeWatcher("/node1", watcher2));
+        assertFalse(manager.removeWatcher("/node2", watcher1));
+
+        // then: contains or remove should succeed on matching path and watcher pair
+        assertTrue(manager.containsWatcher("/node1", watcher1));
+        assertTrue(manager.removeWatcher("/node1", watcher1));
+
+        // then: contains or remove should fail on removed path and watcher pair
+        assertFalse(manager.containsWatcher("/node1", watcher1));
+        assertFalse(manager.removeWatcher("/node1", watcher1));
+    }
+
+    /**
+     * Test containsWatcher on all pairs, and removeWatcher on mismatch pairs.
+     */
+    @Test
+    public void testContainsMode() {
+        IWatchManager manager = new WatchManager();
+        Watcher watcher1 = new DumbWatcher();
+        Watcher watcher2 = new DumbWatcher();
+
+        // given: add watcher1 to "/node1" in persistent mode
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertNotEquals(WatcherMode.PERSISTENT, WatcherMode.DEFAULT_WATCHER_MODE);
+
+        // then: contains should succeed on watcher1 to "/node1" in persistent and any mode
+        assertTrue(manager.containsWatcher("/node1", watcher1));
+        assertTrue(manager.containsWatcher("/node1", watcher1, null));
+        assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT));
+
+        // then: contains and remove should fail on mismatch watcher
+        assertFalse(manager.containsWatcher("/node1", watcher2));
+        assertFalse(manager.containsWatcher("/node1", watcher2, null));
+        assertFalse(manager.containsWatcher("/node1", watcher2, WatcherMode.STANDARD));
+        assertFalse(manager.containsWatcher("/node1", watcher2, WatcherMode.PERSISTENT));
+        assertFalse(manager.containsWatcher("/node1", watcher2, WatcherMode.PERSISTENT_RECURSIVE));
+        assertFalse(manager.removeWatcher("/node1", watcher2));
+        assertFalse(manager.removeWatcher("/node1", watcher2, null));
+        assertFalse(manager.removeWatcher("/node1", watcher2, WatcherMode.STANDARD));
+        assertFalse(manager.removeWatcher("/node1", watcher2, WatcherMode.PERSISTENT));
+        assertFalse(manager.removeWatcher("/node1", watcher2, WatcherMode.PERSISTENT_RECURSIVE));
+
+        // then: contains and remove should fail on mismatch path
+        assertFalse(manager.containsWatcher("/node2", watcher1));
+        assertFalse(manager.containsWatcher("/node2", watcher1, null));
+        assertFalse(manager.containsWatcher("/node2", watcher1, WatcherMode.STANDARD));
+        assertFalse(manager.containsWatcher("/node2", watcher1, WatcherMode.PERSISTENT));
+        assertFalse(manager.containsWatcher("/node2", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+        assertFalse(manager.removeWatcher("/node2", watcher1));
+        assertFalse(manager.removeWatcher("/node2", watcher1, null));
+        assertFalse(manager.removeWatcher("/node2", watcher1, WatcherMode.STANDARD));
+        assertFalse(manager.removeWatcher("/node2", watcher1, WatcherMode.PERSISTENT));
+        assertFalse(manager.removeWatcher("/node2", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+
+        // then: contains and remove should fail on mismatch modes
+        assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.STANDARD));
+        assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+        assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.STANDARD));
+        assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+
+        // when: add watcher1 to "/node1" in remaining modes
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.STANDARD));
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+
+        // then: contains should succeed on watcher to "/node1" in all modes
+        assertTrue(manager.containsWatcher("/node1", watcher1));
+        assertTrue(manager.containsWatcher("/node1", watcher1, null));
+        assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.STANDARD));
+        assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+    }
+
+    /**
+     * Test repeatedly {@link WatchManager#addWatch(String, Watcher, WatcherMode)}.
+     */
+    @Test
+    public void testAddModeRepeatedly() {
+        IWatchManager manager = new WatchManager();
+        Watcher watcher1 = new DumbWatcher();
+
+        // given: add watcher1 to "/node1" in all modes
+        manager.addWatch("/node1", watcher1, WatcherMode.STANDARD);
+        manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT);
+        manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE);
+
+        // when: add watcher1 to "/node1" in these modes repeatedly
+        assertFalse(manager.addWatch("/node1", watcher1, WatcherMode.STANDARD));
+        assertFalse(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertFalse(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+
+        // then: contains and remove should work normally on watcher1 to "/node1"
+        assertTrue(manager.containsWatcher("/node1", watcher1));
+        assertTrue(manager.containsWatcher("/node1", watcher1, null));
+        assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.STANDARD));
+        assertTrue(manager.removeWatcher("/node1", watcher1, WatcherMode.STANDARD));
+        assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.STANDARD));
+        assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.STANDARD));
+
+        assertTrue(manager.containsWatcher("/node1", watcher1));
+        assertTrue(manager.containsWatcher("/node1", watcher1, null));
+        assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertTrue(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT));
+
+        assertTrue(manager.containsWatcher("/node1", watcher1));
+        assertTrue(manager.containsWatcher("/node1", watcher1, null));
+        assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+        assertTrue(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+        assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+        assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+
+        assertFalse(manager.containsWatcher("/node1", watcher1));
+        assertFalse(manager.containsWatcher("/node1", watcher1, null));
+        assertFalse(manager.removeWatcher("/node1", watcher1));
+        assertFalse(manager.removeWatcher("/node1", watcher1, null));
+    }
+
+    /**
+     * Test {@link WatchManager#removeWatcher(String, Watcher, WatcherMode)} on one pair should not break others.
+     */
+    @Test
+    public void testRemoveModeOne() {
+        IWatchManager manager = new WatchManager();
+        Watcher watcher1 = new DumbWatcher();
+        Watcher watcher2 = new DumbWatcher();
+
+        // given: add watcher1 to "/node1" and watcher2 to "/node2" in all modes
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.STANDARD));
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+        assertTrue(manager.addWatch("/node2", watcher2, WatcherMode.STANDARD));
+        assertTrue(manager.addWatch("/node2", watcher2, WatcherMode.PERSISTENT));
+        assertTrue(manager.addWatch("/node2", watcher2, WatcherMode.PERSISTENT_RECURSIVE));
+
+        // when: remove one pair
+        assertTrue(manager.removeWatcher("/node1", watcher1, WatcherMode.STANDARD));
+
+        // then: contains and remove should succeed on other pairs
+        assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+        assertTrue(manager.containsWatcher("/node2", watcher2, WatcherMode.STANDARD));
+        assertTrue(manager.containsWatcher("/node2", watcher2, WatcherMode.PERSISTENT));
+        assertTrue(manager.containsWatcher("/node2", watcher2, WatcherMode.PERSISTENT_RECURSIVE));
+        assertTrue(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertTrue(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+        assertTrue(manager.removeWatcher("/node2", watcher2, WatcherMode.STANDARD));
+        assertTrue(manager.removeWatcher("/node2", watcher2, WatcherMode.PERSISTENT));
+        assertTrue(manager.removeWatcher("/node2", watcher2, WatcherMode.PERSISTENT_RECURSIVE));
+    }
+
+    /**
+     * Test {@link WatchManager#removeWatcher(String, Watcher, WatcherMode)} with {@code null} watcher mode.
+     */
+    @Test
+    public void testRemoveModeAll() {
+        IWatchManager manager = new WatchManager();
+        Watcher watcher1 = new DumbWatcher();
+
+        // given: add watcher1 to "/node1" in all modes
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.STANDARD));
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+
+        // when: remove watcher1 using null watcher mode
+        assertTrue(manager.removeWatcher("/node1", watcher1, null));
+
+        // then: contains and remove should fail on watcher1 to "/node1" in all modes
+        assertFalse(manager.containsWatcher("/node1", watcher1));
+        assertFalse(manager.containsWatcher("/node1", watcher1, null));
+        assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.STANDARD));
+        assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+        assertFalse(manager.removeWatcher("/node1", watcher1));
+        assertFalse(manager.removeWatcher("/node1", watcher1, null));
+        assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.STANDARD));
+        assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+
+        // given: add watcher1 to "/node1" in all modes
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.STANDARD));
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+
+        // then: remove watcher1 without a mode should behave same to removing all modes
+        assertTrue(manager.removeWatcher("/node1", watcher1));
+
+        assertFalse(manager.containsWatcher("/node1", watcher1));
+        assertFalse(manager.containsWatcher("/node1", watcher1, null));
+        assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.STANDARD));
+        assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+        assertFalse(manager.removeWatcher("/node1", watcher1));
+        assertFalse(manager.removeWatcher("/node1", watcher1, null));
+        assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.STANDARD));
+        assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+    }
+
+    /**
+     * Test {@link WatchManager#removeWatcher(String, Watcher)}.
+     */
+    @Test
+    public void testRemoveModeAllDefault() {
+        IWatchManager manager = new WatchManager();
+        Watcher watcher1 = new DumbWatcher();
+
+        // given: add watcher1 to "/node1" in all modes
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.STANDARD));
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+
+        // then: remove watcher1 without a mode should behave same to removing all modes
+        assertTrue(manager.removeWatcher("/node1", watcher1));
+
+        assertFalse(manager.containsWatcher("/node1", watcher1));
+        assertFalse(manager.containsWatcher("/node1", watcher1, null));
+        assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.STANDARD));
+        assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+        assertFalse(manager.removeWatcher("/node1", watcher1));
+        assertFalse(manager.removeWatcher("/node1", watcher1, null));
+        assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.STANDARD));
+        assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+    }
+
+    /**
+     * Test {@link WatchManager#removeWatcher(String, Watcher, WatcherMode)} all modes individually.
+     */
+    @Test
+    public void testRemoveModeAllIndividually() {
+        IWatchManager manager = new WatchManager();
+        Watcher watcher1 = new DumbWatcher();
+
+        // given: add watcher1 to "/node1" in all modes
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.STANDARD));
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+
+        // when: remove all modes individually
+        assertTrue(manager.removeWatcher("/node1", watcher1, WatcherMode.STANDARD));
+        assertTrue(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertTrue(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+
+        // then: contains and remove should fail on watcher1 to "/node1" in all modes
+        assertFalse(manager.containsWatcher("/node1", watcher1));
+        assertFalse(manager.containsWatcher("/node1", watcher1, null));
+        assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.STANDARD));
+        assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+        assertFalse(manager.removeWatcher("/node1", watcher1));
+        assertFalse(manager.removeWatcher("/node1", watcher1, null));
+        assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.STANDARD));
+        assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+    }
+
+    /**
+     * Test {@link WatchManager#removeWatcher(String, Watcher, WatcherMode)} on mismatch pair should break nothing.
+     */
+    @Test
+    public void testRemoveModeMismatch() {
+        IWatchManager manager = new WatchManager();
+        Watcher watcher1 = new DumbWatcher();
+        Watcher watcher2 = new DumbWatcher();
+
+        // given: add watcher1 to "/node1" and watcher2 to "/node2" in all modes
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.STANDARD));
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+        assertTrue(manager.addWatch("/node2", watcher2, WatcherMode.STANDARD));
+        assertTrue(manager.addWatch("/node2", watcher2, WatcherMode.PERSISTENT));
+        assertTrue(manager.addWatch("/node2", watcher2, WatcherMode.PERSISTENT_RECURSIVE));
+
+        // when: remove mismatch path and watcher pairs
+        assertFalse(manager.removeWatcher("/node1", watcher2));
+        assertFalse(manager.removeWatcher("/node1", watcher2, null));
+        assertFalse(manager.removeWatcher("/node1", watcher2, WatcherMode.STANDARD));
+        assertFalse(manager.removeWatcher("/node1", watcher2, WatcherMode.PERSISTENT));
+        assertFalse(manager.removeWatcher("/node1", watcher2, WatcherMode.PERSISTENT_RECURSIVE));
+
+        // then: no existing watching pairs should break
+        assertTrue(manager.containsWatcher("/node1", watcher1));
+        assertTrue(manager.containsWatcher("/node1", watcher1, null));
+        assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.STANDARD));
+        assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT));
+        assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE));
+        assertTrue(manager.containsWatcher("/node2", watcher2));
+        assertTrue(manager.containsWatcher("/node2", watcher2, null));
+        assertTrue(manager.containsWatcher("/node2", watcher2, WatcherMode.STANDARD));
+        assertTrue(manager.containsWatcher("/node2", watcher2, WatcherMode.PERSISTENT));
+        assertTrue(manager.containsWatcher("/node2", watcher2, WatcherMode.PERSISTENT_RECURSIVE));
+    }
+
     /**
      * Concurrently add watch while close the watcher to simulate the
      * client connections closed on prod.