Browse Source

[ 1937078 ] Passing a watch object to read requests

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@670967 13f79535-47bb-0310-9956-ffa450edef68
Benjamin Reed 17 năm trước cách đây
mục cha
commit
23e39b5a87

+ 29 - 11
zookeeper/java/src/com/yahoo/zookeeper/ClientCnxn.java

@@ -46,6 +46,7 @@ import com.yahoo.zookeeper.AsyncCallback.VoidCallback;
 import com.yahoo.zookeeper.Watcher.Event;
 import com.yahoo.zookeeper.ZooDefs.OpCode;
 import com.yahoo.zookeeper.ZooKeeper.States;
+import com.yahoo.zookeeper.ZooKeeper.WatchRegistration;
 import com.yahoo.zookeeper.proto.AuthPacket;
 import com.yahoo.zookeeper.proto.ConnectRequest;
 import com.yahoo.zookeeper.proto.ConnectResponse;
@@ -160,8 +161,11 @@ class ClientCnxn {
 
         Object ctx;
 
+        WatchRegistration watchRegistration;
+
         Packet(RequestHeader header, ReplyHeader replyHeader, Record record,
-                Record response, ByteBuffer bb) {
+                Record response, ByteBuffer bb,
+                WatchRegistration watchRegistration) {
             this.header = header;
             this.replyHeader = replyHeader;
             this.request = record;
@@ -186,6 +190,7 @@ class ClientCnxn {
                     LOG.warn("Unexpected exception",e);
                 }
             }
+            this.watchRegistration = watchRegistration;
         }
     }
 
@@ -258,7 +263,7 @@ class ClientCnxn {
                         break;
                     }
                     if (event instanceof WatcherEvent) {
-                        zooKeeper.watcher.process((WatcherEvent) event);
+                        zooKeeper.processWatchEvent((WatcherEvent) event);
                     } else {
                         Packet p = (Packet) event;
                         int rc = 0;
@@ -339,6 +344,10 @@ class ClientCnxn {
 
     @SuppressWarnings("unchecked")
     private void finishPacket(Packet p) {
+        if (p.watchRegistration != null) {
+            p.watchRegistration.register(p.replyHeader.getErr());
+        }
+        
         p.finished = true;
         if (p.cb == null) {
             synchronized (p) {
@@ -578,10 +587,10 @@ class ClientCnxn {
                 for (AuthData id : authInfo) {
                     outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
                             OpCode.auth), null, new AuthPacket(0, id.scheme,
-                            id.data), null, null));
+                            id.data), null, null, null));
                 }
-                outgoingQueue
-                        .addFirst((new Packet(null, null, null, null, bb)));
+                outgoingQueue.addFirst((new Packet(null, null, null, null, bb,
+                        null)));
             }
             synchronized (this) {
                 k.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
@@ -590,7 +599,7 @@ class ClientCnxn {
 
         private void sendPing() {
             RequestHeader h = new RequestHeader(-2, OpCode.ping);
-            queuePacket(h, null, null, null, null, null, null);
+            queuePacket(h, null, null, null, null, null, null, null);
         }
 
         int lastConnectIndex = -1;
@@ -796,9 +805,14 @@ class ClientCnxn {
     }
 
     public ReplyHeader submitRequest(RequestHeader h, Record request,
-            Record response) throws InterruptedException {
+            Record response,
+            WatchRegistration watchRegistration)
+        throws InterruptedException 
+    {
         ReplyHeader r = new ReplyHeader();
-        Packet packet = queuePacket(h, r, request, response, null, null, null);
+        Packet packet = 
+            queuePacket(h, r, request, response, null, null, null,
+                    watchRegistration);
         synchronized (packet) {
             while (!packet.finished) {
                 packet.wait();
@@ -808,13 +822,16 @@ class ClientCnxn {
     }
 
     Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
-            Record response, AsyncCallback cb, String path, Object ctx) {
+            Record response, AsyncCallback cb, String path, Object ctx,
+            WatchRegistration watchRegistration)
+    {
         Packet packet = null;
         synchronized (outgoingQueue) {
             if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
                 h.setXid(getXid());
             }
-            packet = new Packet(h, r, request, response, null);
+            packet = new Packet(h, r, request, response, null,
+                    watchRegistration);
             packet.cb = cb;
             packet.ctx = ctx;
             packet.path = path;
@@ -834,7 +851,8 @@ class ClientCnxn {
         authInfo.add(new AuthData(scheme, auth));
         if (zooKeeper.state == States.CONNECTED) {
             queuePacket(new RequestHeader(-4, OpCode.auth), null,
-                    new AuthPacket(0, scheme, auth), null, null, null, null);
+                    new AuthPacket(0, scheme, auth), null, null, null, null,
+                    null);
         }
     }
 }

+ 322 - 60
zookeeper/java/src/com/yahoo/zookeeper/ZooKeeper.java

@@ -20,8 +20,12 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.log4j.Logger;
 
@@ -55,7 +59,6 @@ import com.yahoo.zookeeper.proto.SyncRequest;
 import com.yahoo.zookeeper.proto.SyncResponse;
 import com.yahoo.zookeeper.proto.WatcherEvent;
 import com.yahoo.zookeeper.server.DataTree;
-import com.yahoo.zookeeper.server.ZooKeeperServer;
 
 /**
  * This is the main class of ZooKeeper client library. To use a ZooKeeper
@@ -108,7 +111,145 @@ import com.yahoo.zookeeper.server.ZooKeeperServer;
 public class ZooKeeper {
     private static final Logger LOG = Logger.getLogger(ZooKeeper.class);
 
-    volatile Watcher watcher;
+    private volatile Watcher defaultWatcher;
+
+    private final Map<String, Set<Watcher>> dataWatches =
+        new HashMap<String, Set<Watcher>>();
+    private final Map<String, Set<Watcher>> childWatches =
+        new HashMap<String, Set<Watcher>>();
+
+    /**
+     * Process a WatchEvent.
+     *
+     * Looks up the watch in the set of watches, processes the event
+     * if found, otw uses the default watcher (registered during instance
+     * creation) to process the watch.
+     *
+     * @param event the event to process.
+     */
+    public void processWatchEvent(WatcherEvent event) {
+        // clear the watches if we are not connected
+        if (event.getState() != Watcher.Event.KeeperStateSyncConnected) {
+            synchronized (dataWatches) {
+                for (Set<Watcher> watchers : dataWatches.values()) {
+                    for (Watcher watcher : watchers) {
+                        watcher.process(event);
+                    }
+                }
+                dataWatches.clear();
+            }
+            synchronized (childWatches) {
+                for (Set<Watcher> watchers : childWatches.values()) {
+                    for (Watcher watcher : watchers) {
+                        watcher.process(event);
+                    }
+                }
+                childWatches.clear();
+            }
+        }
+
+        Set<Watcher> watchers = null;
+
+        switch (event.getType()) {
+        case Watcher.Event.EventNone:
+            defaultWatcher.process(event);
+            return;
+        case Watcher.Event.EventNodeDataChanged:
+        case Watcher.Event.EventNodeCreated:
+            synchronized (dataWatches) {
+                watchers = dataWatches.remove(event.getPath());
+            }
+            break;
+        case Watcher.Event.EventNodeChildrenChanged:
+            synchronized (childWatches) {
+                watchers = childWatches.remove(event.getPath());
+            }
+            break;
+        case Watcher.Event.EventNodeDeleted:
+            synchronized (dataWatches) {
+                watchers = dataWatches.remove(event.getPath());
+            }
+            Set<Watcher> cwatches;
+            synchronized (childWatches) {
+                cwatches = childWatches.remove(event.getPath());
+            }
+            if (cwatches != null) {
+                if (watchers == null) {
+                    watchers = cwatches;
+                } else {
+                    watchers.addAll(cwatches);
+                }
+            }
+            break;
+        default:
+            String msg = "Unhandled watch event type " + event.getType();
+            LOG.error(msg);
+            throw new RuntimeException(msg);
+        }
+
+        if (watchers != null) {
+            for (Watcher watcher : watchers) {
+                watcher.process(event);
+            }
+        }
+    }
+    
+    /**
+     * Register a watcher for a particular path.
+     */
+    class WatchRegistration {
+        private Map<String, Set<Watcher>> watches;
+        private Watcher watcher;
+        private String path;
+        public WatchRegistration(Map<String, Set<Watcher>> watches,
+                Watcher watcher, String path)
+        {
+            this.watches = watches;
+            this.watcher = watcher;  
+            this.path = path;
+        }
+        
+        /**
+         * Register the watcher with the set of watches on path.
+         * @param rc the result code of the operation that attempted to
+         * add the watch on the path.
+         */
+        public void register(int rc) {
+            if (shouldAddWatch(rc)) {
+                synchronized(watches) {
+                    Set<Watcher> watchers = watches.get(path);
+                    if (watchers == null) {
+                        watchers = new HashSet<Watcher>();
+                        watches.put(path, watchers);
+                    }
+                    watchers.add(watcher);
+                }
+            }
+        }
+        /**
+         * Determine whether the watch should be added based on return code.
+         * @param rc the result code of the operation that attempted to add the
+         * watch on the node
+         * @return true if the watch should be added, otw false
+         */
+        protected boolean shouldAddWatch(int rc) {
+            return rc == 0;
+        }
+    }
+
+    /** Handle the special case of exists watches - they add a watcher
+     * even in the case where NONODE result code is returned.
+     */
+    class ExistsWatchRegistration extends WatchRegistration {
+        public ExistsWatchRegistration(Map<String, Set<Watcher>> watches,
+                Watcher watcher, String path)
+        {
+            super(watches, watcher, path);
+        }
+        protected boolean shouldAddWatch(int rc) {
+            return rc == 0 || rc == KeeperException.Code.NoNode;
+        }
+    }
 
     public enum States {
         CONNECTING, ASSOCIATING, CONNECTED, CLOSED, AUTH_FAILED;
@@ -124,13 +265,13 @@ public class ZooKeeper {
 
     public ZooKeeper(String host, int sessionTimeout, Watcher watcher)
             throws IOException {
-        this.watcher = watcher;
+        this.defaultWatcher = watcher;
         cnxn = new ClientCnxn(host, sessionTimeout, this);
     }
 
     public ZooKeeper(String host, int sessionTimeout, Watcher watcher,
             long sessionId, byte[] sessionPasswd) throws IOException {
-        this.watcher = watcher;
+        this.defaultWatcher = watcher;
         cnxn = new ClientCnxn(host, sessionTimeout, this, sessionId,
                 sessionPasswd);
     }
@@ -152,7 +293,7 @@ public class ZooKeeper {
     }
 
     public synchronized void register(Watcher watcher) {
-        this.watcher = watcher;
+        this.defaultWatcher = watcher;
     }
 
     /**
@@ -169,7 +310,7 @@ public class ZooKeeper {
     public synchronized void close() throws InterruptedException {
         RequestHeader h = new RequestHeader();
         h.setType(ZooDefs.OpCode.closeSession);
-        cnxn.submitRequest(h, null, null);
+        cnxn.submitRequest(h, null, null, null);
         try {
             cnxn.close();
         } catch (IOException e) {
@@ -240,7 +381,7 @@ public class ZooKeeper {
             throw new KeeperException.InvalidACLException();
         }
         request.setAcl(acl);
-        ReplyHeader r = cnxn.submitRequest(h, request, response);
+        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
         if (r.getErr() != 0) {
             throw KeeperException.create(r.getErr(), path);
         }
@@ -265,7 +406,7 @@ public class ZooKeeper {
         request.setFlags(flags);
         request.setPath(path);
         request.setAcl(acl);
-        cnxn.queuePacket(h, r, request, response, cb, path, ctx);
+        cnxn.queuePacket(h, r, request, response, cb, path, ctx, null);
     }
 
     /**
@@ -300,7 +441,7 @@ public class ZooKeeper {
         DeleteRequest request = new DeleteRequest();
         request.setPath(path);
         request.setVersion(version);
-        ReplyHeader r = cnxn.submitRequest(h, request, null);
+        ReplyHeader r = cnxn.submitRequest(h, request, null, null);
         if (r.getErr() != 0) {
             throw KeeperException.create(r.getErr());
         }
@@ -318,93 +459,137 @@ public class ZooKeeper {
         DeleteRequest request = new DeleteRequest();
         request.setPath(path);
         request.setVersion(version);
-        cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, path, ctx);
+        cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, path, ctx, null);
     }
 
     /**
      * Return the stat of the node of the given path. Return null if no such a
      * node exists.
      * <p>
-     * If the watch is true and the call is successful (no exception is thrown),
+     * If the watch is non-null and the call is successful (no exception is thrown),
      * a watch will be left on the node with the given path. The watch will be
      * triggered by a successful operation that creates/delete the node or sets
      * the data on the node.
      *
-     * @param path
-     *                the node path
-     * @param watch
-     *                whether need to watch this node
+     * @param path the node path
+     * @param watcher explicit watcher
      * @return the stat of the node of the given path; return null if no such a
      *         node exists.
      * @throws KeeperException If the server signals an error
      * @throws InterruptedException If the server transaction is interrupted.
      */
-    public Stat exists(String path, boolean watch) throws KeeperException,
-            InterruptedException {
+    public Stat exists(String path, Watcher watcher) throws KeeperException,
+        InterruptedException
+    {
         RequestHeader h = new RequestHeader();
         h.setType(ZooDefs.OpCode.exists);
         ExistsRequest request = new ExistsRequest();
         request.setPath(path);
-        request.setWatch(watch);
+        request.setWatch(watcher != null);
         SetDataResponse response = new SetDataResponse();
-        ReplyHeader r = cnxn.submitRequest(h, request, response);
+        WatchRegistration wcb = null;
+        if (watcher != null) {
+            wcb = new ExistsWatchRegistration(dataWatches, watcher, path);
+        }
+        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
         if (r.getErr() != 0) {
             if (r.getErr() == KeeperException.Code.NoNode) {
                 return null;
             }
             throw KeeperException.create(r.getErr());
         }
+
         return response.getStat().getCzxid() == -1 ? null : response.getStat();
     }
 
+    /**
+     * Return the stat of the node of the given path. Return null if no such a
+     * node exists.
+     * <p>
+     * If the watch is true and the call is successful (no exception is thrown),
+     * a watch will be left on the node with the given path. The watch will be
+     * triggered by a successful operation that creates/delete the node or sets
+     * the data on the node.
+     *
+     * @param path
+     *                the node path
+     * @param watch
+     *                whether need to watch this node
+     * @return the stat of the node of the given path; return null if no such a
+     *         node exists.
+     * @throws KeeperException If the server signals an error
+     * @throws InterruptedException If the server transaction is interrupted.
+     */
+    public Stat exists(String path, boolean watch) throws KeeperException,
+        InterruptedException
+    {
+        return exists(path, watch ? defaultWatcher : null);
+    }
+
     /**
      * The Asynchronous version of exists. The request doesn't actually until
      * the asynchronous callback is called.
      *
      * @see #exists(String, boolean)
      */
-    public void exists(String path, boolean watch, StatCallback cb, Object ctx) {
+    public void exists(String path, Watcher watcher, StatCallback cb,
+            Object ctx)
+    {
         RequestHeader h = new RequestHeader();
         h.setType(ZooDefs.OpCode.exists);
         ExistsRequest request = new ExistsRequest();
         request.setPath(path);
-        request.setWatch(watch);
+        request.setWatch(watcher != null);
         SetDataResponse response = new SetDataResponse();
-        cnxn
-                .queuePacket(h, new ReplyHeader(), request, response, cb, path,
-                        ctx);
+        WatchRegistration wcb = null;
+        if (watcher != null) {
+            wcb = new ExistsWatchRegistration(dataWatches, watcher, path);
+        }
+        cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
+                        ctx, wcb);
+    }
+
+    /**
+     * The Asynchronous version of exists. The request doesn't actually until
+     * the asynchronous callback is called.
+     *
+     * @see #exists(String, boolean)
+     */
+    public void exists(String path, boolean watch, StatCallback cb, Object ctx) {
+        exists(path, watch ? defaultWatcher : null, cb, ctx);
     }
 
     /**
      * Return the data and the stat of the node of the given path.
      * <p>
-     * If the watch is true and the call is successfull (no exception is
+     * If the watch is non-null and the call is successful (no exception is
      * thrown), a watch will be left on the node with the given path. The watch
-     * will be triggered by a sucessful operation that sets data on the node, or
+     * will be triggered by a successful operation that sets data on the node, or
      * deletes the node.
      * <p>
      * A KeeperException with error code KeeperException.NoNode will be thrown
      * if no node with the given path exists.
      *
-     * @param path
-     *                the given path
-     * @param watch
-     *                whether need to watch this node
-     * @param stat
-     *                teh stat of the node
+     * @param path the given path
+     * @param watcher explicit watcher
+     * @param stat the stat of the node
      * @return the data of the node
      * @throws KeeperException If the server signals an error with a non-zero error code
      * @throws InterruptedException If the server transaction is interrupted.
      */
-    public byte[] getData(String path, boolean watch, Stat stat)
+    public byte[] getData(String path, Watcher watcher, Stat stat)
             throws KeeperException, InterruptedException {
         RequestHeader h = new RequestHeader();
         h.setType(ZooDefs.OpCode.getData);
         GetDataRequest request = new GetDataRequest();
         request.setPath(path);
-        request.setWatch(watch);
+        request.setWatch(watcher != null);
         GetDataResponse response = new GetDataResponse();
-        ReplyHeader r = cnxn.submitRequest(h, request, response);
+        WatchRegistration wcb = null;
+        if (watcher != null) {
+            wcb = new WatchRegistration(dataWatches, watcher, path);
+        }
+        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
         if (r.getErr() != 0) {
             throw KeeperException.create(r.getErr());
         }
@@ -414,23 +599,58 @@ public class ZooKeeper {
         return response.getData();
     }
 
+    /**
+     * Return the data and the stat of the node of the given path.
+     * <p>
+     * If the watch is true and the call is successful (no exception is
+     * thrown), a watch will be left on the node with the given path. The watch
+     * will be triggered by a successful operation that sets data on the node, or
+     * deletes the node.
+     * <p>
+     * A KeeperException with error code KeeperException.NoNode will be thrown
+     * if no node with the given path exists.
+     *
+     * @param path the given path
+     * @param watch whether need to watch this node
+     * @param stat the stat of the node
+     * @return the data of the node
+     * @throws KeeperException If the server signals an error with a non-zero error code
+     * @throws InterruptedException If the server transaction is interrupted.
+     */
+    public byte[] getData(String path, boolean watch, Stat stat)
+            throws KeeperException, InterruptedException {
+        return getData(path, watch ? defaultWatcher : null, stat);
+    }
+
     /**
      * The Asynchronous version of getData. The request doesn't actually until
      * the asynchronous callback is called.
      *
-     * @see #getData(String, boolean, Stat)
+     * @see #getData(String, Watcher, Stat)
      */
-
-    public void getData(String path, boolean watch, DataCallback cb, Object ctx) {
+    public void getData(String path, Watcher watcher, DataCallback cb, Object ctx) {
         RequestHeader h = new RequestHeader();
         h.setType(ZooDefs.OpCode.getData);
         GetDataRequest request = new GetDataRequest();
         request.setPath(path);
-        request.setWatch(watch);
+        request.setWatch(watcher != null);
         GetDataResponse response = new GetDataResponse();
-        cnxn
-                .queuePacket(h, new ReplyHeader(), request, response, cb, path,
-                        ctx);
+        WatchRegistration wcb = null;
+        if (watcher != null) {
+            wcb = new WatchRegistration(dataWatches, watcher, path);
+        }
+        cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
+                        ctx, wcb);
+    }
+
+    /**
+     * The Asynchronous version of getData. The request doesn't actually until
+     * the asynchronous callback is called.
+     *
+     * @see #getData(String, boolean, Stat)
+     */
+    public void getData(String path, boolean watch, DataCallback cb, Object ctx) {
+        getData(path, watch ? defaultWatcher : null, cb, ctx);
     }
 
     /**
@@ -466,7 +686,7 @@ public class ZooKeeper {
         request.setData(data);
         request.setVersion(version);
         SetDataResponse response = new SetDataResponse();
-        ReplyHeader r = cnxn.submitRequest(h, request, response);
+        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
         if (r.getErr() != 0) {
             throw KeeperException.create(r.getErr());
         }
@@ -490,7 +710,7 @@ public class ZooKeeper {
         SetDataResponse response = new SetDataResponse();
         cnxn
                 .queuePacket(h, new ReplyHeader(), request, response, cb, path,
-                        ctx);
+                        ctx, null);
     }
 
     /**
@@ -515,7 +735,7 @@ public class ZooKeeper {
         GetACLRequest request = new GetACLRequest();
         request.setPath(path);
         GetACLResponse response = new GetACLResponse();
-        ReplyHeader r = cnxn.submitRequest(h, request, response);
+        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
         if (r.getErr() != 0) {
             throw KeeperException.create(r.getErr());
         }
@@ -537,7 +757,7 @@ public class ZooKeeper {
         GetACLResponse response = new GetACLResponse();
         cnxn
                 .queuePacket(h, new ReplyHeader(), request, response, cb, path,
-                        ctx);
+                        ctx, null);
     }
 
     /**
@@ -571,7 +791,7 @@ public class ZooKeeper {
         request.setAcl(acl);
         request.setVersion(version);
         SetACLResponse response = new SetACLResponse();
-        ReplyHeader r = cnxn.submitRequest(h, request, response);
+        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
         if (r.getErr() != 0) {
             throw KeeperException.create(r.getErr());
         }
@@ -595,57 +815,98 @@ public class ZooKeeper {
         SetACLResponse response = new SetACLResponse();
         cnxn
                 .queuePacket(h, new ReplyHeader(), request, response, cb, path,
-                        ctx);
+                        ctx, null);
     }
 
     /**
      * Return the list of the children of the node of the given path.
      * <p>
-     * If the watch is true and the call is successful (no exception is thrown),
+     * If the watch is non-null and the call is successful (no exception is thrown),
      * a watch will be left on the node with the given path. The watch willbe
-     * triggered by a sucessful operation that deletes the node of the given
+     * triggered by a successful operation that deletes the node of the given
      * path or creates/delete a child under the node.
      * <p>
      * A KeeperException with error code KeeperException.NoNode will be thrown
      * if no node with the given path exists.
      *
      * @param path
-     * @param watch
+     * @param watcher explicit watcher
      * @return an array of children of the node with the given path
      * @throws InterruptedException If the server transaction is interrupted.
      * @throws KeeperException If the server signals an error with a non-zero error code.
      */
-    public List<String> getChildren(String path, boolean watch)
+    public List<String> getChildren(String path, Watcher watcher)
             throws KeeperException, InterruptedException {
         RequestHeader h = new RequestHeader();
         h.setType(ZooDefs.OpCode.getChildren);
         GetChildrenRequest request = new GetChildrenRequest();
         request.setPath(path);
-        request.setWatch(watch);
+        request.setWatch(watcher != null);
         GetChildrenResponse response = new GetChildrenResponse();
-        ReplyHeader r = cnxn.submitRequest(h, request, response);
+        WatchRegistration wcb = null;
+        if (watcher != null) {
+            wcb = new WatchRegistration(childWatches, watcher, path);
+        }
+        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
         if (r.getErr() != 0) {
             throw KeeperException.create(r.getErr());
         }
         return response.getChildren();
     }
 
+    /**
+     * Return the list of the children of the node of the given path.
+     * <p>
+     * If the watch is true and the call is successful (no exception is thrown),
+     * a watch will be left on the node with the given path. The watch willbe
+     * triggered by a successful operation that deletes the node of the given
+     * path or creates/delete a child under the node.
+     * <p>
+     * A KeeperException with error code KeeperException.NoNode will be thrown
+     * if no node with the given path exists.
+     *
+     * @param path
+     * @param watch
+     * @return an array of children of the node with the given path
+     * @throws InterruptedException If the server transaction is interrupted.
+     * @throws KeeperException If the server signals an error with a non-zero error code.
+     */
+    public List<String> getChildren(String path, boolean watch)
+            throws KeeperException, InterruptedException {
+        return getChildren(path, watch ? defaultWatcher : null);
+    }
+
     /**
      * The Asynchronous version of getChildren. The request doesn't actually
      * until the asynchronous callback is called.
      *
-     * @see #getChildren(String, boolean)
+     * @see #getChildren(String, Watcher)
      */
-    public void getChildren(String path, boolean watch, ChildrenCallback cb,
+    public void getChildren(String path, Watcher watcher, ChildrenCallback cb,
             Object ctx) {
         RequestHeader h = new RequestHeader();
         h.setType(ZooDefs.OpCode.getChildren);
         GetChildrenRequest request = new GetChildrenRequest();
         request.setPath(path);
-        request.setWatch(watch);
+        request.setWatch(watcher != null);
         GetChildrenResponse response = new GetChildrenResponse();
+        WatchRegistration wcb = null;
+        if (watcher != null) {
+            wcb = new WatchRegistration(childWatches, watcher, path);
+        }
         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
-                        ctx);
+                        ctx, wcb);
+    }
+
+    /**
+     * The Asynchronous version of getChildren. The request doesn't actually
+     * until the asynchronous callback is called.
+     *
+     * @see #getChildren(String, boolean)
+     */
+    public void getChildren(String path, boolean watch, ChildrenCallback cb,
+            Object ctx) {
+        getChildren(path, watch ? defaultWatcher : null, cb, ctx);
     }
 
     /**
@@ -657,7 +918,8 @@ public class ZooKeeper {
         SyncRequest request = new SyncRequest();
         SyncResponse response = new SyncResponse();
         request.setPath(path);
-        cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path, ctx);
+        cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path, ctx,
+                null);
     }
 
     public States getState() {

+ 2 - 3
zookeeper/java/src/com/yahoo/zookeeper/server/DataTree.java

@@ -98,9 +98,6 @@ public class DataTree {
         return nodes.size();
     }
 
-    public int getWatchCount(){
-        return dataWatches.size()+childWatches.size();
-    }
     /**
      * This is a pointer to the root of the DataTree. It is the source of truth,
      * but we usually use the nodes hashmap to find nodes in the tree.
@@ -228,7 +225,9 @@ public class DataTree {
         ZooTrace.logTraceMessage(LOG,
                                  ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                  "childWatches.triggerWatch " + parentName);
+        Set<Watcher> processed =
         dataWatches.triggerWatch(path, Event.EventNodeDeleted);
+        childWatches.triggerWatch(path, Event.EventNodeDeleted, processed);
         childWatches.triggerWatch(parentName.equals("")?"/":parentName, Event.EventNodeChildrenChanged);
     }
 

+ 15 - 4
zookeeper/java/src/com/yahoo/zookeeper/server/WatchManager.java

@@ -18,6 +18,7 @@ package com.yahoo.zookeeper.server;
 
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.log4j.Logger;
 
@@ -31,9 +32,11 @@ import com.yahoo.zookeeper.proto.WatcherEvent;
 public class WatchManager {
     private static final Logger LOG = Logger.getLogger(WatchManager.class);
 
-    HashMap<String, HashSet<Watcher>> watchTable = new HashMap<String, HashSet<Watcher>>();
+    private HashMap<String, HashSet<Watcher>> watchTable = 
+        new HashMap<String, HashSet<Watcher>>();
 
-    HashMap<Watcher, HashSet<String>> watch2Paths = new HashMap<Watcher, HashSet<String>>();
+    private HashMap<Watcher, HashSet<String>> watch2Paths = 
+        new HashMap<Watcher, HashSet<String>>();
 
     synchronized int size(){
         return watchTable.size();
@@ -71,7 +74,11 @@ public class WatchManager {
         }
     }
 
-    void triggerWatch(String path, int type) {
+    Set<Watcher> triggerWatch(String path, int type) {
+        return triggerWatch(path, type, null);
+    }
+    
+    Set<Watcher> triggerWatch(String path, int type, Set<Watcher> supress) {
         WatcherEvent e = new WatcherEvent(type,
                 Watcher.Event.KeeperStateSyncConnected, path);
         HashSet<Watcher> watchers;
@@ -81,7 +88,7 @@ public class WatchManager {
                 ZooTrace.logTraceMessage(LOG,
                         ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                         "No watchers for " + path);
-                return;
+                return null;
             }
             for (Watcher w : watchers) {
                 HashSet<String> paths = watch2Paths.get(w);
@@ -91,7 +98,11 @@ public class WatchManager {
             }
         }
         for (Watcher w : watchers) {
+            if (supress != null && supress.contains(w)) {
+                continue;
+            }
             w.process(e);
         }
+        return watchers;
     }
 }

+ 54 - 64
zookeeper/test/com/yahoo/zookeeper/test/ClientTest.java

@@ -10,9 +10,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import junit.framework.TestCase;
-
-import org.apache.log4j.Logger;
 import org.junit.Test;
 
 import com.yahoo.zookeeper.KeeperException;
@@ -22,74 +19,35 @@ import com.yahoo.zookeeper.ZooDefs.CreateFlags;
 import com.yahoo.zookeeper.ZooDefs.Ids;
 import com.yahoo.zookeeper.data.Stat;
 import com.yahoo.zookeeper.proto.WatcherEvent;
-import com.yahoo.zookeeper.server.NIOServerCnxn;
-import com.yahoo.zookeeper.server.ServerStats;
-import com.yahoo.zookeeper.server.ZooKeeperServer;
-
-public class ClientTest extends TestCase implements Watcher {
-    private static final Logger LOG = Logger.getLogger(ClientTest.class);
-
-    private static final int CONNECTION_TIMEOUT=30000;
-    protected static String hostPort = "127.0.0.1:33221";
-    LinkedBlockingQueue<WatcherEvent> events = new LinkedBlockingQueue<WatcherEvent>();
-    static File baseTest = new File(System.getProperty("build.test.dir", "build"));
-    NIOServerCnxn.Factory f = null;
-    File tmpDir = null;
-    volatile private CountDownLatch clientConnected;
-
-    protected void setUp() throws Exception {
-        LOG.error("Client test setup");
-        tmpDir = File.createTempFile("test", ".junit", baseTest);
-        tmpDir = new File(tmpDir + ".dir");
-        tmpDir.mkdirs();
-    	ServerStats.registerAsConcrete();
-        ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
-        hostPort = "127.0.0.1:33221";
-        f = new NIOServerCnxn.Factory(33221);
-        f.startup(zks);
-        Thread.sleep(5000);
-        LOG.error("Client test setup finished");
-    }
-
-    protected void tearDown() throws Exception {
-        LOG.error("Clent test shutdown");
-        if (tmpDir != null) {
-            recursiveDelete(tmpDir);
-        }
-        if (f != null) {
-            f.shutdown();
-        }
-    	ServerStats.unregister();
-        clientConnected=null;
-        LOG.error("Client test shutdown finished");
-    }
     
-    static void recursiveDelete(File d) {
-        if (d.isDirectory()) {
-            File children[] = d.listFiles();
-            for (File f : children) {
-                recursiveDelete(f);
-            }
-        }
-        d.delete();
-    }
+public class ClientTest extends ClientBase implements Watcher {
+    LinkedBlockingQueue<WatcherEvent> events = 
+        new LinkedBlockingQueue<WatcherEvent>();
+    protected volatile CountDownLatch clientConnected;
 
-    private ZooKeeper createClient() throws IOException,InterruptedException{
+    protected ZooKeeper createClient(Watcher watcher) 
+        throws IOException, InterruptedException
+    {
         clientConnected=new CountDownLatch(1);
-		ZooKeeper zk = new ZooKeeper(hostPort, 20000, this);
+        ZooKeeper zk = new ZooKeeper(hostPort, 20000, watcher);
 		if(!clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)){
 			fail("Unable to connect to server");
 		}
 		return zk;
     }
     
+    protected void tearDown() throws Exception {
+        clientConnected = null;
+        super.tearDown();
+    }
+
     @Test
     public void testPing() throws Exception {
         ZooKeeper zkIdle = null;
         ZooKeeper zkWatchCreator = null;
         try {
-            zkIdle = createClient();
-            zkWatchCreator = createClient();
+            zkIdle = createClient(this);
+            zkWatchCreator = createClient(this);
             for (int i = 0; i < 30; i++) {
                 zkWatchCreator.create("/" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
             }
@@ -113,11 +71,24 @@ public class ClientTest extends TestCase implements Watcher {
     }
 
     @Test
-    public void testClient() throws IOException,
+    public void testClientwithoutWatcherObj() throws IOException,
+            InterruptedException, KeeperException
+    {
+        performClientTest(false);
+    }
+
+    @Test
+    public void testClientWithWatcherObj() throws IOException,
+            InterruptedException, KeeperException
+    {
+        performClientTest(true);
+    }
+
+    private void performClientTest(boolean withWatcherObj) throws IOException,
             InterruptedException, KeeperException {
         ZooKeeper zk = null;
         try {
-    		zk =createClient();
+    		zk =createClient(this);
             //System.out.println("Created client: " + zk.describeCNXN());
             System.out.println("Before create /benwashere");
             zk.create("/benwashere", "".getBytes(), Ids.OPEN_ACL_UNSAFE, 0);
@@ -136,7 +107,7 @@ public class ClientTest extends TestCase implements Watcher {
             zk.close();
             //System.out.println("Closed client: " + zk.describeCNXN());
             Thread.sleep(2000);
-            zk = createClient();
+            zk = createClient(this);
             //System.out.println("Created a new client: " + zk.describeCNXN());
             System.out.println("Before delete /");
             
@@ -158,7 +129,11 @@ public class ClientTest extends TestCase implements Watcher {
             assertEquals("Ben was here", value);
             // Test stat and watch of non existent node
             try {
+                if (withWatcherObj) {
+                    assertEquals(null, zk.exists("/frog", new MyWatcher()));
+                } else {
                 assertEquals(null, zk.exists("/frog", true));
+                }
                 System.out.println("Comment: asseting passed for frog setting /");
             } catch (KeeperException.NoNodeException e) {
                 // OK, expected that
@@ -182,10 +157,19 @@ public class ClientTest extends TestCase implements Watcher {
             for (int i = 0; i < 10; i++) {
                 final String name = children.get(i);
                 assertTrue(name.startsWith(i + "-"));
-                byte b[] = zk.getData("/ben/" + name, true, stat);
+                byte b[];
+                if (withWatcherObj) {
+                    b = zk.getData("/ben/" + name, new MyWatcher(), stat);
+                } else {
+                    b = zk.getData("/ben/" + name, true, stat);
+                }
                 assertEquals(Integer.toString(i), new String(b));
                 zk.setData("/ben/" + name, "new".getBytes(), stat.getVersion());
+                if (withWatcherObj) {
+                    stat = zk.exists("/ben/" + name, new MyWatcher());
+                } else {
                 stat = zk.exists("/ben/" + name, true);
+                }
                 zk.delete("/ben/" + name, stat.getVersion());
             }
             event = events.poll(10, TimeUnit.SECONDS);
@@ -273,7 +257,7 @@ public class ClientTest extends TestCase implements Watcher {
         File tmpDir = File.createTempFile("test", ".junit", baseTest);
         tmpDir = new File(tmpDir + ".dir");
         tmpDir.mkdirs();
-        ZooKeeper zk = createClient();
+        ZooKeeper zk = createClient(this);
         zk.create("/parent", new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
         zk.create("/parent/child", new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
         try {
@@ -301,7 +285,7 @@ public class ClientTest extends TestCase implements Watcher {
             long start = System.currentTimeMillis();
             for (int i = 0; i < threadCount; i++) {
                 Thread.sleep(10);
-                ZooKeeper zk = createClient();
+                ZooKeeper zk = createClient(this);
                 String prefix = "/test-" + i;
                 zk.create(prefix, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
                 prefix += "/";
@@ -313,7 +297,7 @@ public class ClientTest extends TestCase implements Watcher {
             }
             System.err.println(new Date() + " Total time "
                     + (System.currentTimeMillis() - start));
-            ZooKeeper zk = createClient();
+            ZooKeeper zk = createClient(this);
             LOG.error("******************* Connected to ZooKeeper" + new Date());
             for (int i = 0; i < threadCount; i++) {
                 System.err.println("Doing thread: " + i + " " + new Date());
@@ -331,6 +315,12 @@ public class ClientTest extends TestCase implements Watcher {
         }
     }
 
+    public class MyWatcher implements Watcher {
+        public void process(WatcherEvent event) {
+            ClientTest.this.process(event);
+        }
+    }
+
     public void process(WatcherEvent event) {
 		if (event.getState() == Event.KeeperStateSyncConnected) {
 			clientConnected.countDown();