浏览代码

ZOOKEEPER-1910. RemoveWatches wrongly removes the watcher if multiple watches
exists on a path (Rakesh R via camille)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1590850 13f79535-47bb-0310-9956-ffa450edef68

Camille Fournier 11 年之前
父节点
当前提交
a1cd9204d2

+ 3 - 0
CHANGES.txt

@@ -627,6 +627,9 @@ BUGFIXES:
   scheme (Craig Condit via michim)
 
   ZOOKEEPER-1843. Oddity in ByteBufferInputStream skip (Bill Havanki via michim)
+  
+  ZOOKEEPER-1910. RemoveWatches wrongly removes the watcher if multiple watches 
+  exists on a path (Rakesh R via camille)
 
 IMPROVEMENTS:
 

+ 3 - 1
src/java/main/org/apache/zookeeper/ZooDefs.java

@@ -61,7 +61,9 @@ public class ZooDefs {
 
         public final int reconfig = 16;
 
-        public final int removeWatches = 17;
+        public final int checkWatches = 17;
+
+        public final int removeWatches = 18;
 
         public final int auth = 100;
 

+ 118 - 41
src/java/main/org/apache/zookeeper/ZooKeeper.java

@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.jute.Record;
 import org.apache.zookeeper.AsyncCallback.ACLCallback;
 import org.apache.zookeeper.AsyncCallback.Children2Callback;
 import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
@@ -50,6 +51,7 @@ import org.apache.zookeeper.common.PathUtils;
 import org.apache.zookeeper.common.StringUtils;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.CheckWatchesRequest;
 import org.apache.zookeeper.proto.Create2Request;
 import org.apache.zookeeper.proto.Create2Response;
 import org.apache.zookeeper.proto.CreateRequest;
@@ -2327,21 +2329,16 @@ public class ZooKeeper {
     /**
      * For the given znode path, removes the specified watcher of given
      * watcherType.
-     * 
-     * <p>
-     * If watcher is null, all watchers for the given path and watcherType will
-     * be removed. Otherwise only the specified watcher corresponding to the
-     * passed path and watcherType will be removed.
-     * <p>
-     * A successful call guarantees that, the removed watcher won't be
-     * triggered.
+     *
      * <p>
-     * 
+     * Watcher shouldn't be null. A successful call guarantees that, the
+     * removed watcher won't be triggered.
+     * </p>
+     *
      * @param path
      *            - the path of the node
      * @param watcher
-     *            - a concrete watcher or null to remove all watchers for the
-     *            given path and watcherType
+     *            - a concrete watcher
      * @param watcherType
      *            - the type of watcher to be removed
      * @param local
@@ -2354,26 +2351,100 @@ public class ZooKeeper {
      * @throws KeeperException
      *             if the server signals an error with a non-zero error code.
      * @throws IllegalArgumentException
-     *             if an invalid path is specified
-     * 
+     *             if any of the following is true:
+     *             <ul>
+     *             <li> {@code path} is invalid
+     *             <li> {@code watcher} is null
+     *             </ul>
+     *
      * @since 3.5.0
      */
     public void removeWatches(String path, Watcher watcher,
             WatcherType watcherType, boolean local)
             throws InterruptedException, KeeperException {
+        validateWatcher(watcher);
+        removeWatches(ZooDefs.OpCode.checkWatches, path, watcher,
+                watcherType, local);
+    }
+
+    /**
+     * The asynchronous version of removeWatches.
+     *
+     * @see #removeWatches
+     */
+    public void removeWatches(String path, Watcher watcher,
+            WatcherType watcherType, boolean local, VoidCallback cb, Object ctx) {
+        validateWatcher(watcher);
+        removeWatches(ZooDefs.OpCode.checkWatches, path, watcher,
+                watcherType, local, cb, ctx);
+    }
+
+    /**
+     * For the given znode path, removes all the registered watchers of given
+     * watcherType.
+     *
+     * <p>
+     * A successful call guarantees that, the removed watchers won't be
+     * triggered.
+     * </p>
+     *
+     * @param path
+     *            - the path of the node
+     * @param watcherType
+     *            - the type of watcher to be removed
+     * @param local
+     *            - whether watches can be removed locally when there is no
+     *            server connection
+     * @throws InterruptedException
+     *             if the server transaction is interrupted.
+     * @throws KeeperException.NoWatcher
+     *             if no watcher exists that match the specified parameters
+     * @throws KeeperException
+     *             if the server signals an error with a non-zero error code.
+     * @throws IllegalArgumentException
+     *             if an invalid {@code path} is specified
+     *
+     * @since 3.5.0
+     */
+    public void removeAllWatches(String path, WatcherType watcherType,
+            boolean local) throws InterruptedException, KeeperException {
+
+        removeWatches(ZooDefs.OpCode.removeWatches, path, null, watcherType,
+                local);
+    }
+
+    /**
+     * The asynchronous version of removeAllWatches.
+     *
+     * @see #removeAllWatches
+     */
+    public void removeAllWatches(String path, WatcherType watcherType,
+            boolean local, VoidCallback cb, Object ctx) {
+
+        removeWatches(ZooDefs.OpCode.removeWatches, path, null,
+                watcherType, local, cb, ctx);
+    }
+
+    private void validateWatcher(Watcher watcher) {
+        if (watcher == null) {
+            throw new IllegalArgumentException(
+                    "Invalid Watcher, shouldn't be null!");
+        }
+    }
+
+    private void removeWatches(int opCode, String path, Watcher watcher,
+            WatcherType watcherType, boolean local)
+            throws InterruptedException, KeeperException {
         PathUtils.validatePath(path);
         final String clientPath = path;
-        // Validating the existence of watcher.
-        watchManager.containsWatcher(clientPath, watcher, watcherType);
         final String serverPath = prependChroot(clientPath);
-
         WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher,
                 watcherType, local, watchManager);
+
         RequestHeader h = new RequestHeader();
-        h.setType(ZooDefs.OpCode.removeWatches);
-        RemoveWatchesRequest request = new RemoveWatchesRequest();
-        request.setPath(serverPath);
-        request.setType(watcherType.getIntValue());
+        h.setType(opCode);
+        Record request = getRemoveWatchesRequest(opCode, watcherType,
+                serverPath);
 
         ReplyHeader r = cnxn.submitRequest(h, request, null, null, wcb);
         if (r.getErr() != 0) {
@@ -2382,37 +2453,43 @@ public class ZooKeeper {
         }
     }
 
-    /**
-     * The asynchronous version of removeWatches.
-     * 
-     * @see #removeWatches
-     */
-    public void removeWatches(String path, Watcher watcher,
+    private void removeWatches(int opCode, String path, Watcher watcher,
             WatcherType watcherType, boolean local, VoidCallback cb, Object ctx) {
+        PathUtils.validatePath(path);
         final String clientPath = path;
-        PathUtils.validatePath(clientPath);
-
-        // Validating the existence of watcher.
-        try {
-            watchManager.containsWatcher(clientPath, watcher, watcherType);
-        } catch (NoWatcherException nwe) {
-            LOG.error("Failed to find watcher!", nwe);
-            cb.processResult(nwe.code().intValue(), clientPath, ctx);
-            return;
-        }
-
         final String serverPath = prependChroot(clientPath);
         WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher,
                 watcherType, local, watchManager);
+
         RequestHeader h = new RequestHeader();
-        h.setType(ZooDefs.OpCode.removeWatches);
-        RemoveWatchesRequest request = new RemoveWatchesRequest();
-        request.setPath(serverPath);
-        request.setType(watcherType.getIntValue());
+        h.setType(opCode);
+        Record request = getRemoveWatchesRequest(opCode, watcherType,
+                serverPath);
+
         cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, clientPath,
                 serverPath, ctx, null, wcb);
     }
 
+    private Record getRemoveWatchesRequest(int opCode, WatcherType watcherType,
+            final String serverPath) {
+        Record request = null;
+        switch (opCode) {
+        case ZooDefs.OpCode.checkWatches:
+            CheckWatchesRequest chkReq = new CheckWatchesRequest();
+            chkReq.setPath(serverPath);
+            chkReq.setType(watcherType.getIntValue());
+            request = chkReq;
+            break;
+        case ZooDefs.OpCode.removeWatches:
+            RemoveWatchesRequest rmReq = new RemoveWatchesRequest();
+            rmReq.setPath(serverPath);
+            rmReq.setType(watcherType.getIntValue());
+            request = rmReq;
+            break;
+        }
+        return request;
+    }
+
     public States getState() {
         return cnxn.getState();
     }

+ 1 - 1
src/java/main/org/apache/zookeeper/cli/RemoveWatchesCommand.java

@@ -74,7 +74,7 @@ public class RemoveWatchesCommand extends CliCommand {
         boolean local = cl.hasOption("l");
 
         try {
-            zk.removeWatches(path, null, wtype, local);
+            zk.removeAllWatches(path, wtype, local);
         } catch (KeeperException.NoWatcherException ex) {
             err.println(ex.getMessage());
             return false;

+ 21 - 2
src/java/main/org/apache/zookeeper/server/DataTree.java

@@ -1365,9 +1365,29 @@ public class DataTree {
         }
     }
 
+    public boolean containsWatcher(String path, WatcherType type, Watcher watcher) {
+        boolean containsWatcher = false;
+        switch (type) {
+        case Children:
+            containsWatcher = this.childWatches.containsWatcher(path, watcher);
+            break;
+        case Data:
+            containsWatcher = this.dataWatches.containsWatcher(path, watcher);
+            break;
+        case Any:
+            if (this.childWatches.containsWatcher(path, watcher)) {
+                containsWatcher = true;
+            }
+            if (this.dataWatches.containsWatcher(path, watcher)) {
+                containsWatcher = true;
+            }
+            break;
+        }
+        return containsWatcher;
+    }
+
     public boolean removeWatch(String path, WatcherType type, Watcher watcher) {
         boolean removed = false;
-
         switch (type) {
         case Children:
             removed = this.childWatches.removeWatcher(path, watcher);
@@ -1384,7 +1404,6 @@ public class DataTree {
             }
             break;
         }
-
         return removed;
     }
 }

+ 16 - 2
src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -35,6 +35,7 @@ import org.apache.zookeeper.KeeperException.SessionMovedException;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.CheckWatchesRequest;
 import org.apache.zookeeper.proto.Create2Response;
 import org.apache.zookeeper.proto.CreateResponse;
 import org.apache.zookeeper.proto.ExistsRequest;
@@ -394,6 +395,21 @@ public class FinalRequestProcessor implements RequestProcessor {
                 rsp = new GetChildren2Response(children, stat);
                 break;
             }
+            case OpCode.checkWatches: {
+                lastOp = "CHKW";
+                CheckWatchesRequest checkWatches = new CheckWatchesRequest();
+                ByteBufferInputStream.byteBuffer2Record(request.request,
+                        checkWatches);
+                WatcherType type = WatcherType.fromInt(checkWatches.getType());
+                boolean containsWatcher = zks.getZKDatabase().containsWatcher(
+                        checkWatches.getPath(), type, cnxn);
+                if (!containsWatcher) {
+                    String msg = String.format(Locale.ENGLISH, "%s (type: %s)",
+                            new Object[] { checkWatches.getPath(), type });
+                    throw new KeeperException.NoWatcherException(msg);
+                }
+                break;
+            }
             case OpCode.removeWatches: {
                 lastOp = "REMW";
                 RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
@@ -402,13 +418,11 @@ public class FinalRequestProcessor implements RequestProcessor {
                 WatcherType type = WatcherType.fromInt(removeWatches.getType());
                 boolean removed = zks.getZKDatabase().removeWatch(
                         removeWatches.getPath(), type, cnxn);
-
                 if (!removed) {
                     String msg = String.format(Locale.ENGLISH, "%s (type: %s)",
                             new Object[] { removeWatches.getPath(), type });
                     throw new KeeperException.NoWatcherException(msg);
                 }
-
                 break;
             }
             }

+ 1 - 0
src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java

@@ -838,6 +838,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
             case OpCode.getChildren2:
             case OpCode.ping:
             case OpCode.setWatches:
+            case OpCode.checkWatches:
             case OpCode.removeWatches:
                 zks.sessionTracker.checkSession(request.sessionId,
                         request.getOwner());

+ 3 - 0
src/java/main/org/apache/zookeeper/server/Request.java

@@ -150,6 +150,7 @@ public class Request {
         case OpCode.setData:
         case OpCode.setWatches:
         case OpCode.sync:
+        case OpCode.checkWatches:
         case OpCode.removeWatches:
             return true;
         default:
@@ -225,6 +226,8 @@ public class Request {
             return "error";
         case OpCode.reconfig:
            return "reconfig";
+        case OpCode.checkWatches:
+            return "checkWatches";
         case OpCode.removeWatches:
             return "removeWatches";
         default:

+ 26 - 0
src/java/main/org/apache/zookeeper/server/WatchManager.java

@@ -169,6 +169,32 @@ class WatchManager {
         }
     }
 
+    /**
+     * Checks the specified watcher exists for the given path
+     *
+     * @param path
+     *            znode path
+     * @param watcher
+     *            watcher object reference
+     * @return true if the watcher exists, false otherwise
+     */
+    synchronized boolean containsWatcher(String path, Watcher watcher) {
+        HashSet<String> paths = watch2Paths.get(watcher);
+        if (paths == null || !paths.contains(path)) {
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Removes the specified watcher for the given path
+     *
+     * @param path
+     *            znode path
+     * @param watcher
+     *            watcher object reference
+     * @return true if the watcher successfully removed, false otherwise
+     */
     synchronized boolean removeWatcher(String path, Watcher watcher) {
         HashSet<String> paths = watch2Paths.get(watcher);
         if (paths == null || !paths.remove(path)) {

+ 14 - 0
src/java/main/org/apache/zookeeper/server/ZKDatabase.java

@@ -587,6 +587,20 @@ public class ZKDatabase {
         this.snapshotSizeFactor = snapshotSizeFactor;
     }
 
+    /**
+     * Check whether the given watcher exists in datatree
+     *
+     * @param path
+     *            node to check watcher existence
+     * @param type
+     *            type of watcher
+     * @param watcher
+     *            watcher function
+     */
+    public boolean containsWatcher(String path, WatcherType type, Watcher watcher) {
+        return dataTree.containsWatcher(path, type, watcher);
+    }
+
     /**
      * Remove watch from the datatree
      * 

+ 355 - 11
src/java/test/org/apache/zookeeper/RemoveWatchesTest.java

@@ -101,6 +101,26 @@ public class RemoveWatchesTest extends ClientBase {
         }
     }
 
+    private void removeAllWatches(ZooKeeper zk, String path,
+            WatcherType watcherType, boolean local, KeeperException.Code rc)
+            throws InterruptedException, KeeperException {
+        LOG.info("Sending removeWatches req using zk {} path: {} type: {} ",
+                new Object[] { zk, path, watcherType });
+        if (useAsync) {
+            MyCallback c1 = new MyCallback(rc.intValue(), path);
+            zk.removeAllWatches(path, watcherType, local, c1, null);
+            Assert.assertTrue("Didn't succeeds removeWatch operation",
+                    c1.matches());
+            if (KeeperException.Code.OK.intValue() != c1.rc) {
+                KeeperException ke = KeeperException
+                        .create(KeeperException.Code.get(c1.rc));
+                throw ke;
+            }
+        } else {
+            zk.removeAllWatches(path, watcherType, local);
+        }
+    }
+
     /**
      * Test verifies removal of single watcher when there is server connection
      */
@@ -232,7 +252,8 @@ public class RemoveWatchesTest extends ClientBase {
         LOG.info("Adding child watcher {} on path {}", new Object[] { w2,
                 "/node1" });
         zk2.getChildren("/node1", w2);
-        removeWatches(zk2, "/node1", null, WatcherType.Any, false, Code.OK);
+        removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK);
+        removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK);
         zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE,
                 CreateMode.EPHEMERAL);
         Assert.assertTrue("Didn't remove data watcher", w1.matches());
@@ -263,7 +284,8 @@ public class RemoveWatchesTest extends ClientBase {
         LOG.info("Adding child watcher {} on path {}", new Object[] { w2,
                 "/node1" });
         zk2.getChildren("/node1", w2);
-        removeWatches(zk2, "/node1", null, WatcherType.Data, false, Code.OK);
+        removeWatches(zk2, "/node1", w1, WatcherType.Data, false, Code.OK);
+        removeWatches(zk2, "/node1", w2, WatcherType.Data, false, Code.OK);
         zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE,
                 CreateMode.EPHEMERAL);
         Assert.assertTrue("Didn't remove data watcher", w1.matches());
@@ -316,7 +338,8 @@ public class RemoveWatchesTest extends ClientBase {
         LOG.info("Adding child watcher {} on path {}", new Object[] { w2,
                 "/node1" });
         zk2.getChildren("/node1", w2);
-        removeWatches(zk2, "/node1", null, WatcherType.Children, false, Code.OK);
+        removeWatches(zk2, "/node1", w1, WatcherType.Children, false, Code.OK);
+        removeWatches(zk2, "/node1", w2, WatcherType.Children, false, Code.OK);
         zk1.setData("/node1", "test".getBytes(), -1);
         Assert.assertTrue("Didn't remove child watcher", w1.matches());
         Assert.assertTrue("Didn't remove child watcher", w2.matches());
@@ -398,13 +421,6 @@ public class RemoveWatchesTest extends ClientBase {
         } catch (KeeperException.NoWatcherException nwe) {
             // expected
         }
-        try {
-            removeWatches(zk2, "/nonexists", null, WatcherType.Data, false,
-                    Code.NOWATCHER);
-            Assert.fail("Should throw exception as given watcher doesn't exists");
-        } catch (KeeperException.NoWatcherException nwe) {
-            // expected
-        }
     }
 
     /**
@@ -726,7 +742,7 @@ public class RemoveWatchesTest extends ClientBase {
         watcher.waitForConnected(CONNECTION_TIMEOUT);
 
         try {
-            zk.removeWatches("/nowatchhere", null, WatcherType.Data, false);
+            zk.removeWatches("/nowatchhere", watcher, WatcherType.Data, false);
         } catch (KeeperException nwe) {
             if (nwe.code().intValue() == Code.NOWATCHER.intValue()) {
                 nw = true;
@@ -738,6 +754,334 @@ public class RemoveWatchesTest extends ClientBase {
         Assert.assertTrue("NoWatcherException didn't happen", nw);
     }
 
+    /**
+     * Test verifies given watcher doesn't exists!
+     */
+    @Test(timeout = 90000)
+    public void testRemoveAllNoWatcherException() throws IOException,
+            InterruptedException, KeeperException {
+        zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        try {
+            removeAllWatches(zk2, "/node1", WatcherType.Any, false,
+                    Code.NOWATCHER);
+            Assert.fail("Should throw exception as given watcher doesn't exists");
+        } catch (KeeperException.NoWatcherException nwe) {
+            // expected
+        }
+    }
+
+    /**
+     * Test verifies null watcher
+     */
+    @Test(timeout = 30000)
+    public void testNullWatcherReference() throws Exception {
+        zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        try {
+            if (useAsync) {
+                zk1.removeWatches("/node1", null, WatcherType.Data, false,
+                        null, null);
+            } else {
+                zk1.removeWatches("/node1", null, WatcherType.Data, false);
+            }
+            Assert.fail("Must throw IllegalArgumentException as watcher is null!");
+        } catch (IllegalArgumentException iae) {
+            // expected
+        }
+    }
+
+    /**
+     * Test verifies WatcherType.Data - removes only the configured data watcher
+     * function
+     */
+    @Test(timeout = 90000)
+    public void testRemoveWhenMultipleDataWatchesOnAPath() throws Exception {
+        zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        final CountDownLatch dataWatchCount = new CountDownLatch(1);
+        final CountDownLatch rmWatchCount = new CountDownLatch(1);
+        Watcher w1 = new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                if (event.getType() == EventType.DataWatchRemoved) {
+                    rmWatchCount.countDown();
+                }
+            }
+        };
+        Watcher w2 = new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                if (event.getType() == EventType.NodeDataChanged) {
+                    dataWatchCount.countDown();
+                }
+            }
+        };
+        // Add multiple data watches
+        LOG.info("Adding data watcher {} on path {}", new Object[] { w1,
+                "/node1" });
+        Assert.assertNotNull("Didn't set data watches",
+                zk2.exists("/node1", w1));
+        LOG.info("Adding data watcher {} on path {}", new Object[] { w2,
+                "/node1" });
+        Assert.assertNotNull("Didn't set data watches",
+                zk2.exists("/node1", w2));
+
+        removeWatches(zk2, "/node1", w1, WatcherType.Data, false, Code.OK);
+        Assert.assertTrue("Didn't remove data watcher",
+                rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
+
+        zk1.setData("/node1", "test".getBytes(), -1);
+        LOG.info("Waiting for data watchers to be notified");
+        Assert.assertTrue("Didn't get data watch notification!",
+                dataWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * Test verifies WatcherType.Children - removes only the configured child
+     * watcher function
+     */
+    @Test(timeout = 90000)
+    public void testRemoveWhenMultipleChildWatchesOnAPath() throws Exception {
+        zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        final CountDownLatch childWatchCount = new CountDownLatch(1);
+        final CountDownLatch rmWatchCount = new CountDownLatch(1);
+        Watcher w1 = new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                if (event.getType() == EventType.ChildWatchRemoved) {
+                    rmWatchCount.countDown();
+                }
+            }
+        };
+        Watcher w2 = new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                if (event.getType() == EventType.NodeChildrenChanged) {
+                    childWatchCount.countDown();
+                }
+            }
+        };
+        // Add multiple child watches
+        LOG.info("Adding child watcher {} on path {}", new Object[] { w1,
+                "/node1" });
+        Assert.assertEquals("Didn't set child watches", 0,
+                zk2.getChildren("/node1", w1).size());
+        LOG.info("Adding child watcher {} on path {}", new Object[] { w2,
+                "/node1" });
+        Assert.assertEquals("Didn't set child watches", 0,
+                zk2.getChildren("/node1", w2).size());
+
+        removeWatches(zk2, "/node1", w1, WatcherType.Children, false, Code.OK);
+        Assert.assertTrue("Didn't remove child watcher",
+                rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
+
+        zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        LOG.info("Waiting for child watchers to be notified");
+        Assert.assertTrue("Didn't get child watch notification!",
+                childWatchCount
+                        .await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * Test verifies WatcherType.Data - removes only the configured data watcher
+     * function
+     */
+    @Test(timeout = 90000)
+    public void testRemoveAllDataWatchesOnAPath() throws Exception {
+        zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        final CountDownLatch dWatchCount = new CountDownLatch(2);
+        final CountDownLatch rmWatchCount = new CountDownLatch(2);
+        Watcher w1 = new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                switch (event.getType()) {
+                case DataWatchRemoved:
+                    rmWatchCount.countDown();
+                    break;
+                case NodeDataChanged:
+                    dWatchCount.countDown();
+                    break;
+                default:
+                    break;
+                }
+            }
+        };
+        Watcher w2 = new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                switch (event.getType()) {
+                case DataWatchRemoved:
+                    rmWatchCount.countDown();
+                    break;
+                case NodeDataChanged:
+                    dWatchCount.countDown();
+                    break;
+                default:
+                    break;
+                }
+            }
+        };
+        // Add multiple data watches
+        LOG.info("Adding data watcher {} on path {}", new Object[] { w1,
+                "/node1" });
+        Assert.assertNotNull("Didn't set data watches",
+                zk2.exists("/node1", w1));
+        LOG.info("Adding data watcher {} on path {}", new Object[] { w2,
+                "/node1" });
+        Assert.assertNotNull("Didn't set data watches",
+                zk2.exists("/node1", w2));
+
+        removeAllWatches(zk2, "/node1", WatcherType.Data, false, Code.OK);
+        Assert.assertTrue("Didn't remove data watcher",
+                rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
+
+        zk1.setData("/node1", "test".getBytes(), -1);
+        LOG.info("Waiting for data watchers notification after watch removal");
+        Assert.assertFalse("Received data watch notification!",
+                dWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
+        Assert.assertEquals("Received watch notification after removal!", 2,
+                dWatchCount.getCount());
+    }
+
+    /**
+     * Test verifies WatcherType.Children - removes only the configured child
+     * watcher function
+     */
+    @Test(timeout = 90000)
+    public void testRemoveAllChildWatchesOnAPath() throws Exception {
+        zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        final CountDownLatch cWatchCount = new CountDownLatch(2);
+        final CountDownLatch rmWatchCount = new CountDownLatch(2);
+        Watcher w1 = new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                switch (event.getType()) {
+                case ChildWatchRemoved:
+                    rmWatchCount.countDown();
+                    break;
+                case NodeChildrenChanged:
+                    cWatchCount.countDown();
+                    break;
+                default:
+                    break;
+                }
+            }
+        };
+        Watcher w2 = new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                switch (event.getType()) {
+                case ChildWatchRemoved:
+                    rmWatchCount.countDown();
+                    break;
+                case NodeChildrenChanged:
+                    cWatchCount.countDown();
+                    break;
+                default:
+                    break;
+                }
+            }
+        };
+        // Add multiple child watches
+        LOG.info("Adding child watcher {} on path {}", new Object[] { w1,
+                "/node1" });
+        Assert.assertEquals("Didn't set child watches", 0,
+                zk2.getChildren("/node1", w1).size());
+        LOG.info("Adding child watcher {} on path {}", new Object[] { w2,
+                "/node1" });
+        Assert.assertEquals("Didn't set child watches", 0,
+                zk2.getChildren("/node1", w2).size());
+
+        removeAllWatches(zk2, "/node1", WatcherType.Children, false, Code.OK);
+        Assert.assertTrue("Didn't remove child watcher",
+                rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
+
+        zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        LOG.info("Waiting for child watchers to be notified");
+        Assert.assertFalse("Didn't get child watch notification!",
+                cWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
+        Assert.assertEquals("Received watch notification after removal!", 2,
+                cWatchCount.getCount());
+    }
+
+    /**
+     * Test verifies WatcherType.Any - removes all the configured child,data
+     * watcher functions
+     */
+    @Test(timeout = 90000)
+    public void testRemoveAllWatchesOnAPath() throws Exception {
+        zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        final CountDownLatch watchCount = new CountDownLatch(2);
+        final CountDownLatch rmWatchCount = new CountDownLatch(4);
+        Watcher w1 = new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                switch (event.getType()) {
+                case ChildWatchRemoved:
+                case DataWatchRemoved:
+                    rmWatchCount.countDown();
+                    break;
+                case NodeChildrenChanged:
+                case NodeDataChanged:
+                    watchCount.countDown();
+                    break;
+                default:
+                    break;
+                }
+            }
+        };
+        Watcher w2 = new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                switch (event.getType()) {
+                case ChildWatchRemoved:
+                case DataWatchRemoved:
+                    rmWatchCount.countDown();
+                    break;
+                case NodeChildrenChanged:
+                case NodeDataChanged:
+                    watchCount.countDown();
+                    break;
+                default:
+                    break;
+                }
+            }
+        };
+        // Add multiple child watches
+        LOG.info("Adding child watcher {} on path {}", new Object[] { w1,
+                "/node1" });
+        Assert.assertEquals("Didn't set child watches", 0,
+                zk2.getChildren("/node1", w1).size());
+        LOG.info("Adding child watcher {} on path {}", new Object[] { w2,
+                "/node1" });
+        Assert.assertEquals("Didn't set child watches", 0,
+                zk2.getChildren("/node1", w2).size());
+
+        // Add multiple data watches
+        LOG.info("Adding data watcher {} on path {}", new Object[] { w1,
+                "/node1" });
+        Assert.assertNotNull("Didn't set data watches",
+                zk2.exists("/node1", w1));
+        LOG.info("Adding data watcher {} on path {}", new Object[] { w2,
+                "/node1" });
+        Assert.assertNotNull("Didn't set data watches",
+                zk2.exists("/node1", w2));
+
+        removeAllWatches(zk2, "/node1", WatcherType.Any, false, Code.OK);
+        Assert.assertTrue("Didn't remove data watcher",
+                rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
+
+        zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        zk1.setData("/node1", "test".getBytes(), -1);
+
+        LOG.info("Waiting for child/data watchers notification after watch removal");
+        Assert.assertFalse("Received watch notification after removal!",
+                watchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
+        Assert.assertEquals("Received watch notification after removal!", 2,
+                watchCount.getCount());
+    }
+
     /* a mocked ZK class that doesn't do client-side verification
      * before/after calling removeWatches */
     private class MyZooKeeper extends ZooKeeper {

+ 4 - 0
src/zookeeper.jute

@@ -213,6 +213,10 @@ module org.apache.zookeeper.proto {
         vector<org.apache.zookeeper.data.ACL> acl;
         org.apache.zookeeper.data.Stat stat;
     }
+    class CheckWatchesRequest {
+        ustring path;
+        int type;
+    }
     class RemoveWatchesRequest {
         ustring path;
         int type;