ZOOKEEPER-1416 - Persistent, recursive watchers

### Background

Note: this is a port of https://github.com/apache/zookeeper/pull/136

Implementation of persistent, recursive watches for ZooKeeper. These watches are set via a new method, `addWatch()`, and are removed via the existing watcher removal methods. Persistent, recursive watches have these characteristics: a) once set, they are not removed automatically when triggered; b) they trigger for all event types (child, data, etc.) on the znode they are registered for and, recursively, on any child znode; c) they are implemented efficiently by reusing the existing watch internals. A new class, `PathParentIterator`, walks up the path parent by parent when checking whether a watcher applies.
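
As a rough illustration of characteristic (a), the toy model below contrasts a one-shot (standard) registration with a persistent one. It is not ZooKeeper code: the class `TriggerSemanticsSketch` and its `register`/`trigger` methods are hypothetical names invented for this sketch, and watchers are represented by plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Toy in-memory model; all names are hypothetical and not part of the patch.
final class TriggerSemanticsSketch {
    private static final class Registration {
        final String name;
        final boolean persistent;

        Registration(String name, boolean persistent) {
            this.name = name;
            this.persistent = persistent;
        }
    }

    private final Map<String, List<Registration>> byPath = new HashMap<>();

    void register(String path, String name, boolean persistent) {
        byPath.computeIfAbsent(path, p -> new ArrayList<>())
              .add(new Registration(name, persistent));
    }

    // Fires every watcher registered on the path and returns their names.
    // One-shot watchers are dropped after firing; persistent ones stay registered.
    List<String> trigger(String path) {
        List<String> fired = new ArrayList<>();
        List<Registration> regs = byPath.get(path);
        if (regs == null) {
            return fired;
        }
        Iterator<Registration> it = regs.iterator();
        while (it.hasNext()) {
            Registration r = it.next();
            fired.add(r.name);
            if (!r.persistent) {
                it.remove();
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        TriggerSemanticsSketch model = new TriggerSemanticsSketch();
        model.register("/node", "standard", false);
        model.register("/node", "persistent", true);
        System.out.println(model.trigger("/node")); // both watchers fire
        System.out.println(model.trigger("/node")); // only the persistent one remains
    }
}
```

In this model a client that wants continuous notifications from the one-shot registration would have to call `register` again after each event, which is the re-registration gap that persistent watches avoid.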

### Implementation Details

- A new enum manages the different "modes" for watchers: `WatcherMode`.
- For traditional, "standard" watchers, the code path is almost exactly the same. There is very little overhead other than a few extra checks in `WatchManager`.
- Given how this is implemented, it was difficult to add support when `WatchManagerOptimized` is used. I'm open to adding it for that version, but it will take work. We should consider not supporting persistent/recursive watchers when `WatchManagerOptimized` is used. I notice that `WatchManagerOptimized` is not even mentioned in the docs.
- The mode for a given watcher/path pair is held in a map inside of `WatcherModeManager`. The absence of an entry means Standard. This way, there's no overhead for old, standard watchers.
- `PathParentIterator` is the "meat" of the implementation. Rather than setting watchers on every znode implied by a recursive watcher, `WatchManager` passes any paths it processes through `PathParentIterator`, which iterates up each parent znode looking for watchers.
- The remainder of the changes are scaffolding to match how other watchers are used, as well as Jute/API changes to set persistent/recursive watchers.
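
The two ideas above (a mode map in which a missing entry means "standard", and walking up the parents of an event path to find applicable watchers) can be sketched roughly as follows. This is a simplified, self-contained illustration under my own assumptions, not the code from this patch: `ParentPathSketch`, `parentsOf`, `modeOf`, `applies`, and the `Mode` enum are hypothetical names, and watchers are identified by plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; the names here are illustrative and do not come from the patch.
final class ParentPathSketch {
    enum Mode { STANDARD, PERSISTENT, PERSISTENT_RECURSIVE }

    // Only non-standard modes are stored; a missing entry means STANDARD.
    private final Map<String, Mode> modes = new HashMap<>();

    void setMode(String watcherId, String path, Mode mode) {
        String key = watcherId + "@" + path;
        if (mode == Mode.STANDARD) {
            modes.remove(key);
        } else {
            modes.put(key, mode);
        }
    }

    Mode modeOf(String watcherId, String path) {
        return modes.getOrDefault(watcherId + "@" + path, Mode.STANDARD);
    }

    // Returns the path itself followed by each ancestor, ending at the root "/".
    static List<String> parentsOf(String path) {
        List<String> result = new ArrayList<>();
        String current = path;
        while (true) {
            result.add(current);
            if (current.equals("/")) {
                break;
            }
            int slash = current.lastIndexOf('/');
            current = (slash <= 0) ? "/" : current.substring(0, slash);
        }
        return result;
    }

    // A watcher registered at watchedPath applies to eventPath if the paths are equal,
    // or if the watcher is recursive and watchedPath is an ancestor of eventPath.
    boolean applies(String watcherId, String watchedPath, String eventPath) {
        for (String candidate : parentsOf(eventPath)) {
            if (!candidate.equals(watchedPath)) {
                continue;
            }
            boolean exact = candidate.equals(eventPath);
            return exact || modeOf(watcherId, watchedPath) == Mode.PERSISTENT_RECURSIVE;
        }
        return false;
    }

    public static void main(String[] args) {
        ParentPathSketch sketch = new ParentPathSketch();
        sketch.setMode("w1", "/a", Mode.PERSISTENT_RECURSIVE);
        System.out.println(parentsOf("/a/b/c"));
        System.out.println(sketch.applies("w1", "/a", "/a/b/c")); // recursive watcher on an ancestor
        System.out.println(sketch.applies("w2", "/a", "/a/b/c")); // no entry, treated as standard
    }
}
```

The point of the design is that a recursive registration costs one map entry regardless of how many descendants exist; the price is a walk over the path segments of each event, which matches the small performance note in the `AddWatchMode` Javadoc.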

### Testing

The tests were written years ago. I think they're comprehensive but reviewers should pay attention to anything that was missed. There is much ZooKeeper knowledge that's only in the heads of ZK committers.

- `PersistentWatcherTest` - tests persistent, non-recursive watchers
- `PersistentRecursiveWatcherTest` - tests persistent, recursive watchers
- `PathParentIteratorTest` - exercises edge cases of `PathParentIterator`

Author: randgalt <jordan@jordanzimmerman.com>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Norbert Kalmar <nkalmar@apache.org>, Andor Molnár <andor@apache.org>, Justin Mao Ling <maoling199210191@sina.com>

Closes #1106 from Randgalt/ZOOKEEPER-1416
randgalt 5 years ago
parent
commit
553639378d
27 changed files with 1699 additions and 54 deletions
  1. 5 0
      zookeeper-docs/src/main/resources/markdown/zookeeperOver.md
  2. 25 2
      zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md
  3. 12 0
      zookeeper-jute/src/main/resources/zookeeper.jute
  4. 67 0
      zookeeper-server/src/main/java/org/apache/zookeeper/AddWatchMode.java
  5. 31 5
      zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
  6. 4 1
      zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java
  7. 11 0
      zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java
  8. 204 0
      zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
  9. 2 0
      zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeperMain.java
  10. 87 0
      zookeeper-server/src/main/java/org/apache/zookeeper/cli/AddWatchCommand.java
  11. 17 1
      zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
  12. 32 1
      zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
  13. 2 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
  14. 5 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
  15. 19 2
      zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
  16. 2 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RequestPathMetricsCollector.java
  17. 24 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java
  18. 106 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/PathParentIterator.java
  19. 101 41
      zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java
  20. 56 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java
  21. 96 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherModeManager.java
  22. 84 0
      zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/PathParentIteratorTest.java
  23. 197 0
      zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/RecursiveWatchQtyTest.java
  24. 1 1
      zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java
  25. 174 0
      zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java
  26. 211 0
      zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentWatcherTest.java
  27. 124 0
      zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java

+ 5 - 0
zookeeper-docs/src/main/resources/markdown/zookeeperOver.md

@@ -146,6 +146,11 @@ receives a packet saying that the znode has changed. If the
 connection between the client and one of the ZooKeeper servers is
 broken, the client will receive a local notification.
 
+**New in 3.6.0:** Clients can also set
+permanent, recursive watches on a znode that are not removed when triggered
+and that trigger for changes on the registered znode as well as any children
+znodes recursively.
+
 <a name="Guarantees"></a>
 
 ### Guarantees

+ 25 - 2
zookeeper-docs/src/main/resources/markdown/zookeeperProgrammers.md

@@ -32,6 +32,7 @@ limitations under the License.
 * [ZooKeeper Sessions](#ch_zkSessions)
 * [ZooKeeper Watches](#ch_zkWatches)
     * [Semantics of Watches](#sc_WatchSemantics)
+    * [Persistent, Recursive Watches](#sc_WatchPersistentRecursive)
     * [Remove Watches](#sc_WatchRemoval)
     * [What ZooKeeper Guarantees about Watches](#sc_WatchGuarantees)
     * [Things to Remember about Watches](#sc_WatchRememberThese)
@@ -640,6 +641,11 @@ general this all occurs transparently. There is one case where a watch
 may be missed: a watch for the existence of a znode not yet created will
 be missed if the znode is created and deleted while disconnected.
 
+**New in 3.6.0:** Clients can also set
+permanent, recursive watches on a znode that are not removed when triggered
+and that trigger for changes on the registered znode as well as any children
+znodes recursively.
+
 <a name="sc_WatchSemantics"></a>
 
 ### Semantics of Watches
@@ -657,6 +663,21 @@ the events that a watch can trigger and the calls that enable them:
 * **Child event:**
   Enabled with a call to getChildren.
 
+<a name="sc_WatchPersistentRecursive"></a>
+
+### Persistent, Recursive Watches
+
+**New in 3.6.0:** There is now a variation on the standard
+watch described above whereby you can set a watch that does not get removed when triggered.
+Additionally, these watches trigger the event types *NodeCreated*, *NodeDeleted*, and *NodeDataChanged* 
+and, optionally, recursively for all znodes starting at the znode that the watch is registered for. Note 
+that *NodeChildrenChanged* events are not triggered for persistent recursive watches as it would be redundant.
+
+Persistent watches are set using the method *addWatch()*. The triggering semantics and guarantees
+(other than one-time triggering) are the same as standard watches. The only exception regarding events is that
+recursive persistent watchers never trigger child changed events as they are redundant.
+Persistent watches are removed using *removeWatches()* with watcher type *WatcherType.Any*.
+
 <a name="sc_WatchRemoval"></a>
 
 ### Remove Watches
@@ -671,6 +692,8 @@ successful watch removal.
   Watcher which was added with a call to getChildren.
 * **Data Remove event:**
   Watcher which was added with a call to exists or getData.
+* **Persistent Remove event:**
+  Watcher which was added with a call to add a persistent watch.
 
 <a name="sc_WatchGuarantees"></a>
 
@@ -693,11 +716,11 @@ guarantees:
 
 ### Things to Remember about Watches
 
-* Watches are one time triggers; if you get a watch event and
+* Standard watches are one time triggers; if you get a watch event and
   you want to get notified of future changes, you must set another
   watch.
 
-* Because watches are one time triggers and there is latency
+* Because standard watches are one time triggers and there is latency
   between getting the event and sending a new request to get a watch
   you cannot reliably see every change that happens to a node in
   ZooKeeper. Be prepared to handle the case where the znode changes

+ 12 - 0
zookeeper-jute/src/main/resources/zookeeper.jute

@@ -73,6 +73,14 @@ module org.apache.zookeeper.proto {
         vector<ustring>existWatches;
         vector<ustring>childWatches;
     }
+    class SetWatches2 {
+         long relativeZxid;
+         vector<ustring>dataWatches;
+         vector<ustring>existWatches;
+         vector<ustring>childWatches;
+         vector<ustring>persistentWatches;
+         vector<ustring>persistentRecursiveWatches;
+     }
     class RequestHeader {
         int xid;
         int type;
@@ -180,6 +188,10 @@ module org.apache.zookeeper.proto {
     class SetACLResponse {
         org.apache.zookeeper.data.Stat stat;
     }
+    class AddWatchRequest {
+         ustring path;
+         int mode;
+     }
     class WatcherEvent {
         int type;  // event type
         int state; // state of the Keeper client runtime

+ 67 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/AddWatchMode.java

@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+/**
+ * Modes available to {@link ZooKeeper#addWatch(String, Watcher, AddWatchMode)}
+ */
+public enum AddWatchMode {
+    /**
+     * <p>
+     * Set a watcher on the given path that does not get removed when triggered (i.e. it stays active
+     * until it is removed). This watcher
+     * is triggered for both data and child events. To remove the watcher, use
+     * <tt>removeWatches()</tt> with <tt>WatcherType.Any</tt>. The watcher behaves as if you placed an exists() watch and
+     * a getData() watch on the ZNode at the given path.
+     * </p>
+     */
+    PERSISTENT(ZooDefs.AddWatchModes.persistent),
+
+    /**
+     * <p>
+     * Set a watcher on the given path that: a) does not get removed when triggered (i.e. it stays active
+     * until it is removed); b) applies not only to the registered path but all child paths recursively. This watcher
+     * is triggered for both data and child events. To remove the watcher, use
+     * <tt>removeWatches()</tt> with <tt>WatcherType.Any</tt>
+     * </p>
+     *
+     * <p>
+     * The watcher behaves as if you placed an exists() watch and
+     * a getData() watch on the ZNode at the given path <strong>and</strong> any ZNodes that are children
+     * of the given path including children added later.
+     * </p>
+     *
+     * <p>
+     * NOTE: when there are active recursive watches there is a small performance decrease as all segments
+     * of ZNode paths must be checked for watch triggering.
+     * </p>
+     */
+    PERSISTENT_RECURSIVE(ZooDefs.AddWatchModes.persistentRecursive)
+    ;
+
+    public int getMode() {
+        return mode;
+    }
+
+    private final int mode;
+
+    AddWatchMode(int mode) {
+        this.mode = mode;
+    }
+}

+ 31 - 5
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java

@@ -86,6 +86,7 @@ import org.apache.zookeeper.proto.RequestHeader;
 import org.apache.zookeeper.proto.SetACLResponse;
 import org.apache.zookeeper.proto.SetDataResponse;
 import org.apache.zookeeper.proto.SetWatches;
+import org.apache.zookeeper.proto.SetWatches2;
 import org.apache.zookeeper.proto.WatcherEvent;
 import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.apache.zookeeper.server.ZooKeeperThread;
@@ -990,16 +991,24 @@ public class ClientCnxn {
                 List<String> dataWatches = zooKeeper.getDataWatches();
                 List<String> existWatches = zooKeeper.getExistWatches();
                 List<String> childWatches = zooKeeper.getChildWatches();
-                if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()) {
+                List<String> persistentWatches = zooKeeper.getPersistentWatches();
+                List<String> persistentRecursiveWatches = zooKeeper.getPersistentRecursiveWatches();
+                if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()
+                        || !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) {
                     Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
                     Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
                     Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
+                    Iterator<String> persistentWatchesIter = prependChroot(persistentWatches).iterator();
+                    Iterator<String> persistentRecursiveWatchesIter = prependChroot(persistentRecursiveWatches).iterator();
                     long setWatchesLastZxid = lastZxid;
 
-                    while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
+                    while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()
+                            || persistentWatchesIter.hasNext() || persistentRecursiveWatchesIter.hasNext()) {
                         List<String> dataWatchesBatch = new ArrayList<String>();
                         List<String> existWatchesBatch = new ArrayList<String>();
                         List<String> childWatchesBatch = new ArrayList<String>();
+                        List<String> persistentWatchesBatch = new ArrayList<String>();
+                        List<String> persistentRecursiveWatchesBatch = new ArrayList<String>();
                         int batchLength = 0;
 
                         // Note, we may exceed our max length by a bit when we add the last
@@ -1015,15 +1024,32 @@ public class ClientCnxn {
                             } else if (childWatchesIter.hasNext()) {
                                 watch = childWatchesIter.next();
                                 childWatchesBatch.add(watch);
+                            }  else if (persistentWatchesIter.hasNext()) {
+                                watch = persistentWatchesIter.next();
+                                persistentWatchesBatch.add(watch);
+                            } else if (persistentRecursiveWatchesIter.hasNext()) {
+                                watch = persistentRecursiveWatchesIter.next();
+                                persistentRecursiveWatchesBatch.add(watch);
                             } else {
                                 break;
                             }
                             batchLength += watch.length();
                         }
 
-                        SetWatches sw = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch);
-                        RequestHeader header = new RequestHeader(-8, OpCode.setWatches);
-                        Packet packet = new Packet(header, new ReplyHeader(), sw, null, null);
+                        Record record;
+                        int opcode;
+                        if (persistentWatchesBatch.isEmpty() && persistentRecursiveWatchesBatch.isEmpty()) {
+                            // maintain compatibility with older servers - if no persistent/recursive watchers
+                            // are used, use the old version of SetWatches
+                            record = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch);
+                            opcode = OpCode.setWatches;
+                        } else {
+                            record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch,
+                                    childWatchesBatch, persistentWatchesBatch, persistentRecursiveWatchesBatch);
+                            opcode = OpCode.setWatches2;
+                        }
+                        RequestHeader header = new RequestHeader(-8, opcode);
+                        Packet packet = new Packet(header, new ReplyHeader(), record, null, null);
                         outgoingQueue.addFirst(packet);
                     }
                 }

+ 4 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java

@@ -143,7 +143,8 @@ public interface Watcher {
             NodeDataChanged(3),
             NodeChildrenChanged(4),
             DataWatchRemoved(5),
-            ChildWatchRemoved(6);
+            ChildWatchRemoved(6),
+            PersistentWatchRemoved (7);
 
             private final int intValue;     // Integer representation of value
             // for sending over wire
@@ -172,6 +173,8 @@ public interface Watcher {
                     return EventType.DataWatchRemoved;
                 case 6:
                     return EventType.ChildWatchRemoved;
+                case 7:
+                    return EventType.PersistentWatchRemoved;
 
                 default:
                     throw new RuntimeException("Invalid integer value for conversion to EventType");

+ 11 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/ZooDefs.java

@@ -89,6 +89,10 @@ public class ZooDefs {
 
         int getAllChildrenNumber = 104;
 
+        int setWatches2 = 105;
+
+        int addWatch = 106;
+
         int createSession = -10;
 
         int closeSession = -11;
@@ -148,6 +152,13 @@ public class ZooDefs {
 
     }
 
+    @InterfaceAudience.Public
+    public interface AddWatchModes {
+        int persistent = 0; // matches AddWatchMode.PERSISTENT
+
+        int persistentRecursive = 1;  // matches AddWatchMode.PERSISTENT_RECURSIVE
+    }
+
     public static final String[] opNames = {"notification", "create", "delete", "exists", "getData", "setData", "getACL", "setACL", "getChildren", "getChildren2", "getMaxChildren", "setMaxChildren", "ping", "reconfig", "getConfig"};
 
 }

+ 204 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java

@@ -53,12 +53,14 @@ import org.apache.zookeeper.client.ZooKeeperSaslClient;
 import org.apache.zookeeper.common.PathUtils;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.AddWatchRequest;
 import org.apache.zookeeper.proto.CheckWatchesRequest;
 import org.apache.zookeeper.proto.Create2Response;
 import org.apache.zookeeper.proto.CreateRequest;
 import org.apache.zookeeper.proto.CreateResponse;
 import org.apache.zookeeper.proto.CreateTTLRequest;
 import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.ErrorResponse;
 import org.apache.zookeeper.proto.ExistsRequest;
 import org.apache.zookeeper.proto.GetACLRequest;
 import org.apache.zookeeper.proto.GetACLResponse;
@@ -83,6 +85,7 @@ import org.apache.zookeeper.proto.SyncRequest;
 import org.apache.zookeeper.proto.SyncResponse;
 import org.apache.zookeeper.server.DataTree;
 import org.apache.zookeeper.server.EphemeralType;
+import org.apache.zookeeper.server.watch.PathParentIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -254,6 +257,18 @@ public class ZooKeeper implements AutoCloseable {
             return rc;
         }
     }
+    protected List<String> getPersistentWatches() {
+        synchronized (watchManager.persistentWatches) {
+            List<String> rc = new ArrayList<String>(watchManager.persistentWatches.keySet());
+            return rc;
+        }
+    }
+    protected List<String> getPersistentRecursiveWatches() {
+        synchronized (watchManager.persistentRecursiveWatches) {
+            List<String> rc = new ArrayList<String>(watchManager.persistentRecursiveWatches.keySet());
+            return rc;
+        }
+    }
 
     /**
      * Manage watchers and handle events generated by the ClientCnxn object.
@@ -267,6 +282,8 @@ public class ZooKeeper implements AutoCloseable {
         private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
         private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
         private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
+        private final Map<String, Set<Watcher>> persistentWatches = new HashMap<String, Set<Watcher>>();
+        private final Map<String, Set<Watcher>> persistentRecursiveWatches = new HashMap<String, Set<Watcher>>();
         private boolean disableAutoWatchReset;
 
         ZKWatchManager(boolean disableAutoWatchReset) {
@@ -296,6 +313,8 @@ public class ZooKeeper implements AutoCloseable {
             removedWatchers.put(EventType.ChildWatchRemoved, childWatchersToRem);
             HashSet<Watcher> dataWatchersToRem = new HashSet<>();
             removedWatchers.put(EventType.DataWatchRemoved, dataWatchersToRem);
+            HashSet<Watcher> persistentWatchersToRem = new HashSet<>();
+            removedWatchers.put(EventType.PersistentWatchRemoved, persistentWatchersToRem);
             boolean removedWatcher = false;
             switch (watcherType) {
             case Children: {
@@ -324,10 +343,23 @@ public class ZooKeeper implements AutoCloseable {
                     boolean removedDataWatcher = removeWatches(dataWatches, watcher, clientPath, local, rc, dataWatchersToRem);
                     removedWatcher |= removedDataWatcher;
                 }
+
                 synchronized (existWatches) {
                     boolean removedDataWatcher = removeWatches(existWatches, watcher, clientPath, local, rc, dataWatchersToRem);
                     removedWatcher |= removedDataWatcher;
                 }
+
+                synchronized (persistentWatches) {
+                    boolean removedPersistentWatcher = removeWatches(persistentWatches,
+                            watcher, clientPath, local, rc, persistentWatchersToRem);
+                    removedWatcher |= removedPersistentWatcher;
+                }
+
+                synchronized (persistentRecursiveWatches) {
+                    boolean removedPersistentRecursiveWatcher = removeWatches(persistentRecursiveWatches,
+                            watcher, clientPath, local, rc, persistentWatchersToRem);
+                    removedWatcher |= removedPersistentRecursiveWatcher;
+                }
             }
             }
             // Watcher function doesn't exists for the specified params
@@ -373,6 +405,18 @@ public class ZooKeeper implements AutoCloseable {
                 synchronized (childWatches) {
                     containsWatcher = contains(path, watcher, childWatches);
                 }
+
+                synchronized (persistentWatches) {
+                    boolean contains_temp = contains(path, watcher,
+                            persistentWatches);
+                    containsWatcher |= contains_temp;
+                }
+
+                synchronized (persistentRecursiveWatches) {
+                    boolean contains_temp = contains(path, watcher,
+                            persistentRecursiveWatches);
+                    containsWatcher |= contains_temp;
+                }
                 break;
             }
             case Data: {
@@ -384,6 +428,18 @@ public class ZooKeeper implements AutoCloseable {
                     boolean contains_temp = contains(path, watcher, existWatches);
                     containsWatcher |= contains_temp;
                 }
+
+                synchronized (persistentWatches) {
+                    boolean contains_temp = contains(path, watcher,
+                            persistentWatches);
+                    containsWatcher |= contains_temp;
+                }
+
+                synchronized (persistentRecursiveWatches) {
+                    boolean contains_temp = contains(path, watcher,
+                            persistentRecursiveWatches);
+                    containsWatcher |= contains_temp;
+                }
                 break;
             }
             case Any: {
@@ -395,10 +451,23 @@ public class ZooKeeper implements AutoCloseable {
                     boolean contains_temp = contains(path, watcher, dataWatches);
                     containsWatcher |= contains_temp;
                 }
+
                 synchronized (existWatches) {
                     boolean contains_temp = contains(path, watcher, existWatches);
                     containsWatcher |= contains_temp;
                 }
+
+                synchronized (persistentWatches) {
+                    boolean contains_temp = contains(path, watcher,
+                            persistentWatches);
+                    containsWatcher |= contains_temp;
+                }
+
+                synchronized (persistentRecursiveWatches) {
+                    boolean contains_temp = contains(path, watcher,
+                            persistentRecursiveWatches);
+                    containsWatcher |= contains_temp;
+                }
             }
             }
             // Watcher function doesn't exists for the specified params
@@ -490,6 +559,18 @@ public class ZooKeeper implements AutoCloseable {
                     }
                 }
 
+                synchronized (persistentWatches) {
+                    for (Set<Watcher> ws: persistentWatches.values()) {
+                        result.addAll(ws);
+                    }
+                }
+
+                synchronized (persistentRecursiveWatches) {
+                    for (Set<Watcher> ws: persistentRecursiveWatches.values()) {
+                        result.addAll(ws);
+                    }
+                }
+
                 return result;
             case NodeDataChanged:
             case NodeCreated:
@@ -499,11 +580,13 @@ public class ZooKeeper implements AutoCloseable {
                 synchronized (existWatches) {
                     addTo(existWatches.remove(clientPath), result);
                 }
+                addPersistentWatches(clientPath, result);
                 break;
             case NodeChildrenChanged:
                 synchronized (childWatches) {
                     addTo(childWatches.remove(clientPath), result);
                 }
+                addPersistentWatches(clientPath, result);
                 break;
             case NodeDeleted:
                 synchronized (dataWatches) {
@@ -520,6 +603,7 @@ public class ZooKeeper implements AutoCloseable {
                 synchronized (childWatches) {
                     addTo(childWatches.remove(clientPath), result);
                 }
+                addPersistentWatches(clientPath, result);
                 break;
             default:
                 String errorMsg = String.format(
@@ -534,6 +618,16 @@ public class ZooKeeper implements AutoCloseable {
             return result;
         }
 
+        private void addPersistentWatches(String clientPath, Set<Watcher> result) {
+            synchronized (persistentWatches) {
+                addTo(persistentWatches.get(clientPath), result);
+            }
+            synchronized (persistentRecursiveWatches) {
+                for (String path : PathParentIterator.forAll(clientPath).asIterable()) {
+                    addTo(persistentRecursiveWatches.get(path), result);
+                }
+            }
+        }
     }
 
     /**
@@ -627,6 +721,31 @@ public class ZooKeeper implements AutoCloseable {
 
     }
 
+    class AddWatchRegistration extends WatchRegistration {
+        private final AddWatchMode mode;
+
+        public AddWatchRegistration(Watcher watcher, String clientPath, AddWatchMode mode) {
+            super(watcher, clientPath);
+            this.mode = mode;
+        }
+
+        @Override
+        protected Map<String, Set<Watcher>> getWatches(int rc) {
+            switch (mode) {
+                case PERSISTENT:
+                    return watchManager.persistentWatches;
+                case PERSISTENT_RECURSIVE:
+                    return watchManager.persistentRecursiveWatches;
+            }
+            throw new IllegalArgumentException("Mode not supported: " + mode);
+        }
+
+        @Override
+        protected boolean shouldAddWatch(int rc) {
+            return rc == 0 || rc == KeeperException.Code.NONODE.intValue();
+        }
+    }
+
     @InterfaceAudience.Public
     public enum States {
         CONNECTING,
@@ -3035,6 +3154,91 @@ public class ZooKeeper implements AutoCloseable {
         removeWatches(ZooDefs.OpCode.removeWatches, path, null, watcherType, local, cb, ctx);
     }
 
+    /**
+     * Add a watch to the given znode using the given mode. Note: not all
+     * watch types can be set with this method. Only the modes available
+     * in {@link AddWatchMode} can be set with this method.
+     *
+     * @param basePath the path that the watcher applies to
+     * @param watcher the watcher
+     * @param mode type of watcher to add
+     * @throws InterruptedException If the server transaction is interrupted.
+     * @throws KeeperException If the server signals an error with a non-zero
+     *  error code.
+     * @since 3.6.0
+     */
+    public void addWatch(String basePath, Watcher watcher, AddWatchMode mode)
+            throws KeeperException, InterruptedException {
+        PathUtils.validatePath(basePath);
+        String serverPath = prependChroot(basePath);
+
+        RequestHeader h = new RequestHeader();
+        h.setType(ZooDefs.OpCode.addWatch);
+        AddWatchRequest request = new AddWatchRequest(serverPath, mode.getMode());
+        ReplyHeader r = cnxn.submitRequest(h, request, new ErrorResponse(),
+                new AddWatchRegistration(watcher, basePath, mode));
+        if (r.getErr() != 0) {
+            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
+                    basePath);
+        }
+    }
+
+    /**
+     * Add a watch to the given znode using the given mode. Note: not all
+     * watch types can be set with this method. Only the modes available
+     * in {@link AddWatchMode} can be set with this method. This overload
+     * registers the session's default watcher.
+     *
+     * @param basePath the path that the watcher applies to
+     * @param mode type of watcher to add
+     * @throws InterruptedException If the server transaction is interrupted.
+     * @throws KeeperException If the server signals an error with a non-zero
+     *  error code.
+     * @since 3.6.0
+     */
+    public void addWatch(String basePath, AddWatchMode mode)
+            throws KeeperException, InterruptedException {
+        addWatch(basePath, watchManager.defaultWatcher, mode);
+    }
+
+    /**
+     * Async version of {@link #addWatch(String, Watcher, AddWatchMode)} (see it for details)
+     *
+     * @param basePath the path that the watcher applies to
+     * @param watcher the watcher
+     * @param mode type of watcher to add
+     * @param cb a handler for the callback
+     * @param ctx context to be provided to the callback
+     * @throws IllegalArgumentException if an invalid path is specified
+     * @since 3.6.0
+     */
+    public void addWatch(String basePath, Watcher watcher, AddWatchMode mode,
+                         VoidCallback cb, Object ctx) {
+        PathUtils.validatePath(basePath);
+        String serverPath = prependChroot(basePath);
+
+        RequestHeader h = new RequestHeader();
+        h.setType(ZooDefs.OpCode.addWatch);
+        AddWatchRequest request = new AddWatchRequest(serverPath, mode.getMode());
+        cnxn.queuePacket(h, new ReplyHeader(), request, new ErrorResponse(), cb,
+                basePath, serverPath, ctx, new AddWatchRegistration(watcher, basePath, mode));
+    }
+
+    /**
+     * Async version of {@link #addWatch(String, AddWatchMode)} (see it for details)
+     *
+     * @param basePath the path that the watcher applies to
+     * @param mode type of watcher to add
+     * @param cb a handler for the callback
+     * @param ctx context to be provided to the callback
+     * @throws IllegalArgumentException if an invalid path is specified
+     * @since 3.6.0
+     */
+    public void addWatch(String basePath, AddWatchMode mode,
+                         VoidCallback cb, Object ctx) {
+        addWatch(basePath, watchManager.defaultWatcher, mode, cb, ctx);
+    }
+
     private void validateWatcher(Watcher watcher) {
         if (watcher == null) {
             throw new IllegalArgumentException("Invalid Watcher, shouldn't be null!");
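
Reviewer note: the client-side lookup done by `addPersistentWatches` in the hunk above can be modeled without ZooKeeper on the classpath. The following is a minimal standalone sketch, not the code in this change. `RecursiveWatchLookupSketch`, `selfAndParents`, and `recursiveWatchersFor` are hypothetical names, and watchers are modeled as plain strings. The parent walk, including the assumption that the root `/` is visited last, is inferred from the `PathParentIterator` Javadoc rather than copied from its implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical standalone model; watchers are strings, not ZooKeeper Watcher objects.
class RecursiveWatchLookupSketch {

    // Returns the path itself followed by each ancestor, ending at "/".
    // Assumes the path was already validated (absolute, no trailing slash except root).
    static List<String> selfAndParents(String path) {
        List<String> result = new ArrayList<>();
        String current = path;
        while (!current.isEmpty()) {
            result.add(current);
            if (current.equals("/")) {
                current = ""; // nothing above the root
            } else {
                int slash = current.lastIndexOf('/');
                current = (slash == 0) ? "/" : current.substring(0, slash);
            }
        }
        return result;
    }

    // Collects watchers registered on the event path or on any of its ancestors.
    static Set<String> recursiveWatchersFor(String eventPath,
                                            Map<String, Set<String>> recursiveWatches) {
        Set<String> result = new LinkedHashSet<>();
        for (String path : selfAndParents(eventPath)) {
            Set<String> watchers = recursiveWatches.get(path);
            if (watchers != null) {
                result.addAll(watchers);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> recursiveWatches = new HashMap<>();
        recursiveWatches.put("/a", Collections.singleton("w1"));
        recursiveWatches.put("/a/b/c/d", Collections.singleton("w2"));

        System.out.println(selfAndParents("/a/b/c"));                         // [/a/b/c, /a/b, /a, /]
        System.out.println(recursiveWatchersFor("/a/b/c", recursiveWatches)); // [w1]
    }
}
```

In this model an event on `/a/b/c` finds the watch registered on `/a` on the third lookup, while the watch on the deeper path `/a/b/c/d` is not matched. The cost is one map lookup per path level for each event, instead of one registration per znode under the watched subtree.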

+ 2 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeperMain.java

@@ -38,6 +38,7 @@ import java.util.regex.Pattern;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.admin.ZooKeeperAdmin;
 import org.apache.zookeeper.cli.AddAuthCommand;
+import org.apache.zookeeper.cli.AddWatchCommand;
 import org.apache.zookeeper.cli.CliCommand;
 import org.apache.zookeeper.cli.CliException;
 import org.apache.zookeeper.cli.CloseCommand;
@@ -123,6 +124,7 @@ public class ZooKeeperMain {
         new GetEphemeralsCommand().addToMap(commandMapCli);
         new GetAllChildrenNumberCommand().addToMap(commandMapCli);
         new VersionCommand().addToMap(commandMapCli);
+        new AddWatchCommand().addToMap(commandMapCli);
 
         // add all to commandMap
         for (Entry<String, CliCommand> entry : commandMapCli.entrySet()) {

+ 87 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/cli/AddWatchCommand.java

@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.cli;
+
+import java.util.Arrays;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.Parser;
+import org.apache.commons.cli.PosixParser;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * addWatch command for cli.
+ * Matches the ZooKeeper API addWatch()
+ */
+public class AddWatchCommand extends CliCommand {
+
+    private static final Options options = new Options();
+    private static final AddWatchMode defaultMode = AddWatchMode.PERSISTENT_RECURSIVE;
+
+    private CommandLine cl;
+    private AddWatchMode mode = defaultMode;
+
+    static {
+        options.addOption("m", true, "");
+    }
+
+    public AddWatchCommand() {
+        super("addWatch", "[-m mode] path # optional mode is one of "
+                + Arrays.toString(AddWatchMode.values()) + " - default is " + defaultMode.name());
+    }
+
+    @Override
+    public CliCommand parse(String[] cmdArgs) throws CliParseException {
+        Parser parser = new PosixParser();
+        try {
+            cl = parser.parse(options, cmdArgs);
+        } catch (ParseException ex) {
+            throw new CliParseException(ex);
+        }
+        if (cl.getArgs().length != 2) {
+            throw new CliParseException(getUsageStr());
+        }
+
+        if (cl.hasOption("m")) {
+            try {
+                mode = AddWatchMode.valueOf(cl.getOptionValue("m").toUpperCase());
+            } catch (IllegalArgumentException e) {
+                throw new CliParseException(getUsageStr());
+            }
+        }
+
+        return this;
+    }
+
+    @Override
+    public boolean exec() throws CliException {
+        String path = cl.getArgs()[1];
+        try {
+            zk.addWatch(path, mode);
+        } catch (KeeperException | InterruptedException ex) {
+            throw new CliWrapperException(ex);
+        }
+
+        return false;
+
+    }
+
+}
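
Reviewer note: the `-m` handling in `parse()` above can be exercised in isolation. The sketch below is a hedged approximation, assuming a local `Mode` enum as a hypothetical stand-in for `AddWatchMode`. `AddWatchModeParsingSketch` and `parseMode` are illustrative names, not part of this change. It differs from the diff in one respect: it upper-cases with `Locale.ROOT`, because a bare `toUpperCase()` depends on the JVM default locale (under a Turkish locale, `"persistent"` upper-cases with a dotted capital I and no longer matches the enum constant).

```java
import java.util.Locale;

// Hypothetical standalone model of the CLI's mode-option parsing.
class AddWatchModeParsingSketch {

    // Stand-in for org.apache.zookeeper.AddWatchMode (assumption for illustration).
    enum Mode { PERSISTENT, PERSISTENT_RECURSIVE }

    static final Mode DEFAULT_MODE = Mode.PERSISTENT_RECURSIVE;

    // Returns the default when no -m value was supplied.
    // Throws IllegalArgumentException for an unknown mode name.
    static Mode parseMode(String optionValue) {
        if (optionValue == null) {
            return DEFAULT_MODE;
        }
        return Mode.valueOf(optionValue.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(parseMode(null));          // PERSISTENT_RECURSIVE
        System.out.println(parseMode("persistent"));  // PERSISTENT
        try {
            parseMode("bogus");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected unknown mode");
        }
    }
}
```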

+ 17 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java

@@ -58,6 +58,7 @@ import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.data.StatPersisted;
 import org.apache.zookeeper.server.watch.IWatchManager;
 import org.apache.zookeeper.server.watch.WatchManagerFactory;
+import org.apache.zookeeper.server.watch.WatcherMode;
 import org.apache.zookeeper.server.watch.WatcherOrBitSet;
 import org.apache.zookeeper.server.watch.WatchesPathReport;
 import org.apache.zookeeper.server.watch.WatchesReport;
@@ -701,6 +702,12 @@ public class DataTree {
         }
     }
 
+    public void addWatch(String basePath, Watcher watcher, int mode) {
+        WatcherMode watcherMode = WatcherMode.fromZooDef(mode);
+        dataWatches.addWatch(basePath, watcher, watcherMode);
+        childWatches.addWatch(basePath, watcher, watcherMode);
+    }
+
     public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
         DataNode n = nodes.get(path);
         byte[] data = null;
@@ -1499,7 +1506,8 @@ public class DataTree {
         childWatches.removeWatcher(watcher);
     }
 
-    public void setWatches(long relativeZxid, List<String> dataWatches, List<String> existWatches, List<String> childWatches, Watcher watcher) {
+    public void setWatches(long relativeZxid, List<String> dataWatches, List<String> existWatches, List<String> childWatches,
+                           List<String> persistentWatches, List<String> persistentRecursiveWatches, Watcher watcher) {
         for (String path : dataWatches) {
             DataNode node = getNode(path);
             WatchedEvent e = null;
@@ -1529,6 +1537,14 @@ public class DataTree {
                 this.childWatches.addWatch(path, watcher);
             }
         }
+        for (String path : persistentWatches) {
+            this.childWatches.addWatch(path, watcher, WatcherMode.PERSISTENT);
+            this.dataWatches.addWatch(path, watcher, WatcherMode.PERSISTENT);
+        }
+        for (String path : persistentRecursiveWatches) {
+            this.childWatches.addWatch(path, watcher, WatcherMode.PERSISTENT_RECURSIVE);
+            this.dataWatches.addWatch(path, watcher, WatcherMode.PERSISTENT_RECURSIVE);
+        }
     }
 
     /**

+ 32 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -21,6 +21,7 @@ package org.apache.zookeeper.server;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Set;
@@ -47,9 +48,11 @@ import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.AddWatchRequest;
 import org.apache.zookeeper.proto.CheckWatchesRequest;
 import org.apache.zookeeper.proto.Create2Response;
 import org.apache.zookeeper.proto.CreateResponse;
+import org.apache.zookeeper.proto.ErrorResponse;
 import org.apache.zookeeper.proto.ExistsRequest;
 import org.apache.zookeeper.proto.ExistsResponse;
 import org.apache.zookeeper.proto.GetACLRequest;
@@ -69,6 +72,7 @@ import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.proto.SetACLResponse;
 import org.apache.zookeeper.proto.SetDataResponse;
 import org.apache.zookeeper.proto.SetWatches;
+import org.apache.zookeeper.proto.SetWatches2;
 import org.apache.zookeeper.proto.SyncRequest;
 import org.apache.zookeeper.proto.SyncResponse;
 import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
@@ -365,7 +369,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             case OpCode.setWatches: {
                 lastOp = "SETW";
                 SetWatches setWatches = new SetWatches();
-                // TODO We really should NOT need this!!!!
+                // TODO we really should not need this
                 request.request.rewind();
                 ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
                 long relativeZxid = setWatches.getRelativeZxid();
@@ -375,9 +379,36 @@ public class FinalRequestProcessor implements RequestProcessor {
                        setWatches.getDataWatches(),
                        setWatches.getExistWatches(),
                        setWatches.getChildWatches(),
+                       Collections.emptyList(),
+                       Collections.emptyList(),
                        cnxn);
                 break;
             }
+            case OpCode.setWatches2: {
+                lastOp = "STW2";
+                SetWatches2 setWatches = new SetWatches2();
+                // TODO we really should not need this
+                request.request.rewind();
+                ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
+                long relativeZxid = setWatches.getRelativeZxid();
+                zks.getZKDatabase().setWatches(relativeZxid,
+                        setWatches.getDataWatches(),
+                        setWatches.getExistWatches(),
+                        setWatches.getChildWatches(),
+                        setWatches.getPersistentWatches(),
+                        setWatches.getPersistentRecursiveWatches(),
+                        cnxn);
+                break;
+            }
+            case OpCode.addWatch: {
+                lastOp = "ADDW";
+                AddWatchRequest addWatcherRequest = new AddWatchRequest();
+                ByteBufferInputStream.byteBuffer2Record(request.request,
+                        addWatcherRequest);
+                zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
+                rsp = new ErrorResponse(0);
+                break;
+            }
             case OpCode.getACL: {
                 lastOp = "GETA";
                 GetACLRequest getACLRequest = new GetACLRequest();

+ 2 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java

@@ -797,10 +797,12 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             case OpCode.getChildren2:
             case OpCode.ping:
             case OpCode.setWatches:
+            case OpCode.setWatches2:
             case OpCode.checkWatches:
             case OpCode.removeWatches:
             case OpCode.getEphemerals:
             case OpCode.multiRead:
+            case OpCode.addWatch:
                 zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                 break;
             default:

+ 5 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java

@@ -243,9 +243,11 @@ public class Request {
         case OpCode.setACL:
         case OpCode.setData:
         case OpCode.setWatches:
+        case OpCode.setWatches2:
         case OpCode.sync:
         case OpCode.checkWatches:
         case OpCode.removeWatches:
+        case OpCode.addWatch:
             return true;
         default:
             return false;
@@ -334,6 +336,8 @@ public class Request {
                 return "auth";
             case OpCode.setWatches:
                 return "setWatches";
+            case OpCode.setWatches2:
+                return "setWatches2";
             case OpCode.sasl:
                 return "sasl";
             case OpCode.getEphemerals:
@@ -364,6 +368,7 @@ public class Request {
         String path = "n/a";
         if (type != OpCode.createSession
             && type != OpCode.setWatches
+            && type != OpCode.setWatches2
             && type != OpCode.closeSession
             && request != null
             && request.remaining() >= 4) {

+ 19 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java

@@ -516,10 +516,27 @@ public class ZKDatabase {
      * @param dataWatches the data watches the client wants to reset
      * @param existWatches the exists watches the client wants to reset
      * @param childWatches the child watches the client wants to reset
+     * @param persistentWatches the persistent watches the client wants to reset
+     * @param persistentRecursiveWatches the persistent recursive watches the client wants to reset
      * @param watcher the watcher function
      */
-    public void setWatches(long relativeZxid, List<String> dataWatches, List<String> existWatches, List<String> childWatches, Watcher watcher) {
-        dataTree.setWatches(relativeZxid, dataWatches, existWatches, childWatches, watcher);
+    public void setWatches(long relativeZxid, List<String> dataWatches, List<String> existWatches, List<String> childWatches,
+                           List<String> persistentWatches, List<String> persistentRecursiveWatches, Watcher watcher) {
+        dataTree.setWatches(relativeZxid, dataWatches, existWatches, childWatches, persistentWatches, persistentRecursiveWatches, watcher);
+    }
+
+    /**
+     * Add a watch
+     *
+     * @param basePath
+     *            watch base
+     * @param watcher
+     *            the watcher
+     * @param mode
+     *            a mode from ZooDefs.AddWatchModes
+     */
+    public void addWatch(String basePath, Watcher watcher, int mode) {
+        dataTree.addWatch(basePath, watcher, mode);
     }
 
     /**

+ 2 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RequestPathMetricsCollector.java

@@ -32,6 +32,7 @@ import static org.apache.zookeeper.ZooDefs.OpCode.getData;
 import static org.apache.zookeeper.ZooDefs.OpCode.removeWatches;
 import static org.apache.zookeeper.ZooDefs.OpCode.setACL;
 import static org.apache.zookeeper.ZooDefs.OpCode.setData;
+import static org.apache.zookeeper.ZooDefs.OpCode.setWatches2;
 import static org.apache.zookeeper.ZooDefs.OpCode.sync;
 import java.io.PrintWriter;
 import java.util.Arrays;
@@ -131,6 +132,7 @@ public class RequestPathMetricsCollector {
         requestsMap.put(Request.op2String(getChildren2), new PathStatsQueue(getChildren2));
         requestsMap.put(Request.op2String(checkWatches), new PathStatsQueue(checkWatches));
         requestsMap.put(Request.op2String(removeWatches), new PathStatsQueue(removeWatches));
+        requestsMap.put(Request.op2String(setWatches2), new PathStatsQueue(setWatches2));
         requestsMap.put(Request.op2String(sync), new PathStatsQueue(sync));
         this.immutableRequestsMap = java.util.Collections.unmodifiableMap(requestsMap);
     }

+ 24 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java

@@ -34,6 +34,22 @@ public interface IWatchManager {
      */
     boolean addWatch(String path, Watcher watcher);
 
+    /**
+     * Add watch to specific path.
+     *
+     * @param path znode path
+     * @param watcher watcher object reference
+     * @param watcherMode the watcher mode to use
+     *
+     * @return true if the watcher added is not already present
+     */
+    default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) {
+        if (watcherMode == WatcherMode.DEFAULT_WATCHER_MODE) {
+            return addWatch(path, watcher);
+        }
+        throw new UnsupportedOperationException();  // custom implementations must override this
+    }
+
     /**
      * Checks the specified watcher exists for the given path.
      *
@@ -129,4 +145,12 @@ public interface IWatchManager {
      */
     void dumpWatches(PrintWriter pwriter, boolean byPath);
 
+    /**
+     * Return the current number of recursive watchers
+     *
+     * @return the number of recursive watchers, or 0 if the implementation does not track them
+     */
+    default int getRecursiveWatchQty() {
+        return 0;
+    }
 }
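
Reviewer note: the new `addWatch(path, watcher, watcherMode)` default method lets existing `IWatchManager` implementations compile unchanged while rejecting modes they do not understand. The sketch below illustrates that pattern on its own, under stated assumptions: `WatchStore`, `LegacyStore`, and the local `Mode` enum are hypothetical stand-ins, and watchers are plain strings. It is not the ZooKeeper interface itself.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical standalone model of a default-method fallback on an interface.
class DefaultMethodFallbackSketch {

    // Stand-in for WatcherMode (assumption for illustration).
    enum Mode { STANDARD, PERSISTENT, PERSISTENT_RECURSIVE }

    interface WatchStore {
        // The original two-argument contract.
        boolean addWatch(String path, String watcher);

        // Newer overload: delegates for the standard mode, refuses anything else
        // unless an implementation overrides it.
        default boolean addWatch(String path, String watcher, Mode mode) {
            if (mode == Mode.STANDARD) {
                return addWatch(path, watcher);
            }
            throw new UnsupportedOperationException("mode not supported: " + mode);
        }
    }

    // An older implementation that only knows the two-argument method.
    static class LegacyStore implements WatchStore {
        private final Set<String> entries = new HashSet<>();

        @Override
        public boolean addWatch(String path, String watcher) {
            // true only if this path/watcher pair was not already present
            return entries.add(path + "|" + watcher);
        }
    }

    public static void main(String[] args) {
        WatchStore store = new LegacyStore();
        System.out.println(store.addWatch("/a", "w", Mode.STANDARD)); // true
        System.out.println(store.addWatch("/a", "w", Mode.STANDARD)); // false
        try {
            store.addWatch("/a", "w", Mode.PERSISTENT);
        } catch (UnsupportedOperationException e) {
            System.out.println("legacy store rejects persistent mode");
        }
    }
}
```

The trade-off is that an unsupported mode fails at call time rather than at compile time, so a server configured with such a manager only finds out when a client first requests a persistent watch.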

+ 106 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/PathParentIterator.java

@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterates over a ZooKeeper path. Each iteration goes up one parent path. Thus, the
+ * effect of the iterator is to iterate over the initial path and then all of its parents.
+ */
+public class PathParentIterator implements Iterator<String> {
+    private String path;
+    private final int maxLevel;
+    private int level = -1;
+
+    /**
+     * Return a new PathParentIterator that iterates from the
+     * given path to all parents.
+     *
+     * @param path initial path
+     */
+    public static PathParentIterator forAll(String path) {
+        return new PathParentIterator(path, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Return a new PathParentIterator that only returns the given path - i.e.
+     * does not iterate to parent paths.
+     *
+     * @param path initial path
+     */
+    public static PathParentIterator forPathOnly(String path) {
+        return new PathParentIterator(path, 0);
+    }
+
+    private PathParentIterator(String path, int maxLevel) {
+        // NOTE: assumes that the path has already been validated
+        this.path = path;
+        this.maxLevel = maxLevel;
+    }
+
+    /**
+     * Return an Iterable view so that this Iterator can be used in for-each
+     * statements. IMPORTANT: the returned Iterable is single-use only.
+     * @return Iterable
+     */
+    public Iterable<String> asIterable() {
+        return () -> PathParentIterator.this;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return !path.isEmpty() && (level < maxLevel);
+    }
+
+    /**
+     * Returns true if this iterator is currently at a parent path as opposed
+     * to the initial path given to the constructor
+     *
+     * @return true/false
+     */
+    public boolean atParentPath() {
+        return level > 0;
+    }
+
+    @Override
+    public String next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+
+        String localPath = path;
+        ++level;
+        if (path.equals("/")) {
+            path = "";
+        } else {
+            path = path.substring(0, path.lastIndexOf('/'));
+            if (path.length() == 0) {
+                path = "/";
+            }
+        }
+        return localPath;
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+}
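
To make the walk performed by `PathParentIterator.forAll` concrete, here is a minimal standalone sketch that collects the same sequence into a list. `ParentWalkSketch` and `selfAndParents` are hypothetical names that are not part of this patch, and, like the iterator, the sketch assumes the path has already been validated.

```java
import java.util.ArrayList;
import java.util.List;

final class ParentWalkSketch {
    // Returns the given path followed by each of its parents, ending at "/".
    static List<String> selfAndParents(String path) {
        List<String> result = new ArrayList<>();
        String current = path;
        while (!current.isEmpty()) {
            result.add(current);
            if (current.equals("/")) {
                current = ""; // the root has no parent; this ends the walk
            } else {
                int idx = current.lastIndexOf('/');
                // "/a" has the root as its parent; deeper paths drop the last segment
                current = idx <= 0 ? "/" : current.substring(0, idx);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(selfAndParents("/a/b/c")); // prints [/a/b/c, /a/b, /a, /]
    }
}
```

The cost of the walk is proportional to the depth of the path rather than to the number of znodes covered by a recursive watcher.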

+ 101 - 41
zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java

@@ -21,6 +21,7 @@ package org.apache.zookeeper.server.watch;
 import java.io.PrintWriter;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -42,9 +43,11 @@ public class WatchManager implements IWatchManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);
 
-    private final Map<String, Set<Watcher>> watchTable = new HashMap<String, Set<Watcher>>();
+    private final Map<String, Set<Watcher>> watchTable = new HashMap<>();
 
-    private final Map<Watcher, Set<String>> watch2Paths = new HashMap<Watcher, Set<String>>();
+    private final Map<Watcher, Set<String>> watch2Paths = new HashMap<>();
+
+    private final WatcherModeManager watcherModeManager = new WatcherModeManager();
 
     @Override
     public synchronized int size() {
@@ -55,12 +58,17 @@ public class WatchManager implements IWatchManager {
         return result;
     }
 
-    boolean isDeadWatcher(Watcher watcher) {
+    private boolean isDeadWatcher(Watcher watcher) {
         return watcher instanceof ServerCnxn && ((ServerCnxn) watcher).isStale();
     }
 
     @Override
-    public synchronized boolean addWatch(String path, Watcher watcher) {
+    public boolean addWatch(String path, Watcher watcher) {
+        return addWatch(path, watcher, WatcherMode.DEFAULT_WATCHER_MODE);
+    }
+
+    @Override
+    public synchronized boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) {
         if (isDeadWatcher(watcher)) {
             LOG.debug("Ignoring addWatch with closed cnxn");
             return false;
@@ -71,7 +79,7 @@ public class WatchManager implements IWatchManager {
             // don't waste memory if there are few watches on a node
             // rehash when the 4th entry is added, doubling size thereafter
             // seems like a good compromise
-            list = new HashSet<Watcher>(4);
+            list = new HashSet<>(4);
             watchTable.put(path, list);
         }
         list.add(watcher);
@@ -79,9 +87,12 @@ public class WatchManager implements IWatchManager {
         Set<String> paths = watch2Paths.get(watcher);
         if (paths == null) {
             // cnxns typically have many watches, so use default cap here
-            paths = new HashSet<String>();
+            paths = new HashSet<>();
             watch2Paths.put(watcher, paths);
         }
+
+        watcherModeManager.setWatcherMode(watcher, path, watcherMode);
+
         return paths.add(path);
     }
 
@@ -99,6 +110,7 @@ public class WatchManager implements IWatchManager {
                     watchTable.remove(p);
                 }
             }
+            watcherModeManager.removeWatcher(watcher, p);
         }
     }
 
@@ -110,22 +122,45 @@ public class WatchManager implements IWatchManager {
     @Override
     public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
         WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
-        Set<Watcher> watchers;
+        Set<Watcher> watchers = new HashSet<>();
+        PathParentIterator pathParentIterator = getPathParentIterator(path);
         synchronized (this) {
-            watchers = watchTable.remove(path);
-            if (watchers == null || watchers.isEmpty()) {
-                if (LOG.isTraceEnabled()) {
-                    ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path);
+            for (String localPath : pathParentIterator.asIterable()) {
+                Set<Watcher> thisWatchers = watchTable.get(localPath);
+                if (thisWatchers == null || thisWatchers.isEmpty()) {
+                    continue;
                 }
-                return null;
-            }
-            for (Watcher w : watchers) {
-                Set<String> paths = watch2Paths.get(w);
-                if (paths != null) {
-                    paths.remove(path);
+                Iterator<Watcher> iterator = thisWatchers.iterator();
+                while (iterator.hasNext()) {
+                    Watcher watcher = iterator.next();
+                    WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);
+                    if (watcherMode.isRecursive()) {
+                        if (type != EventType.NodeChildrenChanged) {
+                            watchers.add(watcher);
+                        }
+                    } else if (!pathParentIterator.atParentPath()) {
+                        watchers.add(watcher);
+                        if (!watcherMode.isPersistent()) {
+                            iterator.remove();
+                            Set<String> paths = watch2Paths.get(watcher);
+                            if (paths != null) {
+                                paths.remove(localPath);
+                            }
+                        }
+                    }
+                }
+                if (thisWatchers.isEmpty()) {
+                    watchTable.remove(localPath);
                 }
             }
         }
+        if (watchers.isEmpty()) {
+            if (LOG.isTraceEnabled()) {
+                ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path);
+            }
+            return null;
+        }
+
         for (Watcher w : watchers) {
             if (supress != null && supress.contains(w)) {
                 continue;
@@ -134,24 +169,24 @@ public class WatchManager implements IWatchManager {
         }
 
         switch (type) {
-        case NodeCreated:
-            ServerMetrics.getMetrics().NODE_CREATED_WATCHER.add(watchers.size());
-            break;
-
-        case NodeDeleted:
-            ServerMetrics.getMetrics().NODE_DELETED_WATCHER.add(watchers.size());
-            break;
-
-        case NodeDataChanged:
-            ServerMetrics.getMetrics().NODE_CHANGED_WATCHER.add(watchers.size());
-            break;
-
-        case NodeChildrenChanged:
-            ServerMetrics.getMetrics().NODE_CHILDREN_WATCHER.add(watchers.size());
-            break;
-        default:
-            // Other types not logged.
-            break;
+            case NodeCreated:
+                ServerMetrics.getMetrics().NODE_CREATED_WATCHER.add(watchers.size());
+                break;
+
+            case NodeDeleted:
+                ServerMetrics.getMetrics().NODE_DELETED_WATCHER.add(watchers.size());
+                break;
+
+            case NodeDataChanged:
+                ServerMetrics.getMetrics().NODE_CHANGED_WATCHER.add(watchers.size());
+                break;
+
+            case NodeChildrenChanged:
+                ServerMetrics.getMetrics().NODE_CHILDREN_WATCHER.add(watchers.size());
+                break;
+            default:
+                // Other types not logged.
+                break;
         }
 
         return new WatcherOrBitSet(watchers);
@@ -197,8 +232,20 @@ public class WatchManager implements IWatchManager {
 
     @Override
     public synchronized boolean containsWatcher(String path, Watcher watcher) {
-        Set<String> paths = watch2Paths.get(watcher);
-        return paths != null && paths.contains(path);
+        PathParentIterator pathParentIterator = getPathParentIterator(path);
+        for (String localPath : pathParentIterator.asIterable()) {
+            Set<Watcher> watchers = watchTable.get(localPath);
+            if (watchers == null || !watchers.contains(watcher)) {
+                continue;    // this watcher is not registered at this level
+            }
+            if (!pathParentIterator.atParentPath()) {
+                return true;    // at the leaf node, all watcher types match
+            }
+            if (watcherModeManager.getWatcherMode(watcher, localPath).isRecursive()) {
+                return true;    // a recursive watcher on a parent covers this path
+            }
+        }
+        return false;
     }
 
     @Override
@@ -217,15 +264,17 @@ public class WatchManager implements IWatchManager {
             watchTable.remove(path);
         }
 
+        watcherModeManager.removeWatcher(watcher, path);
+
         return true;
     }
 
     @Override
     public synchronized WatchesReport getWatches() {
-        Map<Long, Set<String>> id2paths = new HashMap<Long, Set<String>>();
+        Map<Long, Set<String>> id2paths = new HashMap<>();
         for (Entry<Watcher, Set<String>> e : watch2Paths.entrySet()) {
             Long id = ((ServerCnxn) e.getKey()).getSessionId();
-            Set<String> paths = new HashSet<String>(e.getValue());
+            Set<String> paths = new HashSet<>(e.getValue());
             id2paths.put(id, paths);
         }
         return new WatchesReport(id2paths);
@@ -233,9 +282,9 @@ public class WatchManager implements IWatchManager {
 
     @Override
     public synchronized WatchesPathReport getWatchesByPath() {
-        Map<String, Set<Long>> path2ids = new HashMap<String, Set<Long>>();
+        Map<String, Set<Long>> path2ids = new HashMap<>();
         for (Entry<String, Set<Watcher>> e : watchTable.entrySet()) {
-            Set<Long> ids = new HashSet<Long>(e.getValue().size());
+            Set<Long> ids = new HashSet<>(e.getValue().size());
             path2ids.put(e.getKey(), ids);
             for (Watcher watcher : e.getValue()) {
                 ids.add(((ServerCnxn) watcher).getSessionId());
@@ -256,4 +305,15 @@ public class WatchManager implements IWatchManager {
     @Override
     public void shutdown() { /* do nothing */ }
 
+    @Override
+    public int getRecursiveWatchQty() {
+        return watcherModeManager.getRecursiveQty();
+    }
+
+    private PathParentIterator getPathParentIterator(String path) {
+        if (watcherModeManager.getRecursiveQty() == 0) {
+            return PathParentIterator.forPathOnly(path);
+        }
+        return PathParentIterator.forAll(path);
+    }
 }
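
The selection rules in the new `triggerWatch` are easier to follow in isolation. The sketch below is a simplified model of them, not the code from this patch. `TriggerRulesSketch` is a hypothetical class, plain strings stand in for `Watcher` objects, and a boolean stands in for the `NodeChildrenChanged` check. It also always walks the parents, whereas `WatchManager` only does so when at least one recursive watcher is registered.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

final class TriggerRulesSketch {
    enum Mode { STANDARD, PERSISTENT, PERSISTENT_RECURSIVE }

    // path -> (watcher name -> mode); plain strings stand in for Watcher objects
    private final Map<String, Map<String, Mode>> table = new HashMap<>();

    void add(String path, String watcher, Mode mode) {
        table.computeIfAbsent(path, p -> new HashMap<>()).put(watcher, mode);
    }

    /** Returns the names of the watchers notified by an event on the given path. */
    Set<String> trigger(String path, boolean childrenChangedEvent) {
        Set<String> fired = new LinkedHashSet<>();
        boolean atParent = false;
        for (String current = path; !current.isEmpty(); current = parentOf(current)) {
            Map<String, Mode> atLevel = table.get(current);
            if (atLevel != null) {
                Iterator<Map.Entry<String, Mode>> it = atLevel.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<String, Mode> entry = it.next();
                    if (entry.getValue() == Mode.PERSISTENT_RECURSIVE) {
                        // recursive watchers fire at any level, except for children-changed events
                        if (!childrenChangedEvent) {
                            fired.add(entry.getKey());
                        }
                    } else if (!atParent) {
                        // non-recursive watchers fire only on the exact path
                        fired.add(entry.getKey());
                        if (entry.getValue() == Mode.STANDARD) {
                            it.remove(); // one-shot: standard watchers are dropped once triggered
                        }
                    }
                }
                if (atLevel.isEmpty()) {
                    table.remove(current);
                }
            }
            atParent = true;
        }
        return fired;
    }

    private static String parentOf(String path) {
        if (path.equals("/")) {
            return "";
        }
        int idx = path.lastIndexOf('/');
        return idx <= 0 ? "/" : path.substring(0, idx);
    }

    public static void main(String[] args) {
        TriggerRulesSketch sketch = new TriggerRulesSketch();
        sketch.add("/a", "rec", Mode.PERSISTENT_RECURSIVE);
        sketch.add("/a/b", "std", Mode.STANDARD);
        System.out.println(sketch.trigger("/a/b", false)); // prints [std, rec]
        System.out.println(sketch.trigger("/a/b", false)); // prints [rec]
    }
}
```

The second call shows the difference in lifetime: the standard watcher was consumed by the first event, while the recursive watcher on the parent keeps firing.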

+ 56 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherMode.java

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import org.apache.zookeeper.ZooDefs;
+
+public enum WatcherMode {
+    STANDARD(false, false),
+    PERSISTENT(true, false),
+    PERSISTENT_RECURSIVE(true, true)
+    ;
+
+    public static final WatcherMode DEFAULT_WATCHER_MODE = WatcherMode.STANDARD;
+
+    public static WatcherMode fromZooDef(int mode) {
+        switch (mode) {
+            case ZooDefs.AddWatchModes.persistent:
+                return PERSISTENT;
+            case ZooDefs.AddWatchModes.persistentRecursive:
+                return PERSISTENT_RECURSIVE;
+        }
+        throw new IllegalArgumentException("Unsupported mode: " + mode);
+    }
+
+    private final boolean isPersistent;
+    private final boolean isRecursive;
+
+    WatcherMode(boolean isPersistent, boolean isRecursive) {
+        this.isPersistent = isPersistent;
+        this.isRecursive = isRecursive;
+    }
+
+    public boolean isPersistent() {
+        return isPersistent;
+    }
+
+    public boolean isRecursive() {
+        return isRecursive;
+    }
+}

+ 96 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherModeManager.java

@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.zookeeper.Watcher;
+
+class WatcherModeManager {
+    private final Map<Key, WatcherMode> watcherModes = new ConcurrentHashMap<>();
+    private final AtomicInteger recursiveQty = new AtomicInteger(0);
+
+    private static class Key {
+        private final Watcher watcher;
+        private final String path;
+
+        Key(Watcher watcher, String path) {
+            this.watcher = watcher;
+            this.path = path;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            Key key = (Key) o;
+            return watcher.equals(key.watcher) && path.equals(key.path);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(watcher, path);
+        }
+    }
+
+    // VisibleForTesting
+    Map<Key, WatcherMode> getWatcherModes() {
+        return watcherModes;
+    }
+
+    void setWatcherMode(Watcher watcher, String path, WatcherMode mode) {
+        if (mode == WatcherMode.DEFAULT_WATCHER_MODE) {
+            removeWatcher(watcher, path);
+        } else {
+            adjustRecursiveQty(watcherModes.put(new Key(watcher, path), mode), mode);
+        }
+    }
+
+    WatcherMode getWatcherMode(Watcher watcher, String path) {
+        return watcherModes.getOrDefault(new Key(watcher, path), WatcherMode.DEFAULT_WATCHER_MODE);
+    }
+
+    void removeWatcher(Watcher watcher, String path) {
+        adjustRecursiveQty(watcherModes.remove(new Key(watcher, path)), WatcherMode.DEFAULT_WATCHER_MODE);
+    }
+
+    int getRecursiveQty() {
+        return recursiveQty.get();
+    }
+
+    // recursiveQty is an optimization to avoid having to walk the map every time this value is needed
+    private void adjustRecursiveQty(WatcherMode oldMode, WatcherMode newMode) {
+        if (oldMode == null) {
+            oldMode = WatcherMode.DEFAULT_WATCHER_MODE;
+        }
+        if (oldMode.isRecursive() != newMode.isRecursive()) {
+            if (newMode.isRecursive()) {
+                recursiveQty.incrementAndGet();
+            } else {
+                recursiveQty.decrementAndGet();
+            }
+        }
+    }
+}
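
The counter bookkeeping in `WatcherModeManager` depends on `Map.put` and `Map.remove` returning the previous value, so the count only changes when the recursive flag actually flips. A reduced standalone sketch of that idea follows. `RecursiveCountSketch` is a hypothetical class: it keys on a plain string instead of a watcher/path pair and stores only a boolean "is recursive" flag instead of a `WatcherMode`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class RecursiveCountSketch {
    // key -> true if the registration is recursive; a missing key means a standard watcher
    private final Map<String, Boolean> recursiveByKey = new ConcurrentHashMap<>();
    private final AtomicInteger recursiveQty = new AtomicInteger();

    void set(String key, boolean recursive) {
        Boolean previous = recursiveByKey.put(key, recursive);
        adjust(previous != null && previous, recursive);
    }

    void remove(String key) {
        Boolean previous = recursiveByKey.remove(key);
        adjust(previous != null && previous, false);
    }

    int recursiveQty() {
        return recursiveQty.get();
    }

    // Only a change in the recursive flag moves the counter.
    private void adjust(boolean wasRecursive, boolean isRecursive) {
        if (wasRecursive != isRecursive) {
            if (isRecursive) {
                recursiveQty.incrementAndGet();
            } else {
                recursiveQty.decrementAndGet();
            }
        }
    }

    public static void main(String[] args) {
        RecursiveCountSketch counts = new RecursiveCountSketch();
        counts.set("/a", true);
        counts.set("/a", true);   // re-registering the same key does not double count
        counts.set("/a/b", true);
        counts.set("/a/b", false);
        System.out.println(counts.recursiveQty()); // prints 1
    }
}
```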

+ 84 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/PathParentIteratorTest.java

@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PathParentIteratorTest {
+    @Test
+    public void testRoot() {
+        PathParentIterator pathParentIterator = PathParentIterator.forAll("/");
+        Assert.assertTrue(pathParentIterator.hasNext());
+        Assert.assertFalse(pathParentIterator.atParentPath());
+        Assert.assertEquals("/", pathParentIterator.next());
+        Assert.assertFalse(pathParentIterator.hasNext());
+    }
+
+    @Test
+    public void test1Level() {
+        PathParentIterator pathParentIterator = PathParentIterator.forAll("/a");
+        Assert.assertTrue(pathParentIterator.hasNext());
+        Assert.assertFalse(pathParentIterator.atParentPath());
+        Assert.assertEquals("/a", pathParentIterator.next());
+
+        Assert.assertTrue(pathParentIterator.hasNext());
+        Assert.assertEquals("/", pathParentIterator.next());
+        Assert.assertTrue(pathParentIterator.atParentPath());
+
+        Assert.assertFalse(pathParentIterator.hasNext());
+    }
+
+    @Test
+    public void testLong() {
+        PathParentIterator pathParentIterator = PathParentIterator.forAll("/a/b/c/d");
+
+        Assert.assertTrue(pathParentIterator.hasNext());
+        Assert.assertEquals("/a/b/c/d", pathParentIterator.next());
+        Assert.assertFalse(pathParentIterator.atParentPath());
+
+        Assert.assertTrue(pathParentIterator.hasNext());
+        Assert.assertEquals("/a/b/c", pathParentIterator.next());
+        Assert.assertTrue(pathParentIterator.atParentPath());
+
+        Assert.assertTrue(pathParentIterator.hasNext());
+        Assert.assertEquals("/a/b", pathParentIterator.next());
+        Assert.assertTrue(pathParentIterator.atParentPath());
+
+        Assert.assertTrue(pathParentIterator.hasNext());
+        Assert.assertEquals("/a", pathParentIterator.next());
+        Assert.assertTrue(pathParentIterator.atParentPath());
+
+        Assert.assertTrue(pathParentIterator.hasNext());
+        Assert.assertEquals("/", pathParentIterator.next());
+        Assert.assertTrue(pathParentIterator.atParentPath());
+
+        Assert.assertFalse(pathParentIterator.hasNext());
+    }
+
+    @Test
+    public void testForPathOnly() {
+        PathParentIterator pathParentIterator = PathParentIterator.forPathOnly("/a/b/c/d");
+        Assert.assertTrue(pathParentIterator.hasNext());
+        Assert.assertEquals("/a/b/c/d", pathParentIterator.next());
+        Assert.assertFalse(pathParentIterator.atParentPath());
+
+        Assert.assertFalse(pathParentIterator.hasNext());
+    }
+}

+ 197 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/RecursiveWatchQtyTest.java

@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RecursiveWatchQtyTest {
+    private WatchManager watchManager;
+
+    private static final int clientQty = 25;
+    private static final int iterations = 1000;
+
+    private static class DummyWatcher implements Watcher {
+        @Override
+        public void process(WatchedEvent event) {
+            // NOP
+        }
+    }
+
+    @Before
+    public void setup() {
+        watchManager = new WatchManager();
+    }
+
+    @Test
+    public void testRecursiveQty() {
+        WatcherModeManager manager = new WatcherModeManager();
+        DummyWatcher watcher = new DummyWatcher();
+        manager.setWatcherMode(watcher, "/a", WatcherMode.DEFAULT_WATCHER_MODE);
+        assertEquals(0, manager.getRecursiveQty());
+        manager.setWatcherMode(watcher, "/a", WatcherMode.PERSISTENT_RECURSIVE);
+        assertEquals(1, manager.getRecursiveQty());
+        manager.setWatcherMode(watcher, "/a/b", WatcherMode.PERSISTENT_RECURSIVE);
+        assertEquals(2, manager.getRecursiveQty());
+        manager.setWatcherMode(watcher, "/a", WatcherMode.PERSISTENT_RECURSIVE);
+        assertEquals(2, manager.getRecursiveQty());
+        manager.setWatcherMode(watcher, "/a/b", WatcherMode.PERSISTENT);
+        assertEquals(1, manager.getRecursiveQty());
+        manager.setWatcherMode(watcher, "/a/b", WatcherMode.PERSISTENT_RECURSIVE);
+        assertEquals(2, manager.getRecursiveQty());
+        manager.setWatcherMode(watcher, "/a/b", WatcherMode.DEFAULT_WATCHER_MODE);
+        assertEquals(1, manager.getRecursiveQty());
+        manager.setWatcherMode(watcher, "/a", WatcherMode.PERSISTENT);
+        assertEquals(0, manager.getRecursiveQty());
+    }
+
+    @Test
+    public void testAddRemove() {
+        Watcher watcher1 = new DummyWatcher();
+        Watcher watcher2 = new DummyWatcher();
+
+        watchManager.addWatch("/a", watcher1, WatcherMode.PERSISTENT_RECURSIVE);
+        watchManager.addWatch("/b", watcher2, WatcherMode.PERSISTENT_RECURSIVE);
+        assertEquals(2, watchManager.getRecursiveWatchQty());
+        assertTrue(watchManager.removeWatcher("/a", watcher1));
+        assertTrue(watchManager.removeWatcher("/b", watcher2));
+        assertEquals(0, watchManager.getRecursiveWatchQty());
+    }
+
+    @Test
+    public void testAddRemoveAlt() {
+        Watcher watcher1 = new DummyWatcher();
+        Watcher watcher2 = new DummyWatcher();
+
+        watchManager.addWatch("/a", watcher1, WatcherMode.PERSISTENT_RECURSIVE);
+        watchManager.addWatch("/b", watcher2, WatcherMode.PERSISTENT_RECURSIVE);
+        assertEquals(2, watchManager.getRecursiveWatchQty());
+        watchManager.removeWatcher(watcher1);
+        watchManager.removeWatcher(watcher2);
+        assertEquals(0, watchManager.getRecursiveWatchQty());
+    }
+
+    @Test
+    public void testDoubleAdd() {
+        Watcher watcher = new DummyWatcher();
+
+        watchManager.addWatch("/a", watcher, WatcherMode.PERSISTENT_RECURSIVE);
+        watchManager.addWatch("/a", watcher, WatcherMode.PERSISTENT_RECURSIVE);
+        assertEquals(1, watchManager.getRecursiveWatchQty());
+        watchManager.removeWatcher(watcher);
+        assertEquals(0, watchManager.getRecursiveWatchQty());
+    }
+
+    @Test
+    public void testSameWatcherMultiPath() {
+        Watcher watcher = new DummyWatcher();
+
+        watchManager.addWatch("/a", watcher, WatcherMode.PERSISTENT_RECURSIVE);
+        watchManager.addWatch("/a/b", watcher, WatcherMode.PERSISTENT_RECURSIVE);
+        watchManager.addWatch("/a/b/c", watcher, WatcherMode.PERSISTENT_RECURSIVE);
+        assertEquals(3, watchManager.getRecursiveWatchQty());
+        assertTrue(watchManager.removeWatcher("/a/b", watcher));
+        assertEquals(2, watchManager.getRecursiveWatchQty());
+        watchManager.removeWatcher(watcher);
+        assertEquals(0, watchManager.getRecursiveWatchQty());
+    }
+
+    @Test
+    public void testChangeType() {
+        Watcher watcher = new DummyWatcher();
+
+        watchManager.addWatch("/a", watcher, WatcherMode.PERSISTENT);
+        assertEquals(0, watchManager.getRecursiveWatchQty());
+        watchManager.addWatch("/a", watcher, WatcherMode.PERSISTENT_RECURSIVE);
+        assertEquals(1, watchManager.getRecursiveWatchQty());
+        watchManager.addWatch("/a", watcher, WatcherMode.STANDARD);
+        assertEquals(0, watchManager.getRecursiveWatchQty());
+        assertTrue(watchManager.removeWatcher("/a", watcher));
+        assertEquals(0, watchManager.getRecursiveWatchQty());
+    }
+
+    @Test
+    public void testRecursiveQtyConcurrency() {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        WatcherModeManager manager = new WatcherModeManager();
+        ExecutorService threadPool = Executors.newFixedThreadPool(clientQty);
+        List<Future<?>> tasks = null;
+        CountDownLatch completedLatch = new CountDownLatch(clientQty);
+        try {
+            tasks = IntStream.range(0, clientQty)
+                    .mapToObj(__ -> threadPool.submit(() -> iterate(manager, completedLatch)))
+                    .collect(Collectors.toList());
+            try {
+                completedLatch.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        } finally {
+            if (tasks != null) {
+                tasks.forEach(t -> t.cancel(true));
+            }
+            threadPool.shutdownNow();
+        }
+
+        int expectedRecursiveQty = (int) manager.getWatcherModes().values()
+                .stream()
+                .filter(mode -> mode == WatcherMode.PERSISTENT_RECURSIVE)
+                .count();
+        assertEquals(expectedRecursiveQty, manager.getRecursiveQty());
+    }
+
+    private void iterate(WatcherModeManager manager, CountDownLatch completedLatch) {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        try {
+            for (int i = 0; i < iterations; ++i) {
+                String path = "/" + random.nextInt(clientQty);
+                boolean doSet = random.nextInt(100) > 33;    // 2/3 will be sets
+                if (doSet) {
+                    WatcherMode mode = WatcherMode.values()[random.nextInt(WatcherMode.values().length)];
+                    manager.setWatcherMode(new DummyWatcher(), path, mode);
+                } else {
+                    manager.removeWatcher(new DummyWatcher(), path);
+                }
+
+                int sleepMillis = random.nextInt(2);
+                if (sleepMillis > 0) {
+                    try {
+                        Thread.sleep(sleepMillis);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        } finally {
+            completedLatch.countDown();
+        }
+    }
+}
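
Reviewer note: the concurrency test above only checks that the recursive counter agrees with the map contents once all threads have finished. A minimal sketch of one way to get that property follows, assuming a string key standing in for the watcher/path pair. `RecursiveQtySketch`, its `Mode` enum and its method names are hypothetical and are not the `WatcherModeManager` from this PR.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a watcher-mode registry; not the PR's WatcherModeManager.
final class RecursiveQtySketch {
    enum Mode { STANDARD, PERSISTENT, PERSISTENT_RECURSIVE }

    // Key is an assumed "watcherId:path" string; an absent key means STANDARD.
    private final ConcurrentHashMap<String, Mode> modes = new ConcurrentHashMap<>();
    private final AtomicInteger recursiveQty = new AtomicInteger();

    void setMode(String key, Mode mode) {
        if (mode == Mode.STANDARD) {
            remove(key);
        } else {
            // put() returns the previous value atomically, so each transition is
            // seen by exactly one caller and the counter deltas add up correctly.
            adjust(modes.put(key, mode), mode);
        }
    }

    void remove(String key) {
        adjust(modes.remove(key), Mode.STANDARD);
    }

    int getRecursiveQty() {
        return recursiveQty.get();
    }

    // Recounts from the map; useful as the "expected" value in a test.
    long countRecursiveEntries() {
        return modes.values().stream().filter(m -> m == Mode.PERSISTENT_RECURSIVE).count();
    }

    private void adjust(Mode oldMode, Mode newMode) {
        boolean was = oldMode == Mode.PERSISTENT_RECURSIVE;
        boolean is = newMode == Mode.PERSISTENT_RECURSIVE;
        if (was && !is) {
            recursiveQty.decrementAndGet();
        } else if (!was && is) {
            recursiveQty.incrementAndGet();
        }
    }
}
```

The counter can briefly lag the map while a call is in flight, which is why a test of this shape compares the two only after every worker has completed.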

+ 1 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java

@@ -49,7 +49,7 @@ public class WatchManagerTest extends ZKTestCase {
 
     protected static final Logger LOG = LoggerFactory.getLogger(WatchManagerTest.class);
 
-    private static final String PATH_PREFIX = "path";
+    private static final String PATH_PREFIX = "/path";
 
     private ConcurrentHashMap<Integer, DumbWatcher> watchers;
     private Random r;

+ 174 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java

@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.apache.zookeeper.AddWatchMode.PERSISTENT_RECURSIVE;
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PersistentRecursiveWatcherTest extends ClientBase {
+    private static final Logger LOG = LoggerFactory.getLogger(PersistentRecursiveWatcherTest.class);
+    private BlockingQueue<WatchedEvent> events;
+    private Watcher persistentWatcher;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+
+        events = new LinkedBlockingQueue<>();
+        persistentWatcher = event -> events.add(event);
+    }
+
+    @Test
+    public void testBasic()
+            throws IOException, InterruptedException, KeeperException {
+        try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+            zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE);
+            internalTestBasic(zk);
+        }
+    }
+
+    @Test
+    public void testBasicAsync()
+            throws IOException, InterruptedException, KeeperException {
+        try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+            final CountDownLatch latch = new CountDownLatch(1);
+            AsyncCallback.VoidCallback cb = (rc, path, ctx) -> {
+                if (rc == 0) {
+                    latch.countDown();
+                }
+            };
+            zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE, cb, null);
+            Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+            internalTestBasic(zk);
+        }
+    }
+
+    private void internalTestBasic(ZooKeeper zk) throws KeeperException, InterruptedException {
+        zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.setData("/a/b/c/d/e", new byte[0], -1);
+        zk.delete("/a/b/c/d/e", -1);
+        zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
+        assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c");
+        assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d");
+        assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e");
+        assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c/d/e");
+        assertEvent(events, Watcher.Event.EventType.NodeDeleted, "/a/b/c/d/e");
+        assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e");
+    }
+
+    @Test
+    public void testRemoval()
+            throws IOException, InterruptedException, KeeperException {
+        try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+            zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE);
+            zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
+            assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c");
+
+            zk.removeWatches("/a/b", persistentWatcher, Watcher.WatcherType.Any, false);
+            zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            assertEvent(events, Watcher.Event.EventType.PersistentWatchRemoved, "/a/b");
+        }
+    }
+
+    @Test
+    public void testDisconnect() throws Exception {
+        try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+            zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE);
+            stopServer();
+            assertEvent(events, Watcher.Event.EventType.None, null);
+            startServer();
+            assertEvent(events, Watcher.Event.EventType.None, null);
+            internalTestBasic(zk);
+        }
+    }
+
+    @Test
+    public void testMultiClient()
+            throws IOException, InterruptedException, KeeperException {
+        try (ZooKeeper zk1 = createClient(new CountdownWatcher(), hostPort);
+             ZooKeeper zk2 = createClient(new CountdownWatcher(), hostPort)) {
+
+            zk1.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk1.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk1.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+            zk1.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE);
+            zk1.setData("/a/b/c", "one".getBytes(), -1);
+            Thread.sleep(1000); // give some time for the event to arrive
+
+            zk2.setData("/a/b/c", "two".getBytes(), -1);
+            zk2.setData("/a/b/c", "three".getBytes(), -1);
+            zk2.setData("/a/b/c", "four".getBytes(), -1);
+
+            assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
+            assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
+            assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
+            assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
+        }
+    }
+
+    @Test
+    public void testRootWatcher()
+            throws IOException, InterruptedException, KeeperException {
+        try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+            zk.addWatch("/", persistentWatcher, PERSISTENT_RECURSIVE);
+            zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk.create("/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a");
+            assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
+            assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b");
+            assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b/c");
+        }
+    }
+
+    private void assertEvent(BlockingQueue<WatchedEvent> events, Watcher.Event.EventType eventType, String path)
+            throws InterruptedException {
+        WatchedEvent event = events.poll(5, TimeUnit.SECONDS);
+        Assert.assertNotNull(event);
+        Assert.assertEquals(eventType, event.getType());
+        Assert.assertEquals(path, event.getPath());
+    }
+}
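
Reviewer note: the tests above expect a recursive watch on `/a/b` to fire for `/a/b/c/d/e` but not for `/a`, and a watch on `/` to fire for everything. The description attributes this to walking up the parents of the event path. A rough sketch of that idea follows, assuming plain string paths. `ParentWalkSketch` and its methods are hypothetical names for illustration and are not the `PathParentIterator` from this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of parent-by-parent path walking.
final class ParentWalkSketch {
    // Returns the path itself followed by each ancestor, ending at the root "/".
    static List<String> selfAndParents(String path) {
        List<String> result = new ArrayList<>();
        String current = path;
        while (true) {
            result.add(current);
            if (current.equals("/")) {
                break;
            }
            int idx = current.lastIndexOf('/');
            // idx <= 0 means the only remaining ancestor is the root
            current = (idx <= 0) ? "/" : current.substring(0, idx);
        }
        return result;
    }

    // A recursive watch applies when its path is the event path or one of its ancestors.
    static boolean recursiveWatchCovers(String watchPath, String eventPath) {
        return selfAndParents(eventPath).contains(watchPath);
    }
}
```

With this approach the cost of a lookup grows with the depth of the event path rather than with the number of znodes under the watched path, which matches the description's point about not setting a watcher on every implied znode.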

+ 211 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentWatcherTest.java

@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.apache.zookeeper.AddWatchMode.PERSISTENT;
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PersistentWatcherTest extends ClientBase {
+    private static final Logger LOG = LoggerFactory.getLogger(PersistentWatcherTest.class);
+    private BlockingQueue<WatchedEvent> events;
+    private Watcher persistentWatcher;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+
+        events = new LinkedBlockingQueue<>();
+        persistentWatcher = event -> events.add(event);
+    }
+
+    @Test
+    public void testBasic()
+            throws IOException, InterruptedException, KeeperException {
+        try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+            zk.addWatch("/a/b", persistentWatcher, PERSISTENT);
+            internalTestBasic(zk);
+        }
+    }
+
+    @Test
+    public void testDefaultWatcher()
+            throws IOException, InterruptedException, KeeperException {
+        CountdownWatcher watcher = new CountdownWatcher() {
+            @Override
+            public synchronized void process(WatchedEvent event) {
+                super.process(event);
+                events.add(event);
+            }
+        };
+        try (ZooKeeper zk = createClient(watcher, hostPort)) {
+            zk.addWatch("/a/b", PERSISTENT);
+            events.clear(); // clear any events added during client connection
+            internalTestBasic(zk);
+        }
+    }
+
+    @Test
+    public void testBasicAsync()
+            throws IOException, InterruptedException, KeeperException {
+        try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+            final CountDownLatch latch = new CountDownLatch(1);
+            AsyncCallback.VoidCallback cb = (rc, path, ctx) -> {
+                if (rc == 0) {
+                    latch.countDown();
+                }
+            };
+            zk.addWatch("/a/b", persistentWatcher, PERSISTENT, cb, null);
+            Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+            internalTestBasic(zk);
+        }
+    }
+
+    @Test
+    public void testAsyncDefaultWatcher()
+            throws IOException, InterruptedException, KeeperException {
+        // the default watcher also records events so the test can observe them
+        CountdownWatcher watcher = new CountdownWatcher() {
+            @Override
+            public synchronized void process(WatchedEvent event) {
+                super.process(event);
+                events.add(event);
+            }
+        };
+        try (ZooKeeper zk = createClient(watcher, hostPort)) {
+            final CountDownLatch latch = new CountDownLatch(1);
+            AsyncCallback.VoidCallback cb = (rc, path, ctx) -> {
+                if (rc == 0) {
+                    latch.countDown();
+                }
+            };
+            // no explicit watcher: the watch is registered for the client's default watcher
+            zk.addWatch("/a/b", PERSISTENT, cb, null);
+            Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+            events.clear(); // clear any events added during client connection
+            internalTestBasic(zk);
+        }
+    }
+
+    private void internalTestBasic(ZooKeeper zk) throws KeeperException, InterruptedException {
+        zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.setData("/a/b", new byte[0], -1);
+        zk.delete("/a/b/c", -1);
+        zk.delete("/a/b", -1);
+        zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
+        assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/a/b");
+        assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
+        assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/a/b");
+        assertEvent(events, Watcher.Event.EventType.NodeDeleted, "/a/b");
+        assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
+    }
+
+    @Test
+    public void testRemoval()
+            throws IOException, InterruptedException, KeeperException {
+        try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+            zk.addWatch("/a/b", persistentWatcher, PERSISTENT);
+            zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
+            assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/a/b");
+
+            zk.removeWatches("/a/b", persistentWatcher, Watcher.WatcherType.Any, false);
+            zk.delete("/a/b/c", -1);
+            zk.delete("/a/b", -1);
+            assertEvent(events, Watcher.Event.EventType.PersistentWatchRemoved, "/a/b");
+        }
+    }
+
+    @Test
+    public void testDisconnect() throws Exception {
+        try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+            zk.addWatch("/a/b", persistentWatcher, PERSISTENT);
+            stopServer();
+            assertEvent(events, Watcher.Event.EventType.None, null);
+            startServer();
+            assertEvent(events, Watcher.Event.EventType.None, null);
+            internalTestBasic(zk);
+        }
+    }
+
+    @Test
+    public void testMultiClient()
+            throws IOException, InterruptedException, KeeperException {
+        try (ZooKeeper zk1 = createClient(new CountdownWatcher(), hostPort);
+             ZooKeeper zk2 = createClient(new CountdownWatcher(), hostPort)) {
+
+            zk1.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk1.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+            zk1.addWatch("/a/b", persistentWatcher, PERSISTENT);
+            zk1.setData("/a/b", "one".getBytes(), -1);
+            Thread.sleep(1000); // give some time for the event to arrive
+
+            zk2.setData("/a/b", "two".getBytes(), -1);
+            zk2.setData("/a/b", "three".getBytes(), -1);
+            zk2.setData("/a/b", "four".getBytes(), -1);
+
+            assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
+            assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
+            assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
+            assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
+        }
+    }
+
+    @Test
+    public void testRootWatcher()
+            throws IOException, InterruptedException, KeeperException {
+        try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
+            zk.addWatch("/", persistentWatcher, PERSISTENT);
+            zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk.setData("/a", new byte[0], -1);
+            zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/");
+            assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/");
+        }
+    }
+
+    private void assertEvent(BlockingQueue<WatchedEvent> events, Watcher.Event.EventType eventType, String path)
+            throws InterruptedException {
+        WatchedEvent event = events.poll(5, TimeUnit.SECONDS);
+        Assert.assertNotNull(event);
+        Assert.assertEquals(eventType, event.getType());
+        Assert.assertEquals(path, event.getPath());
+    }
+}

+ 124 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java

@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collections;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.watch.IWatchManager;
+import org.apache.zookeeper.server.watch.WatchManagerFactory;
+import org.apache.zookeeper.server.watch.WatcherOrBitSet;
+import org.apache.zookeeper.server.watch.WatchesPathReport;
+import org.apache.zookeeper.server.watch.WatchesReport;
+import org.apache.zookeeper.server.watch.WatchesSummary;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class UnsupportedAddWatcherTest extends ClientBase {
+
+    public static class StubbedWatchManager implements IWatchManager {
+        @Override
+        public boolean addWatch(String path, Watcher watcher) {
+            return false;
+        }
+
+        @Override
+        public boolean containsWatcher(String path, Watcher watcher) {
+            return false;
+        }
+
+        @Override
+        public boolean removeWatcher(String path, Watcher watcher) {
+            return false;
+        }
+
+        @Override
+        public void removeWatcher(Watcher watcher) {
+            // NOP
+        }
+
+        @Override
+        public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type) {
+            return new WatcherOrBitSet(Collections.emptySet());
+        }
+
+        @Override
+        public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, WatcherOrBitSet suppress) {
+            return new WatcherOrBitSet(Collections.emptySet());
+        }
+
+        @Override
+        public int size() {
+            return 0;
+        }
+
+        @Override
+        public void shutdown() {
+            // NOP
+        }
+
+        @Override
+        public WatchesSummary getWatchesSummary() {
+            return null;
+        }
+
+        @Override
+        public WatchesReport getWatches() {
+            return null;
+        }
+
+        @Override
+        public WatchesPathReport getWatchesByPath() {
+            return null;
+        }
+
+        @Override
+        public void dumpWatches(PrintWriter pwriter, boolean byPath) {
+            // NOP
+        }
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty(WatchManagerFactory.ZOOKEEPER_WATCH_MANAGER_NAME, StubbedWatchManager.class.getName());
+        super.setUp();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            System.clearProperty(WatchManagerFactory.ZOOKEEPER_WATCH_MANAGER_NAME);
+        }
+    }
+
+    @Test(expected = KeeperException.MarshallingErrorException.class)
+    public void testBehavior() throws IOException, InterruptedException, KeeperException {
+        try (ZooKeeper zk = createClient(hostPort)) {
+            // the server will generate an exception as our custom watch manager doesn't implement
+            // the new version of addWatch()
+            zk.addWatch("/foo", event -> {}, AddWatchMode.PERSISTENT_RECURSIVE);
+        }
+    }
+}
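
Reviewer note: the comment in `testBehavior()` says the stubbed manager fails because it does not implement the new version of `addWatch()`. One common way to extend an interface without breaking older implementations is a default method that only accepts the legacy mode. The sketch below shows that pattern under assumed names. `WatchRegistrySketch` and `LegacyRegistrySketch` are hypothetical, and the sketch does not claim to reproduce `IWatchManager` or how the server maps the failure to the exception the client sees.

```java
// Hypothetical interface illustrating a backward-compatible overload.
interface WatchRegistrySketch {
    enum Mode { STANDARD, PERSISTENT, PERSISTENT_RECURSIVE }

    // The original method every implementation already provides.
    boolean addWatch(String path, Runnable watcher);

    // New overload: delegates for the legacy mode, rejects anything else
    // unless an implementation overrides it.
    default boolean addWatch(String path, Runnable watcher, Mode mode) {
        if (mode == Mode.STANDARD) {
            return addWatch(path, watcher);
        }
        throw new UnsupportedOperationException("mode not supported: " + mode);
    }
}

// An older implementation that predates the overload and does not override it.
final class LegacyRegistrySketch implements WatchRegistrySketch {
    @Override
    public boolean addWatch(String path, Runnable watcher) {
        return true;
    }
}
```

An implementation written before the overload existed keeps compiling and keeps working for standard watches, and fails loudly only when a caller asks for a mode it cannot honor.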