Prechádzať zdrojové kódy

ZOOKEEPER-1177: Add the memory optimized watch manager for concentrate watches scenario

The current HashSet based WatcherManager will consume more than 40GB memory when
creating 300M watches.

This patch optimized the memory and time complexity for concentrate watches scenario, compared to WatchManager, both the memory consumption and time complexity improved a lot. I'll post more data later with micro benchmark result.

Changed made compared to WatchManager:
* Only keep path to watches map
* Use BitSet to save the memory used to store watches
* Use ConcurrentHashMap and ReadWriteLock instead of synchronized to reduce lock retention
* Lazily clean up the closed watchers

Author: Fangmin Lyu <allenlyu@fb.com>

Reviewers: Andor Molnár <andor@apache.org>, Norbert Kalmar <nkalmar@yahoo.com>, Michael Han <hanm@apache.org>

Closes #590 from lvfangmin/ZOOKEEPER-1177
Fangmin Lyu 6 rokov pred
rodič
commit
fdde8b0064
33 zmenil súbory, kde vykonal 2583 pridanie a 78 odobranie
  1. 11 1
      build.xml
  2. 3 0
      ivy.xml
  3. 22 3
      src/java/main/org/apache/zookeeper/server/DataTree.java
  4. 100 0
      src/java/main/org/apache/zookeeper/server/DumbWatcher.java
  5. 1 0
      src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
  6. 1 0
      src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
  7. 13 3
      src/java/main/org/apache/zookeeper/server/ServerCnxn.java
  8. 1 0
      src/java/main/org/apache/zookeeper/server/ZKDatabase.java
  9. 159 0
      src/java/main/org/apache/zookeeper/server/util/BitHashSet.java
  10. 136 0
      src/java/main/org/apache/zookeeper/server/util/BitMap.java
  11. 34 0
      src/java/main/org/apache/zookeeper/server/watch/IDeadWatcherListener.java
  12. 134 0
      src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java
  13. 43 62
      src/java/main/org/apache/zookeeper/server/watch/WatchManager.java
  14. 52 0
      src/java/main/org/apache/zookeeper/server/watch/WatchManagerFactory.java
  15. 389 0
      src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java
  16. 182 0
      src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java
  17. 61 0
      src/java/main/org/apache/zookeeper/server/watch/WatcherOrBitSet.java
  18. 1 1
      src/java/main/org/apache/zookeeper/server/watch/WatchesPathReport.java
  19. 1 1
      src/java/main/org/apache/zookeeper/server/watch/WatchesReport.java
  20. 1 1
      src/java/main/org/apache/zookeeper/server/watch/WatchesSummary.java
  21. 35 3
      src/java/test/config/findbugsExcludeFile.xml
  22. 110 0
      src/java/test/org/apache/zookeeper/server/util/BitHashSetTest.java
  23. 71 0
      src/java/test/org/apache/zookeeper/server/util/BitMapTest.java
  24. 404 0
      src/java/test/org/apache/zookeeper/server/watch/WatchManagerTest.java
  25. 127 0
      src/java/test/org/apache/zookeeper/server/watch/WatcherCleanerTest.java
  26. 61 0
      src/java/test/org/apache/zookeeper/server/watch/WatcherOrBitSetTest.java
  27. 1 1
      src/java/test/org/apache/zookeeper/server/watch/WatchesPathReportTest.java
  28. 1 1
      src/java/test/org/apache/zookeeper/server/watch/WatchesReportTest.java
  29. 1 1
      src/java/test/org/apache/zookeeper/server/watch/WatchesSummaryTest.java
  30. 30 0
      src/test/java/bench/org/apache/zookeeper/BenchMain.java
  31. 300 0
      src/test/java/bench/org/apache/zookeeper/server/watch/WatchBench.java
  32. 1 0
      zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses
  33. 96 0
      zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml

+ 11 - 1
build.xml

@@ -91,6 +91,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
     <property name="src.dir" value="${basedir}/src" />
     <property name="java.src.dir" value="${src.dir}/java/main" />
     <property name="jute.src.dir" value="${basedir}/zookeeper-jute/src/main/java" />
+    <property name="java.test.dir" value="${src.dir}/test/java"/>
 
     <property name="lib.dir" value="${src.dir}/java/lib" />
     <property name="lib.dir.includes" value="**/*.jar" />
@@ -121,6 +122,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
     <property name="test.src.dir" value="${src.dir}/java/test"/>
     <property name="jute.test.src.dir" value="${basedir}/zookeeper-jute/src/test/java" />
     <property name="systest.src.dir" value="${src.dir}/java/systest"/>
+    <property name="bench.src.dir" value="${java.test.dir}/bench"/>
     <property name="test.log.dir" value="${test.java.build.dir}/logs" />
     <property name="test.data.dir" value="${test.java.build.dir}/data" />
     <property name="test.data.invalid.dir" value="${test.data.dir}/invalidsnap" />
@@ -234,6 +236,8 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
 
     <property name="hamcrest.version" value="1.3"/>
 
+    <property name="jmh.version" value="1.19"/>
+
     <!-- ====================================================== -->
     <!-- Macro definitions                                      -->
     <!-- ====================================================== -->
@@ -510,6 +514,10 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
              target="${javac.target}" source="${javac.source}" debug="on" encoding="${build.encoding}">
         <classpath refid="test.java.classpath"/>
       </javac>
+      <javac srcdir="${bench.src.dir}" destdir="${test.java.classes}" includeantruntime="false"
+             target="${javac.target}" source="${javac.source}" debug="on" encoding="${build.encoding}">
+          <classpath refid="test.java.classpath"/>
+      </javac>
     </target>
 
     <target name="compile-native" depends="compile_jute" description="Make C binding">
@@ -1967,7 +1975,9 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
                    output="${build.dir.eclipse-test-classes}" />
            <source path="${systest.src.dir}"
                    output="${build.dir.eclipse-test-classes}" />
-
+           <source path="${bench.src.dir}"
+                   output="${build.dir.eclipse-test-classes}" />
+                   
            <output path="${build.dir.eclipse-main-classes}" />
            <library pathref="default.path.id" exported="true" />
            <library pathref="junit.path.id" exported="false" />

+ 3 - 0
ivy.xml

@@ -142,6 +142,9 @@
     <dependency org="org.hamcrest" name="hamcrest-all" rev="${hamcrest.version}"
                   conf="test->default" />
 
+    <dependency org="org.openjdk.jmh" name="jmh-core" rev="${jmh.version}" conf="test->default"/>
+    <dependency org="org.openjdk.jmh" name="jmh-generator-annprocess" rev="${jmh.version}" conf="test->default"/>
+
     <conflict manager="strict"/>
 
   </dependencies>

+ 22 - 3
src/java/main/org/apache/zookeeper/server/DataTree.java

@@ -39,6 +39,12 @@ import org.apache.zookeeper.common.PathTrie;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.data.StatPersisted;
+import org.apache.zookeeper.server.watch.IWatchManager;
+import org.apache.zookeeper.server.watch.WatchManagerFactory;
+import org.apache.zookeeper.server.watch.WatcherOrBitSet;
+import org.apache.zookeeper.server.watch.WatchesPathReport;
+import org.apache.zookeeper.server.watch.WatchesReport;
+import org.apache.zookeeper.server.watch.WatchesSummary;
 import org.apache.zookeeper.txn.CheckVersionTxn;
 import org.apache.zookeeper.txn.CreateContainerTxn;
 import org.apache.zookeeper.txn.CreateTTLTxn;
@@ -87,9 +93,9 @@ public class DataTree {
     private final ConcurrentHashMap<String, DataNode> nodes =
         new ConcurrentHashMap<String, DataNode>();
 
-    private final WatchManager dataWatches = new WatchManager();
+    private IWatchManager dataWatches;
 
-    private final WatchManager childWatches = new WatchManager();
+    private IWatchManager childWatches;
 
     /** cached total size of paths and data for all DataNodes */
     private final AtomicLong nodeDataSize = new AtomicLong(0);
@@ -253,6 +259,14 @@ public class DataTree {
         addConfigNode();
 
         nodeDataSize.set(approximateDataSize());
+        try {
+            dataWatches = WatchManagerFactory.createWatchManager();
+            childWatches = WatchManagerFactory.createWatchManager();
+        } catch (Exception e) {
+            LOG.error("Unexpected exception when creating WatchManager, " +
+                    "exiting abnormally", e);
+            System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
+        }
     }
 
     /**
@@ -611,7 +625,7 @@ public class DataTree {
             ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                     "childWatches.triggerWatch " + parentName);
         }
-        Set<Watcher> processed = dataWatches.triggerWatch(path,
+        WatcherOrBitSet processed = dataWatches.triggerWatch(path,
                 EventType.NodeDeleted);
         childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
         childWatches.triggerWatch("".equals(parentName) ? "/" : parentName,
@@ -1361,6 +1375,11 @@ public class DataTree {
         }
     }
 
+    public void shutdownWatcher() {
+        dataWatches.shutdown();
+        childWatches.shutdown();
+    }
+
     /**
      * Returns a mapping of session ID to ephemeral znodes.
      *

+ 100 - 0
src/java/main/org/apache/zookeeper/server/DumbWatcher.java

@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.cert.Certificate;
+
+import org.apache.jute.Record;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerStats;
+
+/**
+ * A empty watcher implementation used in bench and unit test.
+ */
+public class DumbWatcher extends ServerCnxn {
+
+    private long sessionId;
+
+    public DumbWatcher() {
+        this(0);
+    }
+
+    public DumbWatcher(long sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    @Override
+    void setSessionTimeout(int sessionTimeout) { }
+
+    @Override
+    public void process(WatchedEvent event) { }
+
+    @Override
+    int getSessionTimeout() { return 0; }
+
+    @Override
+    void close() { }
+
+    @Override
+    public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { }
+
+    @Override
+    public void sendCloseSession() { }
+
+    @Override
+    public long getSessionId() { return sessionId; }
+
+    @Override
+    void setSessionId(long sessionId) { }
+
+    @Override
+    void sendBuffer(ByteBuffer closeConn) { }
+
+    @Override
+    void enableRecv() { }
+
+    @Override
+    void disableRecv() { }
+
+    @Override
+    protected ServerStats serverStats() { return null; }
+
+    @Override
+    public long getOutstandingRequests() { return 0; }
+
+    @Override
+    public InetSocketAddress getRemoteSocketAddress() { return null; }
+
+    @Override
+    public int getInterestOps() { return 0; }
+
+    @Override
+    public boolean isSecure() { return false; }
+
+    @Override
+    public Certificate[] getClientCertificateChain() { return null; }
+
+    @Override
+    public void setClientCertificateChain(Certificate[] chain) { }
+}

+ 1 - 0
src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java

@@ -594,6 +594,7 @@ public class NIOServerCnxn extends ServerCnxn {
      */
     @Override
     public void close() {
+        setStale();
         if (!factory.removeCnxn(this)) {
             return;
         }

+ 1 - 0
src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java

@@ -92,6 +92,7 @@ public class NettyServerCnxn extends ServerCnxn {
             LOG.debug("close called for sessionid:0x"
                     + Long.toHexString(sessionId));
         }
+        setStale();
 
         // ZOOKEEPER-2743:
         // Always unregister connection upon close to prevent

+ 13 - 3
src/java/main/org/apache/zookeeper/server/ServerCnxn.java

@@ -54,7 +54,7 @@ public abstract class ServerCnxn implements Stats, Watcher {
     // (aka owned by) this class
     final public static Object me = new Object();
     private static final Logger LOG = LoggerFactory.getLogger(ServerCnxn.class);
-    
+
     private Set<Id> authInfo = Collections.newSetFromMap(new ConcurrentHashMap<Id, Boolean>());
 
     private static final byte[] fourBytes = new byte[4];
@@ -66,6 +66,8 @@ public abstract class ServerCnxn implements Stats, Watcher {
      */
     boolean isOldClient = true;
 
+    private volatile boolean stale = false;
+
     abstract int getSessionTimeout();
 
     abstract void close();
@@ -143,6 +145,14 @@ public abstract class ServerCnxn implements Stats, Watcher {
         }
     }
 
+    public boolean isStale() {
+        return stale;
+    }
+
+    public void setStale() {
+        stale = true;
+    }
+
     protected void packetReceived(long bytes) {
         incrPacketsReceived();
         ServerStats serverStats = serverStats();
@@ -196,7 +206,7 @@ public abstract class ServerCnxn implements Stats, Watcher {
     protected long incrPacketsReceived() {
         return packetsReceived.incrementAndGet();
     }
-    
+
     protected void incrOutstandingRequests(RequestHeader h) {
     }
 
@@ -293,7 +303,7 @@ public abstract class ServerCnxn implements Stats, Watcher {
     public abstract boolean isSecure();
     public abstract Certificate[] getClientCertificateChain();
     public abstract void setClientCertificateChain(Certificate[] chain);
-    
+
     /**
      * Print information about the connection.
      * @param brief iff true prints brief details, otw full detail

+ 1 - 0
src/java/main/org/apache/zookeeper/server/ZKDatabase.java

@@ -138,6 +138,7 @@ public class ZKDatabase {
         /* to be safe we just create a new
          * datatree.
          */
+        dataTree.shutdownWatcher();
         dataTree = createDataTree();
         sessionsWithTimeouts.clear();
         WriteLock lock = logLock.writeLock();

+ 159 - 0
src/java/main/org/apache/zookeeper/server/util/BitHashSet.java

@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.util.BitSet;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.lang.Iterable;
+
+/**
+ * Using BitSet to store all the elements, and use HashSet to cache limited
+ * number of elements to find a balance between memory and time complexity.
+ *
+ * Without HashSet, we need to use O(N) time to get the elements, N is
+ * the bit numbers in elementBits. But we need to keep the size small to make
+ * sure it doesn't cost too much in memory, there is a trade off between
+ * memory and time complexity.
+ *
+ * Previously, was deciding to dynamically switch between SparseBitSet and
+ * HashSet based on the memory consumption, but it will take time to copy
+ * data over and may have some herd effect of keep copying data from one
+ * data structure to anther. The current solution can do a very good job
+ * given most of the paths have limited number of elements.
+ */
+public class BitHashSet implements Iterable<Integer> {
+
+    /**
+     * Change to SparseBitSet if we we want to optimize more, the number of
+     * elements on a single server is usually limited, so BitSet should be
+     * fine.
+     */
+    private final BitSet elementBits = new BitSet();
+
+    /**
+     * HashSet is used to optimize the iterating, if there is a single 
+     * element in this BitHashSet, but the bit is very large, without 
+     * HashSet we need to go through all the words before return that 
+     * element, which is not efficient.
+     */
+    private final Set<Integer> cache = new HashSet<Integer>();
+
+    private final int cacheSize;
+
+    // To record how many elements in this set.
+    private int elementCount = 0;
+
+    public BitHashSet() {
+        this(Integer.getInteger("zookeeper.bitHashCacheSize", 10));
+    }
+
+    public BitHashSet(int cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+
+    public synchronized boolean add(Integer elementBit) {
+        if (elementBit == null || elementBits.get(elementBit)) {
+            return false;
+        }
+        if (cache.size() < cacheSize) {
+            cache.add(elementBit);
+        }
+        elementBits.set(elementBit);
+        elementCount++;
+        return true;
+    }
+
+    /**
+     * Remove the watches, and return the number of watches being removed.
+     */
+    public synchronized int remove(Set<Integer> bitSet, BitSet bits) {
+        cache.removeAll(bitSet);
+        elementBits.andNot(bits);
+        int elementCountBefore = elementCount;
+        elementCount = elementBits.cardinality();
+        return elementCountBefore - elementCount;
+    }
+
+    public synchronized boolean remove(Integer elementBit) {
+        if (elementBit == null || !elementBits.get(elementBit)) {
+            return false;
+        }
+
+        cache.remove(elementBit);
+        elementBits.clear(elementBit);
+        elementCount--;
+        return true;
+    }
+
+    public synchronized boolean contains(Integer elementBit) {
+        if (elementBit == null) {
+            return false;
+        }
+        return elementBits.get(elementBit);
+    }
+
+    public synchronized int size() {
+        return elementCount;
+    }
+
+    /**
+     * This function is not thread-safe, need to synchronized when
+     * iterate through this set.
+     */
+    @Override
+    public Iterator<Integer> iterator() {
+        if (cache.size() == elementCount) {
+            return cache.iterator();
+        }
+
+        return new Iterator<Integer>() {
+            int returnedCount = 0;
+            int bitIndex = 0;
+
+            @Override
+            public boolean hasNext() {
+                return returnedCount < elementCount;
+            }
+
+            @Override
+            public Integer next() {
+                int bit = elementBits.nextSetBit(bitIndex);
+                bitIndex = bit + 1;
+                returnedCount++;
+                return bit;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    // visible for test
+    public synchronized int cachedSize() {
+        return cache.size();
+    }
+
+    public synchronized boolean isEmpty() {
+        return elementCount == 0;
+    }
+}

+ 136 - 0
src/java/main/org/apache/zookeeper/server/util/BitMap.java

@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.BitSet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * This is a helper class to maintain the bit to specific value and the
+ * reversed value to bit mapping.
+ */
+public class BitMap<T> {
+
+    private final Map<T, Integer> value2Bit = new HashMap<T, Integer>();
+    private final Map<Integer, T> bit2Value = new HashMap<Integer, T>();
+
+    private final BitSet freedBitSet = new BitSet();
+    private Integer nextBit = Integer.valueOf(0);
+
+    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+    public Integer add(T value) {
+        /*
+         * Optimized for code which will add the same value again and again,
+         * more specifically this is used to add new bit for watcher, and
+         * the same watcher may watching thousands or even millions of nodes,
+         * which will call add the same value of this function, check exist
+         * using read lock will optimize the performance here.
+         */
+        Integer bit = getBit(value);
+        if (bit != null) {
+            return bit;
+        }
+
+        rwLock.writeLock().lock();
+        try {
+            bit = value2Bit.get(value);
+            if (bit != null) {
+                return bit;
+            }
+            bit = freedBitSet.nextSetBit(0);
+            if (bit > -1) {
+                freedBitSet.clear(bit);
+            } else {
+                bit = nextBit++;
+            }
+
+            value2Bit.put(value, bit);
+            bit2Value.put(bit, value);
+            return bit;
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    public T get(int bit) {
+        rwLock.readLock().lock();
+        try {
+            return bit2Value.get(bit);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    public Integer getBit(T value) {
+        rwLock.readLock().lock();
+        try {
+            return value2Bit.get(value);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    public int remove(T value) {
+        /*
+         * remove only called once when the session is closed, so use write 
+         * lock directly without checking read lock.
+         */
+        rwLock.writeLock().lock();
+        try {
+            Integer bit = value2Bit.get(value);
+            if (bit == null) {
+                return -1;
+            }
+            value2Bit.remove(value);
+            bit2Value.remove(bit);
+            freedBitSet.set(bit);
+            return bit;
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    public T remove(int bit) {
+        rwLock.writeLock().lock();
+        try {
+            T value = bit2Value.get(bit);
+            if (value == null) {
+                return null;
+            }
+            value2Bit.remove(value);
+            bit2Value.remove(bit);
+            freedBitSet.set(bit);
+            return value;
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    public int size() {
+        rwLock.readLock().lock();
+        try {
+            return value2Bit.size();
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+}

+ 34 - 0
src/java/main/org/apache/zookeeper/server/watch/IDeadWatcherListener.java

@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Set;
+
+/**
+ * Interface used to process the dead watchers related to closed cnxns.
+ */
+public interface IDeadWatcherListener {
+
+    /**
+     * Process the given dead watchers.
+     *
+     * @param deadWatchers the watchers which have closed cnxn
+     */
+    public void processDeadWatchers(Set<Integer> deadWatchers);
+}

+ 134 - 0
src/java/main/org/apache/zookeeper/server/watch/IWatchManager.java

@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.io.PrintWriter;
+
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+
+public interface IWatchManager {
+
+    /**
+     * Add watch to specific path.
+     *
+     * @param path znode path
+     * @param watcher watcher object reference
+     *
+     * @return true if the watcher added is not already present
+     */
+    public boolean addWatch(String path, Watcher watcher);
+
+    /**
+     * Checks the specified watcher exists for the given path.
+     *
+     * @param path znode path
+     * @param watcher watcher object reference
+     *
+     * @return true if the watcher exists, false otherwise
+     */
+    public boolean containsWatcher(String path, Watcher watcher);
+
+    /**
+     * Removes the specified watcher for the given path.
+     *
+     * @param path znode path
+     * @param watcher watcher object reference
+     *
+     * @return true if the watcher successfully removed, false otherwise
+     */
+    public boolean removeWatcher(String path, Watcher watcher);
+
+    /**
+     * The entry to remove the watcher when the cnxn is closed.
+     *
+     * @param watcher watcher object reference
+     */
+    public void removeWatcher(Watcher watcher);
+
+    /**
+     * Distribute the watch event for the given path.
+     *
+     * @param path znode path
+     * @param type the watch event type
+     *
+     * @return the watchers have been notified
+     */
+    public WatcherOrBitSet triggerWatch(String path, EventType type);
+
+    /**
+     * Distribute the watch event for the given path, but ignore those
+     * suppressed ones.
+     *
+     * @param path znode path
+     * @param type the watch event type
+     * @param suppress the suppressed watcher set
+     *
+     * @return the watchers have been notified
+     */
+    public WatcherOrBitSet triggerWatch(
+            String path, EventType type, WatcherOrBitSet suppress);
+
+    /**
+     * Get the size of watchers.
+     *
+     * @return the watchers number managed in this class.
+     */
+    public int size();
+
+    /**
+     * Clean up the watch manager.
+     */
+    public void shutdown();
+
+    /**
+     * Returns a watch summary.
+     *
+     * @return watch summary
+     * @see WatchesSummary
+     */
+    public WatchesSummary getWatchesSummary();
+
+    /**
+     * Returns a watch report.
+     *
+     * @return watch report
+     * @see WatchesReport
+     */
+    public WatchesReport getWatches();
+
+    /**
+     * Returns a watch report by path.
+     *
+     * @return watch report
+     * @see WatchesPathReport
+     */
+    public WatchesPathReport getWatchesByPath();
+
+    /**
+     * String representation of watches. Warning, may be large!
+     *
+     * @param pwriter the writer to dump the watches
+     * @param byPath iff true output watches by paths, otw output
+     * watches by connection
+     *
+     * @return string representation of watches
+     */
+    public void dumpWatches(PrintWriter pwriter, boolean byPath);
+}

+ 43 - 62
src/java/main/org/apache/zookeeper/server/WatchManager.java → src/java/main/org/apache/zookeeper/server/watch/WatchManager.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.zookeeper.server;
+package org.apache.zookeeper.server.watch;
 
 import java.io.PrintWriter;
 import java.util.HashMap;
@@ -30,6 +30,8 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ZooTrace;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,7 +39,7 @@ import org.slf4j.LoggerFactory;
  * This class manages watches. It allows watches to be associated with a string
  * and removes watchers and their watches in addition to managing triggers.
  */
-class WatchManager {
+public class WatchManager implements IWatchManager {
     private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);
 
     private final Map<String, Set<Watcher>> watchTable =
@@ -46,7 +48,8 @@ class WatchManager {
     private final Map<Watcher, Set<String>> watch2Paths =
         new HashMap<Watcher, Set<String>>();
 
-    synchronized int size(){
+    @Override
+    public synchronized int size(){
         int result = 0;
         for(Set<Watcher> watches : watchTable.values()) {
             result += watches.size();
@@ -54,7 +57,17 @@ class WatchManager {
         return result;
     }
 
-    synchronized void addWatch(String path, Watcher watcher) {
+    boolean isDeadWatcher(Watcher watcher) {
+        return watcher instanceof ServerCnxn && ((ServerCnxn) watcher).isStale();
+    }
+
+    @Override
+    public synchronized boolean addWatch(String path, Watcher watcher) {
+        if (isDeadWatcher(watcher)) {
+            LOG.debug("Ignoring addWatch with closed cnxn");
+            return false;
+        }
+
         Set<Watcher> list = watchTable.get(path);
         if (list == null) {
             // don't waste memory if there are few watches on a node
@@ -71,10 +84,11 @@ class WatchManager {
             paths = new HashSet<String>();
             watch2Paths.put(watcher, paths);
         }
-        paths.add(path);
+        return paths.add(path);
     }
 
-    synchronized void removeWatcher(Watcher watcher) {
+    @Override
+    public synchronized void removeWatcher(Watcher watcher) {
         Set<String> paths = watch2Paths.remove(watcher);
         if (paths == null) {
             return;
@@ -83,18 +97,21 @@ class WatchManager {
             Set<Watcher> list = watchTable.get(p);
             if (list != null) {
                 list.remove(watcher);
-                if (list.size() == 0) {
+                if (list.isEmpty()) {
                     watchTable.remove(p);
                 }
             }
         }
     }
 
-    Set<Watcher> triggerWatch(String path, EventType type) {
+    @Override
+    public WatcherOrBitSet triggerWatch(String path, EventType type) {
         return triggerWatch(path, type, null);
     }
 
-    Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
+    @Override
+    public WatcherOrBitSet triggerWatch(
+            String path, EventType type, WatcherOrBitSet supress) {
         WatchedEvent e = new WatchedEvent(type,
                 KeeperState.SyncConnected, path);
         Set<Watcher> watchers;
@@ -121,12 +138,9 @@ class WatchManager {
             }
             w.process(e);
         }
-        return watchers;
+        return new WatcherOrBitSet(watchers);
     }
 
-    /**
-     * Brief description of this object.
-     */
     @Override
     public synchronized String toString() {
         StringBuilder sb = new StringBuilder();
@@ -143,13 +157,8 @@ class WatchManager {
         return sb.toString();
     }
 
-    /**
-     * String representation of watches. Warning, may be large!
-     * @param byPath iff true output watches by paths, otw output
-     * watches by connection
-     * @return string representation of watches
-     */
-    synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {
+    @Override
+    public synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {
         if (byPath) {
             for (Entry<String, Set<Watcher>> e : watchTable.entrySet()) {
                 pwriter.println(e.getKey());
@@ -171,16 +180,8 @@ class WatchManager {
         }
     }
 
-    /**
-     * Checks the specified watcher exists for the given path
-     *
-     * @param path
-     *            znode path
-     * @param watcher
-     *            watcher object reference
-     * @return true if the watcher exists, false otherwise
-     */
-    synchronized boolean containsWatcher(String path, Watcher watcher) {
+    @Override
+    public synchronized boolean containsWatcher(String path, Watcher watcher) {
         Set<String> paths = watch2Paths.get(watcher);
         if (paths == null || !paths.contains(path)) {
             return false;
@@ -188,16 +189,8 @@ class WatchManager {
         return true;
     }
 
-    /**
-     * Removes the specified watcher for the given path
-     *
-     * @param path
-     *            znode path
-     * @param watcher
-     *            watcher object reference
-     * @return true if the watcher successfully removed, false otherwise
-     */
-    synchronized boolean removeWatcher(String path, Watcher watcher) {
+    @Override
+    public synchronized boolean removeWatcher(String path, Watcher watcher) {
         Set<String> paths = watch2Paths.get(watcher);
         if (paths == null || !paths.remove(path)) {
             return false;
@@ -208,20 +201,15 @@ class WatchManager {
             return false;
         }
 
-        if (list.size() == 0) {
+        if (list.isEmpty()) {
             watchTable.remove(path);
         }
 
         return true;
     }
 
-    /**
-     * Returns a watch report.
-     *
-     * @return watch report
-     * @see WatchesReport
-     */
-    synchronized WatchesReport getWatches() {
+    @Override
+    public synchronized WatchesReport getWatches() {
         Map<Long, Set<String>> id2paths = new HashMap<Long, Set<String>>();
         for (Entry<Watcher, Set<String>> e: watch2Paths.entrySet()) {
             Long id = ((ServerCnxn) e.getKey()).getSessionId();
@@ -231,13 +219,8 @@ class WatchManager {
         return new WatchesReport(id2paths);
     }
 
-    /**
-     * Returns a watch report by path.
-     *
-     * @return watch report
-     * @see WatchesPathReport
-     */
-    synchronized WatchesPathReport getWatchesByPath() {
+    @Override
+    public synchronized WatchesPathReport getWatchesByPath() {
         Map<String, Set<Long>> path2ids = new HashMap<String, Set<Long>>();
         for (Entry<String, Set<Watcher>> e : watchTable.entrySet()) {
             Set<Long> ids = new HashSet<Long>(e.getValue().size());
@@ -249,13 +232,8 @@ class WatchManager {
         return new WatchesPathReport(path2ids);
     }
 
-    /**
-     * Returns a watch summary.
-     *
-     * @return watch summary
-     * @see WatchesSummary
-     */
-    synchronized WatchesSummary getWatchesSummary() {
+    @Override
+    public synchronized WatchesSummary getWatchesSummary() {
         int totalWatches = 0;
         for (Set<String> paths : watch2Paths.values()) {
             totalWatches += paths.size();
@@ -263,4 +241,7 @@ class WatchManager {
         return new WatchesSummary (watch2Paths.size(), watchTable.size(),
                                    totalWatches);
     }
+
+    @Override
+    public void shutdown() { /* do nothing */ }
 }

+ 52 - 0
src/java/main/org/apache/zookeeper/server/watch/WatchManagerFactory.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A factory used to produce the actual watch manager based on the
+ * zookeeper.watchManagerName option.
+ */
+public class WatchManagerFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(WatchManagerFactory.class);
+
+    public static final String ZOOKEEPER_WATCH_MANAGER_NAME = "zookeeper.watchManagerName";
+
+    public static IWatchManager createWatchManager() throws IOException {
+        String watchManagerName = System.getProperty(ZOOKEEPER_WATCH_MANAGER_NAME);
+        if (watchManagerName == null) {
+            watchManagerName = WatchManager.class.getName();
+        }
+        try {
+            IWatchManager watchManager =
+                    (IWatchManager) Class.forName(watchManagerName).newInstance();
+            LOG.info("Using {} as watch manager", watchManagerName);
+            return watchManager;
+        } catch (Exception e) {
+            IOException ioe = new IOException("Couldn't instantiate "
+                    + watchManagerName);
+            ioe.initCause(e);
+            throw ioe;
+        }
+    }
+}

+ 389 - 0
src/java/main/org/apache/zookeeper/server/watch/WatchManagerOptimized.java

@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.util.BitHashSet;
+import org.apache.zookeeper.server.util.BitMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Optimized in memory and time complexity, compared to WatchManager, both the
+ * memory consumption and time complexity improved a lot, but it cannot
+ * efficiently remove the watcher when the session or socket is closed, for
+ * majority use case this is not a problem.
+ *
+ * Changed made compared to WatchManager:
+ *
+ * - Use HashSet and BitSet to store the watchers to find a balance between
+ *   memory usage and time complexity
+ * - Use ReadWriteLock instead of synchronized to reduce lock retention
+ * - Lazily clean up the closed watchers
+ */
+public class WatchManagerOptimized
+        implements IWatchManager, IDeadWatcherListener {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatchManagerOptimized.class);
+
+    private final ConcurrentHashMap<String, BitHashSet> pathWatches =
+            new ConcurrentHashMap<String, BitHashSet>();
+
+    // watcher to bit id mapping
+    private final BitMap<Watcher> watcherBitIdMap = new BitMap<Watcher>();
+
+    // used to lazily remove the dead watchers
+    private final WatcherCleaner watcherCleaner;
+
+    private final ReentrantReadWriteLock addRemovePathRWLock = new ReentrantReadWriteLock();
+
+    public WatchManagerOptimized() {
+        watcherCleaner = new WatcherCleaner(this);
+        watcherCleaner.start();
+    }
+
+    @Override
+    public boolean addWatch(String path, Watcher watcher) {
+        boolean result = false;
+        // Need readLock to exclusively lock with removeWatcher, otherwise we 
+        // may add a dead watch whose connection was just closed. 
+        //
+        // Creating new watcher bit and adding it to the BitHashSet has it's 
+        // own lock to minimize the write lock scope
+        addRemovePathRWLock.readLock().lock();
+        try {
+            // avoid race condition of adding a on flying dead watcher
+            if (isDeadWatcher(watcher)) {
+                LOG.debug("Ignoring addWatch with closed cnxn");
+            } else {
+                Integer bit = watcherBitIdMap.add(watcher);
+                BitHashSet watchers = pathWatches.get(path);
+                if (watchers == null) {
+                    watchers = new BitHashSet();
+                    BitHashSet existingWatchers = pathWatches.putIfAbsent(path, watchers);
+                    // it's possible multiple thread might add to pathWatches 
+                    // while we're holding read lock, so we need this check 
+                    // here
+                    if (existingWatchers != null) {
+                        watchers = existingWatchers;
+                    }
+                }
+                result = watchers.add(bit);
+            }
+        } finally {
+            addRemovePathRWLock.readLock().unlock();
+        }
+        return result;
+    }
+
+    /**
+     * Used in the OpCode.checkWatches, which is a read operation, since read
+     * and write requests are exclusively processed, we don't need to hold
+     * lock here. 
+     *
+     * Different from addWatch this method doesn't mutate any state, so we don't
+     * need to hold read lock to avoid dead watcher (cnxn closed) being added 
+     * to the watcher manager. 
+     *
+     * It's possible that before we lazily clean up the dead watcher, this will 
+     * return true, but since the cnxn is closed, the response will dropped as
+     * well, so it doesn't matter.
+     */
+    @Override
+    public boolean containsWatcher(String path, Watcher watcher) {
+        BitHashSet watchers = pathWatches.get(path);
+        if (watchers == null || !watchers.contains(watcherBitIdMap.getBit(watcher))) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean removeWatcher(String path, Watcher watcher) {
+        // Hold write lock directly because removeWatcher request is more 
+        // likely to be invoked when the watcher is actually exist and 
+        // haven't fired yet, so instead of having read lock to check existence 
+        // before switching to write one, it's actually cheaper to hold write 
+        // lock directly here.
+        addRemovePathRWLock.writeLock().lock();
+        try {
+            BitHashSet list = pathWatches.get(path);
+            if (list == null || !list.remove(watcherBitIdMap.getBit(watcher))) {
+                return false;
+            }
+            if (list.isEmpty()) {
+                pathWatches.remove(path);
+            }
+            return true;
+        } finally {
+            addRemovePathRWLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void removeWatcher(Watcher watcher) {
+        Integer watcherBit;
+        // Use exclusive lock with addWatcher to guarantee that we won't add
+        // watch for a cnxn which is already closed. 
+        addRemovePathRWLock.writeLock().lock();
+        try {
+            // do nothing if the watcher is not tracked
+            watcherBit = watcherBitIdMap.getBit(watcher);
+            if (watcherBit == null) {
+                return;
+            }
+        } finally {
+            addRemovePathRWLock.writeLock().unlock();
+        }
+
+        // We can guarantee that when this line is executed, the cnxn of this 
+        // watcher has already been marked as stale (this method is only called 
+        // from ServerCnxn.close after we set stale), which means no watches 
+        // will be added to the watcher manager with this watcher, so that we
+        // can safely clean up this dead watcher. 
+        //
+        // So it's not necessary to have this line in the addRemovePathRWLock. 
+        // And moving the addDeadWatcher out of the locking block to avoid
+        // holding the write lock while we're blocked on adding dead watchers 
+        // into the watcherCleaner.
+        watcherCleaner.addDeadWatcher(watcherBit);
+    }
+
+    /**
+     * Entry for WatcherCleaner to remove dead watchers
+     *
+     * @param deadWatchers the watchers need to be removed
+     */
+    @Override
+    public void processDeadWatchers(Set<Integer> deadWatchers) {
+        // All the watchers being processed here are guaranteed to be dead, 
+        // no watches will be added for those dead watchers, that's why I 
+        // don't need to have addRemovePathRWLock here.
+        BitSet bits = new BitSet();
+        for (int dw: deadWatchers) {
+            bits.set(dw);
+        }
+        // The value iterator will reflect the state when it was
+        // created, don't need to synchronize.
+        for (BitHashSet watchers: pathWatches.values()) {
+            watchers.remove(deadWatchers, bits);
+        }
+        // Better to remove the empty path from pathWatches, but it will add
+        // lot of lock contention and affect the throughput of addWatch,
+        // let's rely on the triggerWatch to delete it.
+        for (Integer wbit: deadWatchers) {
+            watcherBitIdMap.remove(wbit);
+        }
+    }
+
+    @Override
+    public WatcherOrBitSet triggerWatch(String path, EventType type) {
+        return triggerWatch(path, type, null);
+    }
+
+    @Override
+    public WatcherOrBitSet triggerWatch(
+            String path, EventType type, WatcherOrBitSet suppress) {
+        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
+
+        BitHashSet watchers = remove(path);
+        if (watchers == null) {
+            return null;
+        }
+
+        // Avoid race condition between dead watcher cleaner in
+        // WatcherCleaner and iterating here
+        synchronized (watchers) {
+            for (Integer wBit : watchers) {
+                if (suppress != null && suppress.contains(wBit)) {
+                    continue;
+                }
+
+                Watcher w = watcherBitIdMap.get(wBit);
+
+                // skip dead watcher
+                if (w == null || isDeadWatcher(w)) {
+                    continue;
+                }
+
+                w.process(e);
+            }
+        }
+
+        return new WatcherOrBitSet(watchers);
+    }
+
+    @Override
+    public int size() {
+        int size = 0;
+        for(BitHashSet watches : pathWatches.values()) {
+            size += watches.size();
+        }
+        return size;
+    }
+
+    @Override
+    public void shutdown() {
+        if (watcherCleaner != null) {
+            watcherCleaner.shutdown();
+        }
+    }
+
+    private BitHashSet remove(String path) {
+        addRemovePathRWLock.writeLock().lock();
+        try {
+            return pathWatches.remove(path);
+        } finally {
+            addRemovePathRWLock.writeLock().unlock();
+        }
+    }
+
+    boolean isDeadWatcher(Watcher watcher) {
+        return watcher instanceof ServerCnxn && ((ServerCnxn) watcher).isStale();
+    }
+
+    int pathSize() {
+        return pathWatches.size();
+    }
+
+    @Override
+    public WatchesSummary getWatchesSummary() {
+        return new WatchesSummary(
+                watcherBitIdMap.size(), pathSize(), size());
+    }
+
+    @Override
+    public WatchesReport getWatches() {
+        Map<Long, Set<String>> id2paths = new HashMap<Long, Set<String>>();
+        for (Entry<Watcher, Set<String>> e: getWatcher2PathesMap().entrySet()) {
+            Long id = ((ServerCnxn) e.getKey()).getSessionId();
+            Set<String> paths = new HashSet<String>(e.getValue());
+            id2paths.put(id, paths);
+        }
+        return new WatchesReport(id2paths);
+    }
+
+    /**
+     * Iterate through ConcurrentHashMap is 'safe', it will reflect the state
+     * of the map at the time iteration began, may miss update while iterating,
+     * given this is used in the commands to get a general idea of the watches
+     * state, we don't care about missing some update.
+     */
+    @Override
+    public WatchesPathReport getWatchesByPath() {
+        Map<String, Set<Long>> path2ids = new HashMap<String, Set<Long>>();
+        for (Entry<String, BitHashSet> e : pathWatches.entrySet()) {
+            BitHashSet watchers = e.getValue();
+            synchronized (watchers) {
+                Set<Long> ids = new HashSet<Long>(watchers.size());
+                path2ids.put(e.getKey(), ids);
+                for (Integer wbit : watchers) {
+                    Watcher watcher = watcherBitIdMap.get(wbit);
+                    if (watcher instanceof ServerCnxn) {
+                        ids.add(((ServerCnxn) watcher).getSessionId());
+                    }
+                }
+            }
+        }
+        return new WatchesPathReport(path2ids);
+    }
+
+    /**
+     * May cause OOM if there are lots of watches, might better to forbid
+     * it in this class.
+     */
+    public Map<Watcher, Set<String>> getWatcher2PathesMap() {
+        Map<Watcher, Set<String>> watcher2paths =
+                new HashMap<Watcher, Set<String>>();
+        for (Entry<String, BitHashSet> e : pathWatches.entrySet()) {
+            String path = e.getKey();
+            BitHashSet watchers = e.getValue();
+            // avoid race condition with add/remove
+            synchronized (watchers) {
+                for (Integer wbit: watchers) {
+                    Watcher w = watcherBitIdMap.get(wbit);
+                    if (w == null) {
+                        continue;
+                    }
+                    if (!watcher2paths.containsKey(w)) {
+                        watcher2paths.put(w, new HashSet<String>());
+                    }
+                    watcher2paths.get(w).add(path);
+                }
+            }
+        }
+        return watcher2paths;
+    }
+
+    @Override
+    public void dumpWatches(PrintWriter pwriter, boolean byPath) {
+        if (byPath) {
+            for (Entry<String, BitHashSet> e : pathWatches.entrySet()) {
+                pwriter.println(e.getKey());
+                BitHashSet watchers = e.getValue();
+                synchronized (watchers) {
+                    for (Integer wbit : watchers) {
+                        Watcher w = watcherBitIdMap.get(wbit);
+                        if (!(w instanceof ServerCnxn)) {
+                            continue;
+                        }
+                        pwriter.print("\t0x");
+                        pwriter.print(
+                                Long.toHexString(((ServerCnxn)w).getSessionId()));
+                        pwriter.print("\n");
+                    }
+                }
+            }
+        } else {
+            for (Entry<Watcher, Set<String>> e : getWatcher2PathesMap().entrySet()) {
+                pwriter.print("0x");
+                pwriter.println(Long.toHexString(((ServerCnxn)e.getKey()).getSessionId()));
+                for (String path : e.getValue()) {
+                    pwriter.print("\t");
+                    pwriter.println(path);
+                }
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(watcherBitIdMap.size())
+            .append(" connections watching ")
+            .append(pathSize()).append(" paths\n");
+        sb.append("Total watches:").append(size());
+        return sb.toString();
+    }
+}

+ 182 - 0
src/java/main/org/apache/zookeeper/server/watch/WatcherCleaner.java

@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.server.RateLogger;
+import org.apache.zookeeper.server.WorkerService;
+import org.apache.zookeeper.server.WorkerService.WorkRequest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thread used to lazily clean up the closed watcher, it will trigger the
+ * clean up when the dead watchers get certain number or some number of
+ * seconds has elapsed since last clean up.
+ *
+ * Cost of running it:
+ *
+ * - need to go through all the paths even if the watcher may only
+ *   watching a single path
+ * - block in the path BitHashSet when we try to check the dead watcher
+ *   which won't block other stuff
+ */
+public class WatcherCleaner extends Thread {
+
+    private static final Logger LOG = LoggerFactory.getLogger(WatcherCleaner.class);
+    private final RateLogger RATE_LOGGER = new RateLogger(LOG);
+
+    private volatile boolean stopped = false;
+    private final Object cleanEvent = new Object();
+    private final Random r = new Random(System.nanoTime());
+    private final WorkerService cleaners;
+
+    private final Set<Integer> deadWatchers;
+    private final IDeadWatcherListener listener;
+    private final int watcherCleanThreshold;
+    private final int watcherCleanIntervalInSeconds;
+    private final int maxInProcessingDeadWatchers;
+    private final AtomicInteger totalDeadWatchers = new AtomicInteger();
+
+    public WatcherCleaner(IDeadWatcherListener listener) {
+        this(listener,
+            Integer.getInteger("zookeeper.watcherCleanThreshold", 1000),
+            Integer.getInteger("zookeeper.watcherCleanIntervalInSeconds", 600),
+            Integer.getInteger("zookeeper.watcherCleanThreadsNum", 2),
+            Integer.getInteger("zookeeper.maxInProcessingDeadWatchers", -1));
+    }
+
+    public WatcherCleaner(IDeadWatcherListener listener,
+            int watcherCleanThreshold, int watcherCleanIntervalInSeconds,
+            int watcherCleanThreadsNum, int maxInProcessingDeadWatchers) {
+        this.listener = listener;
+        this.watcherCleanThreshold = watcherCleanThreshold;
+        this.watcherCleanIntervalInSeconds = watcherCleanIntervalInSeconds;
+        int suggestedMaxInProcessingThreshold =
+                watcherCleanThreshold * watcherCleanThreadsNum;
+        if (maxInProcessingDeadWatchers > 0 &&
+                maxInProcessingDeadWatchers < suggestedMaxInProcessingThreshold) {
+            maxInProcessingDeadWatchers = suggestedMaxInProcessingThreshold;
+            LOG.info("The maxInProcessingDeadWatchers config is smaller " +
+                    "than the suggested one, change it to use {}",
+                    maxInProcessingDeadWatchers);
+        }
+        this.maxInProcessingDeadWatchers = maxInProcessingDeadWatchers;
+        this.deadWatchers = new HashSet<Integer>();
+        this.cleaners = new WorkerService("DeadWatcherCleanner",
+                watcherCleanThreadsNum, false);
+
+        LOG.info("watcherCleanThreshold={}, watcherCleanIntervalInSeconds={}" +
+                ", watcherCleanThreadsNum={}, maxInProcessingDeadWatchers={}",
+                watcherCleanThreshold, watcherCleanIntervalInSeconds,
+                watcherCleanThreadsNum, maxInProcessingDeadWatchers);
+    }
+
+    public void addDeadWatcher(int watcherBit) {
+        // Wait if there are too many watchers waiting to be closed,
+        // this is will slow down the socket packet processing and
+        // the adding watches in the ZK pipeline.
+        while (maxInProcessingDeadWatchers > 0 && !stopped &&
+                totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
+            try {
+                RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning");
+                synchronized(totalDeadWatchers) {
+                    totalDeadWatchers.wait(100);
+                }
+            } catch (InterruptedException e) {
+                LOG.info("Got interrupted while waiting for dead watches " +
+                        "queue size");
+            }
+        }
+        synchronized (this) {
+            if (deadWatchers.add(watcherBit)) {
+                totalDeadWatchers.incrementAndGet();
+                if (deadWatchers.size() >= watcherCleanThreshold) {
+                    synchronized (cleanEvent) {
+                        cleanEvent.notifyAll();
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void run() {
+        while (!stopped) {
+            synchronized (cleanEvent) {
+                try {
+                    // add some jitter to avoid cleaning dead watchers at the
+                    // same time in the quorum
+                    if (deadWatchers.size() < watcherCleanThreshold) {
+                        int maxWaitMs = (watcherCleanIntervalInSeconds +
+                            r.nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000;
+                        cleanEvent.wait(maxWaitMs);
+                    }
+                } catch (InterruptedException e) {
+                    LOG.info("Received InterruptedException while " +
+                            "waiting for cleanEvent");
+                    break;
+                }
+            }
+
+            if (deadWatchers.isEmpty()) {
+                continue;
+            }
+
+            synchronized (this) {
+                // Clean the dead watchers need to go through all the current 
+                // watches, which is pretty heavy and may take a second if 
+                // there are millions of watches, that's why we're doing lazily 
+                // batch clean up in a separate thread with a snapshot of the 
+                // current dead watchers.
+                final Set<Integer> snapshot = new HashSet<Integer>(deadWatchers);
+                deadWatchers.clear();
+                int total = snapshot.size();
+                LOG.info("Processing {} dead watchers", total);
+                cleaners.schedule(new WorkRequest() {
+                    @Override
+                    public void doWork() throws Exception {
+                        long startTime = Time.currentElapsedTime();
+                        listener.processDeadWatchers(snapshot);
+                        long latency = Time.currentElapsedTime() - startTime;
+                        LOG.info("Takes {} to process {} watches", latency, total);
+                        totalDeadWatchers.addAndGet(-total);
+                        synchronized(totalDeadWatchers) {
+                            totalDeadWatchers.notifyAll();
+                        }
+                    }
+                });
+            }
+        }
+        LOG.info("WatcherCleaner thread exited");
+    }
+
+    public void shutdown() {
+        stopped = true;
+        deadWatchers.clear();
+        cleaners.stop();
+    }
+
+}

+ 61 - 0
src/java/main/org/apache/zookeeper/server/watch/WatcherOrBitSet.java

@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import java.util.Set;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.util.BitHashSet;
+
+public class WatcherOrBitSet {
+
+    private Set<Watcher> watchers;
+    private BitHashSet watcherBits;
+
+    public WatcherOrBitSet(final Set<Watcher> watchers) {
+        this.watchers = watchers;
+    }
+
+    public WatcherOrBitSet(final BitHashSet watcherBits) {
+        this.watcherBits = watcherBits;
+    }
+
+    public boolean contains(Watcher watcher) {
+        if (watchers == null) {
+            return false;
+        }
+        return watchers.contains(watcher);
+    }
+
+    public boolean contains(int watcherBit) {
+        if (watcherBits == null) {
+            return false;
+        }
+        return watcherBits.contains(watcherBit);
+    }
+
+    public int size() {
+        if (watchers != null) {
+            return watchers.size();
+        }
+        if (watcherBits != null) {
+            return watcherBits.size();
+        }
+        return 0;
+    }
+}

+ 1 - 1
src/java/main/org/apache/zookeeper/server/WatchesPathReport.java → src/java/main/org/apache/zookeeper/server/watch/WatchesPathReport.java

@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.zookeeper.server;
+package org.apache.zookeeper.server.watch;
 
 import java.util.Collections;
 import java.util.HashMap;

+ 1 - 1
src/java/main/org/apache/zookeeper/server/WatchesReport.java → src/java/main/org/apache/zookeeper/server/watch/WatchesReport.java

@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.zookeeper.server;
+package org.apache.zookeeper.server.watch;
 
 import java.util.Collections;
 import java.util.HashMap;

+ 1 - 1
src/java/main/org/apache/zookeeper/server/WatchesSummary.java → src/java/main/org/apache/zookeeper/server/watch/WatchesSummary.java

@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.zookeeper.server;
+package org.apache.zookeeper.server.watch;
 
 import java.util.LinkedHashMap;
 import java.util.Map;

+ 35 - 3
src/java/test/config/findbugsExcludeFile.xml

@@ -53,7 +53,13 @@
     <Method name="run" />
     <Bug pattern="DM_EXIT" />
   </Match>
-  
+
+   <!-- Failed to create watch manager is a unrecoverable error -->
+   <Match>
+     <Class name="org.apache.zookeeper.server.DataTree" />
+     <Bug pattern="DM_EXIT" />
+   </Match>
+
   <Match>
     <Package name="org.apache.jute.compiler.generated" />
   </Match>
@@ -85,7 +91,7 @@
 
   <Match>
     <Class name="org.apache.zookeeper.server.DataNode"/>
-      <Field name="children"/> 
+      <Field name="children"/>
       <Bug code="IS"/>
   </Match>
  <Match>
@@ -98,6 +104,15 @@
     <Field name="serverStats"/>
     <Bug code="IS"/>
   </Match>
+
+  <!-- The iterate function is non-thread safe, the caller will synchronize
+       on the BitHHashSet object -->
+  <Match>
+    <Class name="org.apache.zookeeper.server.util.BitHashSet" />
+    <Field name="elementCount" />
+    <Bug code="IS" />
+  </Match>
+
   <Match>
      <Class name="org.apache.zookeeper.server.quorum.LearnerSessionTracker"/>
        <Bug code="UrF"/>
@@ -111,7 +126,7 @@
   <!-- these are old classes just for upgrading and should go away -->
   <Match>
     <Class name="org.apache.zookeeper.server.upgrade.DataNodeV1"/>
-  </Match> 
+  </Match>
 
   <Match>
     <Class name="org.apache.zookeeper.server.upgrade.DataTreeV1"/>
@@ -134,6 +149,23 @@
     </Or>
   </Match>
 
+  <!-- Synchronize on the AtomicInteger to do wait/notify, but not relying
+       on the synchronization to control the AtomicInteger value update,
+       so it's not a problem -->
+  <Match>
+    <Class name="org.apache.zookeeper.server.watch.WatcherCleaner" />
+    <Bug code="JLM" />
+    <Method name="addDeadWatcher" />
+  </Match>
+
+  <Match>
+    <Class name="org.apache.zookeeper.server.watch.WatcherCleaner$1" />
+    <Bug code="JLM" />
+    <Method name="doWork" />
+  </Match>
+
+
+
   <Match>
     <Class name="org.apache.zookeeper.server.quorum.QuorumPeer"/>
     <Bug pattern="OS_OPEN_STREAM" />

+ 110 - 0
src/java/test/org/apache/zookeeper/server/util/BitHashSetTest.java

@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.util;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Test;
+import org.junit.Assert;
+
+public class BitHashSetTest extends ZKTestCase {
+
+    @Test
+    public void testAddWatchBit() {
+        int watcherCacheSize = 1;
+        BitHashSet ws = new BitHashSet(watcherCacheSize);
+        Assert.assertTrue(ws.add(1));
+        Assert.assertEquals(1, ws.size());
+        Assert.assertEquals(1, ws.cachedSize());
+
+        List<Integer> actualBits = new ArrayList<Integer>();
+
+        for (int bit: ws) {
+            actualBits.add(bit);
+        }
+        Assert.assertArrayEquals(
+            new Integer[] {1},
+            actualBits.toArray(new Integer[actualBits.size()]));
+
+        // add the same bit again
+        Assert.assertFalse(ws.add(1));
+        Assert.assertEquals(1, ws.size());
+        Assert.assertEquals(1, ws.cachedSize());
+
+        // add another bit, make sure there there is only 1 bit cached
+        Assert.assertTrue(ws.add(2));
+        Assert.assertEquals(2, ws.size());
+        Assert.assertEquals(1, ws.cachedSize());
+
+        Assert.assertTrue(ws.contains(1));
+
+        actualBits.clear();
+        for (int bit: ws) {
+            actualBits.add(bit);
+        }
+        Assert.assertArrayEquals(
+            new Integer[] {1, 2},
+            actualBits.toArray(new Integer[actualBits.size()]));
+    }
+
+    @Test
+    public void testRemoveWatchBit() {
+        int watcherCacheSize = 1;
+        BitHashSet ws = new BitHashSet(watcherCacheSize);
+        ws.add(1);
+        ws.add(2);
+
+        Assert.assertTrue(ws.contains(1));
+        Assert.assertTrue(ws.contains(2));
+
+        ws.remove(1);
+        Assert.assertFalse(ws.contains(1));
+        Assert.assertEquals(1, ws.size());
+        Assert.assertEquals(0, ws.cachedSize());
+
+        List<Integer> actualBits = new ArrayList<Integer>();
+
+        for (int bit: ws) {
+            actualBits.add(bit);
+        }
+        Assert.assertArrayEquals(
+            new Integer[] {2},
+            actualBits.toArray(new Integer[actualBits.size()]));
+
+        ws.add(3);
+        Assert.assertEquals(2, ws.size());
+        Assert.assertEquals(1, ws.cachedSize());
+
+        actualBits.clear();
+        for (int bit: ws) {
+            actualBits.add(bit);
+        }
+        Assert.assertArrayEquals(
+            new Integer[] {2, 3},
+            actualBits.toArray(new Integer[actualBits.size()]));
+
+        ws.remove(2);
+        ws.remove(3);
+
+        Assert.assertEquals(0, ws.size());
+        Assert.assertEquals(0, ws.cachedSize());
+    }
+}

+ 71 - 0
src/java/test/org/apache/zookeeper/server/util/BitMapTest.java

@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.util;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Test;
+import org.junit.Assert;
+
+public class BitMapTest extends ZKTestCase {
+
+    @Test
+    public void testAddAndRemove() {
+        BitMap<String> bitMap = new BitMap<String>();
+        String v1 = new String("v1");
+        Integer bit = bitMap.add(v1);
+
+        Assert.assertEquals(1, bitMap.size());
+        Assert.assertTrue(bit >= 0);
+        Assert.assertEquals(v1, bitMap.get(bit));
+        Assert.assertEquals(bit, bitMap.getBit(v1));
+
+        // add the same value again
+        Integer newBit = bitMap.add(v1);
+        Assert.assertEquals(bit, newBit);
+        Assert.assertEquals(1, bitMap.size());
+
+        String v2 = new String("v2");
+        Integer v2Bit = bitMap.add(v2);
+        Assert.assertEquals(2, bitMap.size());
+        Assert.assertNotEquals(v2Bit, bit);
+
+        // remove by value
+        bitMap.remove(v1);
+        Assert.assertEquals(1, bitMap.size());
+        Assert.assertNull(bitMap.get(bit));
+        Assert.assertNull(bitMap.getBit(v1));
+
+        // remove by bit
+        bitMap.remove(v2Bit);
+        Assert.assertEquals(0, bitMap.size());
+        Assert.assertNull(bitMap.get(v2Bit));
+        Assert.assertNull(bitMap.getBit(v2));
+    }
+
+    @Test
+    public void testBitReuse() {
+        BitMap<String> bitMap = new BitMap<String>();
+        int v1Bit = bitMap.add("v1");
+        int v2Bit = bitMap.add("v2");
+        int v3Bit = bitMap.add("v3");
+        bitMap.remove(v2Bit);
+
+        int v4Bit = bitMap.add("v4");
+
+        Assert.assertEquals(v4Bit, v2Bit);
+    }
+}

+ 404 - 0
src/java/test/org/apache/zookeeper/server/watch/WatchManagerTest.java

@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.server.DumbWatcher;
+import org.apache.zookeeper.server.ServerCnxn;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public class WatchManagerTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(WatchManagerTest.class);
+
+    private static final String PATH_PREFIX = "path";
+
+    private ConcurrentHashMap<Integer, DumbWatcher> watchers;
+    private Random r;
+    private String className;
+
+    public WatchManagerTest(String className) {
+        this.className = className;
+    }
+
+    @Parameterized.Parameters
+    public static List<Object []> data() {
+        return Arrays.asList(new Object [][] {
+            {WatchManager.class.getName()},
+            {WatchManagerOptimized.class.getName()}
+        });
+    }
+
+    @Before
+    public void setUp() {
+        watchers = new ConcurrentHashMap<Integer, DumbWatcher>();
+        r = new Random(System.nanoTime());
+    }
+
+    public IWatchManager getWatchManager() throws IOException {
+        System.setProperty(WatchManagerFactory.ZOOKEEPER_WATCH_MANAGER_NAME, className);
+        return WatchManagerFactory.createWatchManager();
+    }
+
+    public DumbWatcher createOrGetWatcher(int watcherId) {
+        if (!watchers.containsKey(watcherId)) {
+            DumbWatcher watcher = new DumbWatcher(watcherId);
+            watchers.putIfAbsent(watcherId, watcher);
+        }
+        return watchers.get(watcherId);
+    }
+
+    public class AddWatcherWorker extends Thread {
+
+        private final IWatchManager manager;
+        private final int paths;
+        private final int watchers;
+        private final AtomicInteger watchesAdded;
+        private volatile boolean stopped = false;
+
+        public AddWatcherWorker(IWatchManager manager,
+                int paths, int watchers, AtomicInteger watchesAdded) {
+            this.manager = manager;
+            this.paths = paths;
+            this.watchers = watchers;
+            this.watchesAdded = watchesAdded;
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                String path = PATH_PREFIX + r.nextInt(paths);
+                Watcher watcher = createOrGetWatcher(r.nextInt(watchers));
+                if (manager.addWatch(path, watcher)) {
+                    watchesAdded.addAndGet(1);
+                }
+            }
+        }
+
+        public void shutdown() {
+            stopped = true;
+        }
+    }
+
+    public class WatcherTriggerWorker extends Thread {
+
+        private final IWatchManager manager;
+        private final int paths;
+        private final AtomicInteger triggeredCount;
+        private volatile boolean stopped = false;
+
+        public WatcherTriggerWorker(IWatchManager manager,
+                int paths, AtomicInteger triggeredCount) {
+            this.manager = manager;
+            this.paths = paths;
+            this.triggeredCount = triggeredCount;
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                String path = PATH_PREFIX + r.nextInt(paths);
+                WatcherOrBitSet s = manager.triggerWatch(
+                        path, EventType.NodeDeleted);
+                if (s != null) {
+                    triggeredCount.addAndGet(s.size());
+                }
+                try {
+                    Thread.sleep(r.nextInt(10));
+                } catch (InterruptedException e) {}
+            }
+        }
+
+        public void shutdown() {
+            stopped = true;
+        }
+    }
+
+    public class RemoveWatcherWorker extends Thread {
+
+        private final IWatchManager manager;
+        private final int paths;
+        private final int watchers;
+        private final AtomicInteger watchesRemoved;
+        private volatile boolean stopped = false;
+
+        public RemoveWatcherWorker(IWatchManager manager,
+                int paths, int watchers, AtomicInteger watchesRemoved) {
+            this.manager = manager;
+            this.paths = paths;
+            this.watchers = watchers;
+            this.watchesRemoved = watchesRemoved;
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                String path = PATH_PREFIX + r.nextInt(paths);
+                Watcher watcher = createOrGetWatcher(r.nextInt(watchers));
+                if (manager.removeWatcher(path, watcher)) {
+                    watchesRemoved.addAndGet(1);
+                }
+                try {
+                    Thread.sleep(r.nextInt(10));
+                } catch (InterruptedException e) {}
+            }
+        }
+
+        public void shutdown() {
+            stopped = true;
+        }
+
+    }
+
+    public class CreateDeadWatchersWorker extends Thread {
+
+        private final IWatchManager manager;
+        private final int watchers;
+        private final Set<Watcher> removedWatchers;
+        private volatile boolean stopped = false;
+
+        public CreateDeadWatchersWorker(IWatchManager manager,
+                int watchers, Set<Watcher> removedWatchers) {
+            this.manager = manager;
+            this.watchers = watchers;
+            this.removedWatchers = removedWatchers;
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                DumbWatcher watcher = createOrGetWatcher(r.nextInt(watchers));
+                watcher.setStale();
+                manager.removeWatcher(watcher);
+                synchronized (removedWatchers) {
+                    removedWatchers.add(watcher);
+                }
+                try {
+                    Thread.sleep(r.nextInt(10));
+                } catch (InterruptedException e) {}
+            }
+        }
+
+        public void shutdown() {
+            stopped = true;
+        }
+
+    }
+
+    /**
+     * Concurrently add and trigger watch, make sure the watches triggered
+     * are the same as the number added.
+     */
+    @Test(timeout = 90000)
+    public void testAddAndTriggerWatcher() throws IOException {
+        IWatchManager manager = getWatchManager();
+        int paths = 1;
+        int watchers = 10000;
+
+        // 1. start 5 workers to trigger watchers on that path
+        //    count all the watchers have been fired
+        AtomicInteger watchTriggered = new AtomicInteger();
+        List<WatcherTriggerWorker> triggerWorkers =
+                new ArrayList<WatcherTriggerWorker>();
+        for (int i = 0; i < 5; i++) {
+            WatcherTriggerWorker worker =
+                    new WatcherTriggerWorker(manager, paths, watchTriggered);
+            triggerWorkers.add(worker);
+            worker.start();
+        }
+
+        // 2. start 5 workers to add different watchers on the same path
+        //    count all the watchers being added
+        AtomicInteger watchesAdded = new AtomicInteger();
+        List<AddWatcherWorker> addWorkers = new ArrayList<AddWatcherWorker>();
+        for (int i = 0; i < 5; i++) {
+            AddWatcherWorker worker = new AddWatcherWorker(
+                    manager, paths, watchers, watchesAdded);
+            addWorkers.add(worker);
+            worker.start();
+        }
+
+        while (watchesAdded.get() < 100000) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {}
+        }
+
+        // 3. stop all the addWorkers
+        for (AddWatcherWorker worker: addWorkers) {
+            worker.shutdown();
+        }
+
+        // 4. running the trigger worker a bit longer to make sure
+        //    all watchers added are fired
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {}
+
+        // 5. stop all triggerWorkers
+        for (WatcherTriggerWorker worker: triggerWorkers) {
+            worker.shutdown();
+        }
+
+        // 6. make sure the total watch triggered is same as added
+        Assert.assertTrue(watchesAdded.get() > 0);
+        Assert.assertEquals(watchesAdded.get(), watchTriggered.get());
+    }
+
+    /**
+     * Concurrently add and remove watch, make sure the watches left +
+     * the watches removed are equal to the total added watches.
+     */
+    @Test(timeout = 90000)
+    public void testRemoveWatcherOnPath() throws IOException {
+        IWatchManager manager = getWatchManager();
+        int paths = 10;
+        int watchers = 10000;
+
+        // 1. start 5 workers to remove watchers on those path
+        //    record the watchers have been removed
+        AtomicInteger watchesRemoved = new AtomicInteger();
+        List<RemoveWatcherWorker> removeWorkers =
+                new ArrayList<RemoveWatcherWorker>();
+        for (int i = 0; i < 5; i++) {
+            RemoveWatcherWorker worker =
+                    new RemoveWatcherWorker(manager, paths, watchers, watchesRemoved);
+            removeWorkers.add(worker);
+            worker.start();
+        }
+
+        // 2. start 5 workers to add different watchers on different path
+        //    record the watchers have been added
+        AtomicInteger watchesAdded = new AtomicInteger();
+        List<AddWatcherWorker> addWorkers = new ArrayList<AddWatcherWorker>();
+        for (int i = 0; i < 5; i++) {
+            AddWatcherWorker worker = new AddWatcherWorker(
+                    manager, paths, watchers, watchesAdded);
+            addWorkers.add(worker);
+            worker.start();
+        }
+
+        while (watchesAdded.get() < 100000) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {}
+        }
+
+        // 3. stop all workers
+        for (RemoveWatcherWorker worker: removeWorkers) {
+            worker.shutdown();
+        }
+        for (AddWatcherWorker worker: addWorkers) {
+            worker.shutdown();
+        }
+
+        // 4. sleep for a while to make sure all the thread exited
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {}
+
+        // 5. make sure left watches + removed watches = added watches
+        Assert.assertTrue(watchesAdded.get() > 0);
+        Assert.assertTrue(watchesRemoved.get() > 0);
+        Assert.assertTrue(manager.size() > 0);
+        Assert.assertEquals(
+                watchesAdded.get(), watchesRemoved.get() + manager.size());
+    }
+
+    /**
+     * Concurrently add watch while close the watcher to simulate the
+     * client connections closed on prod.
+     */
+    @Test(timeout = 90000)
+    public void testDeadWatchers() throws IOException {
+        System.setProperty("zookeeper.watcherCleanThreshold", "10");
+        System.setProperty("zookeeper.watcherCleanIntervalInSeconds", "1");
+
+        IWatchManager manager = getWatchManager();
+        int paths = 1;
+        int watchers = 100000;
+
+        // 1. start 5 workers to randomly mark those watcher as dead
+        //    and remove them from watch manager
+        Set<Watcher> deadWatchers = new HashSet<Watcher>();
+        List<CreateDeadWatchersWorker> deadWorkers =
+                new ArrayList<CreateDeadWatchersWorker>();
+        for (int i = 0; i < 5; i++) {
+            CreateDeadWatchersWorker worker = new CreateDeadWatchersWorker(
+                    manager, watchers, deadWatchers);
+            deadWorkers.add(worker);
+            worker.start();
+        }
+
+        // 2. start 5 workers to add different watchers on the same path
+        AtomicInteger watchesAdded = new AtomicInteger();
+        List<AddWatcherWorker> addWorkers = new ArrayList<AddWatcherWorker>();
+        for (int i = 0; i < 5; i++) {
+            AddWatcherWorker worker = new AddWatcherWorker(
+                    manager, paths, watchers, watchesAdded);
+            addWorkers.add(worker);
+            worker.start();
+        }
+
+        while (watchesAdded.get() < 50000) {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {}
+        }
+
+        // 3. stop all workers
+        for (CreateDeadWatchersWorker worker: deadWorkers) {
+            worker.shutdown();
+        }
+        for (AddWatcherWorker worker: addWorkers) {
+            worker.shutdown();
+        }
+
+        // 4. sleep for a while to make sure all the thread exited
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {}
+
+        // 5. make sure the dead watchers are not in the existing watchers
+        WatchesReport existingWatchers = manager.getWatches();
+        for (Watcher w: deadWatchers) {
+            Assert.assertFalse(
+                    existingWatchers.hasPaths(((ServerCnxn) w).getSessionId()));
+        }
+    }
+}

+ 127 - 0
src/java/test/org/apache/zookeeper/server/watch/WatcherCleanerTest.java

@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.common.Time;
+import org.junit.Test;
+import org.junit.Assert;
+
+public class WatcherCleanerTest extends ZKTestCase {
+
+    public static class MyDeadWatcherListener implements IDeadWatcherListener {
+
+        private CountDownLatch latch;
+        private int delayMs;
+        private Set<Integer> deadWatchers = new HashSet<Integer>();
+
+        public void setCountDownLatch(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        public void setDelayMs(int delayMs) {
+            this.delayMs = delayMs;
+        }
+
+        @Override
+        public void processDeadWatchers(Set<Integer> deadWatchers) {
+            if (delayMs > 0) {
+                try {
+                    Thread.sleep(delayMs);
+                } catch (InterruptedException e) {}
+            }
+            this.deadWatchers.clear();
+            this.deadWatchers.addAll(deadWatchers);
+            latch.countDown();
+        }
+
+        public Set<Integer> getDeadWatchers() {
+            return deadWatchers;
+        }
+
+        public boolean wait(int maxWaitMs) {
+            try {
+                return latch.await(maxWaitMs, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {}
+            return false;
+        }
+    }
+
+    @Test
+    public void testProcessDeadWatchersBasedOnThreshold() {
+        MyDeadWatcherListener listener = new MyDeadWatcherListener();
+        int threshold = 3;
+        WatcherCleaner cleaner = new WatcherCleaner(listener, threshold, 60, 1, 10);
+        cleaner.start();
+
+        int i = 0;
+        while (i++ < threshold - 1) {
+            cleaner.addDeadWatcher(i);
+        }
+        // not trigger processDeadWatchers yet
+        Assert.assertEquals(0, listener.getDeadWatchers().size());
+
+        listener.setCountDownLatch(new CountDownLatch(1));
+        // add another dead watcher to trigger the process
+        cleaner.addDeadWatcher(i);
+        Assert.assertTrue(listener.wait(1000));
+        Assert.assertEquals(threshold, listener.getDeadWatchers().size());
+    }
+
+    @Test
+    public void testProcessDeadWatchersBasedOnTime() {
+        MyDeadWatcherListener listener = new MyDeadWatcherListener();
+        WatcherCleaner cleaner = new WatcherCleaner(listener, 10, 1, 1, 10);
+        cleaner.start();
+
+        cleaner.addDeadWatcher(1);
+        // not trigger processDeadWatchers yet
+        Assert.assertEquals(0, listener.getDeadWatchers().size());
+
+        listener.setCountDownLatch(new CountDownLatch(1));
+        Assert.assertTrue(listener.wait(2000));
+        Assert.assertEquals(1, listener.getDeadWatchers().size());
+
+        // won't trigger event if there is no dead watchers
+        listener.setCountDownLatch(new CountDownLatch(1));
+        Assert.assertFalse(listener.wait(2000));
+    }
+
+    @Test
+    public void testMaxInProcessingDeadWatchers() {
+        MyDeadWatcherListener listener = new MyDeadWatcherListener();
+        int delayMs = 1000;
+        listener.setDelayMs(delayMs);
+        WatcherCleaner cleaner = new WatcherCleaner(listener, 1, 60, 1, 1);
+        cleaner.start();
+
+        listener.setCountDownLatch(new CountDownLatch(2));
+
+        long startTime = Time.currentElapsedTime();
+        cleaner.addDeadWatcher(1);
+        cleaner.addDeadWatcher(2);
+        long time = Time.currentElapsedTime() - startTime;
+        System.out.println("time used " + time);
+        Assert.assertTrue(Time.currentElapsedTime() - startTime >= delayMs);
+        Assert.assertTrue(listener.wait(5000));
+    }
+}

+ 61 - 0
src/java/test/org/apache/zookeeper/server/watch/WatcherOrBitSetTest.java

@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.watch;
+
+import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.DumbWatcher;
+import org.apache.zookeeper.server.util.BitHashSet;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Test;
+import org.junit.Assert;
+
+public class WatcherOrBitSetTest extends ZKTestCase {
+
+    @Test
+    public void testWatcherSet() {
+        Set<Watcher> wset = new HashSet<Watcher>();
+        WatcherOrBitSet hashSet = new WatcherOrBitSet(wset);
+        Assert.assertEquals(0, hashSet.size());
+
+        DumbWatcher w1 = new DumbWatcher();
+        Assert.assertFalse(hashSet.contains(w1));
+        wset.add(w1);
+        Assert.assertTrue(hashSet.contains(w1));
+        Assert.assertEquals(1, hashSet.size());
+        Assert.assertFalse(hashSet.contains(1));
+    }
+
+    @Test
+    public void testBitSet() {
+        BitHashSet bset = new BitHashSet(0);
+        WatcherOrBitSet bitSet = new WatcherOrBitSet(bset);
+        Assert.assertEquals(0, bitSet.size());
+
+        Integer bit = new Integer(1);
+        Assert.assertFalse(bitSet.contains(1));
+        Assert.assertFalse(bitSet.contains(bit));
+
+        bset.add(bit);
+        Assert.assertTrue(bitSet.contains(1));
+        Assert.assertTrue(bitSet.contains(bit));
+        Assert.assertEquals(1, bitSet.size());
+    }
+}

+ 1 - 1
src/java/test/org/apache/zookeeper/server/WatchesPathReportTest.java → src/java/test/org/apache/zookeeper/server/watch/WatchesPathReportTest.java

@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.zookeeper.server;
+package org.apache.zookeeper.server.watch;
 
 import java.util.HashMap;
 import java.util.HashSet;

+ 1 - 1
src/java/test/org/apache/zookeeper/server/WatchesReportTest.java → src/java/test/org/apache/zookeeper/server/watch/WatchesReportTest.java

@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.zookeeper.server;
+package org.apache.zookeeper.server.watch;
 
 import java.util.HashMap;
 import java.util.HashSet;

+ 1 - 1
src/java/test/org/apache/zookeeper/server/WatchesSummaryTest.java → src/java/test/org/apache/zookeeper/server/watch/WatchesSummaryTest.java

@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.zookeeper.server;
+package org.apache.zookeeper.server.watch;
 
 import java.util.Map;
 import org.apache.zookeeper.ZKTestCase;

+ 30 - 0
src/test/java/bench/org/apache/zookeeper/BenchMain.java

@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+public class BenchMain {
+    public static void main(String args[]) throws Exception {
+        org.openjdk.jmh.Main.main(args);
+    }
+}

+ 300 - 0
src/test/java/bench/org/apache/zookeeper/server/watch/WatchBench.java

@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.watch;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.watch.IWatchManager;
+import org.apache.zookeeper.server.DumbWatcher;
+
+import org.openjdk.jmh.annotations.*;
+
+import java.util.concurrent.TimeUnit;
+
+@Fork(3)
+public class WatchBench {
+
+    static final String pathPrefix = "/reasonably/long/path/";
+    static final EventType event = EventType.NodeDataChanged;
+
+    static IWatchManager createWatchManager(String className) throws Exception {
+        Class clazz = Class.forName(
+                "org.apache.zookeeper.server.watch." + className);
+        return (IWatchManager) clazz.newInstance();
+    }
+
+    static void forceGC() {
+        int gcTimes = 3;
+        for (int i = 0; i < gcTimes; i++) {
+            try {
+                System.gc();
+                Thread.currentThread().sleep(1000);
+
+                System.runFinalization();
+                Thread.currentThread().sleep(1000);
+            } catch (InterruptedException ex) { /* ignore */ }
+        }
+    }
+
+    static long getMemoryUse() {
+        forceGC();
+        long totalMem = Runtime.getRuntime().totalMemory();
+
+        forceGC();
+        long freeMem = Runtime.getRuntime().freeMemory();
+        return totalMem - freeMem;
+    }
+
+    @State(Scope.Benchmark)
+    public static class IterationState {
+
+        @Param({"WatchManager", "WatchManagerOptimized"})
+        public String watchManagerClass;
+
+        @Param({"10000"})
+        public int pathCount;
+
+        String[] paths;
+
+        long watchesAdded = 0;
+        IWatchManager watchManager;
+
+        long memWhenSetup = 0;
+
+        @Setup(Level.Iteration)
+        public void setup() throws Exception {
+            paths = new String[pathCount];
+            for (int i = 0; i < paths.length; i++) {
+                paths[i] = pathPrefix + i;
+            }
+
+            watchesAdded = 0;
+            watchManager = createWatchManager(watchManagerClass);
+
+            memWhenSetup = getMemoryUse();
+        }
+
+        @TearDown(Level.Iteration)
+        public void tearDown() {
+            long memUsed = getMemoryUse() - memWhenSetup;
+            System.out.println("Memory used: " + watchesAdded + " " + memUsed);
+
+            double memPerMillionWatchesMB = memUsed * 1.0 / watchesAdded ;
+            System.out.println(
+                    "Memory used per million watches " +
+                    String.format("%.2f", memPerMillionWatchesMB) + "MB");
+        }
+    }
+
+    /**
+     * Test concenrate watch case where the watcher watches all paths.
+     *
+     * The output of this test will be the average time used to add the
+     * watch to all paths.
+     */
+    @Benchmark
+    @BenchmarkMode(Mode.AverageTime)
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+    @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+    public void testAddConcentrateWatch(IterationState state) throws Exception {
+        Watcher watcher = new DumbWatcher();
+
+        // watch all paths
+        for (String path : state.paths) {
+            if (state.watchManager.addWatch(path, watcher)) {
+                state.watchesAdded++;
+            }
+        }
+    }
+
+    @State(Scope.Benchmark)
+    public static class InvocationState {
+
+        @Param({"WatchManager", "WatchManagerOptimized"})
+        public String watchManagerClass;
+
+        @Param({"1", "1000"})
+        public int pathCount;
+
+        @Param({"1", "1000"})
+        public int watcherCount;
+
+        String[] paths;
+        Watcher[] watchers;
+
+        IWatchManager watchManager;
+
+        @Setup(Level.Invocation)
+        public void setup() throws Exception {
+            initialize();
+            prepare();
+        }
+
+        void initialize() throws Exception {
+            if (paths == null || paths.length != pathCount) {
+                paths = new String[pathCount];
+                for (int i = 0; i < pathCount; i++) {
+                    paths[i] = pathPrefix + i;
+                }
+            }
+
+            if (watchers == null || watchers.length != watcherCount) {
+                watchers = new Watcher[watcherCount];
+                for (int i = 0; i < watcherCount; i++) {
+                    watchers[i] = new DumbWatcher();
+                }
+            }
+            if (watchManager == null ||
+                    !watchManager.getClass().getSimpleName().contains(
+                            watchManagerClass)) {
+                watchManager = createWatchManager(watchManagerClass);
+            }
+        }
+
+        void prepare() {
+            for (String path : paths) {
+                for (Watcher watcher : watchers) {
+                    watchManager.addWatch(path, watcher);
+                }
+            }
+        }
+    }
+
+    /**
+     * Test trigger watches in concenrate case.
+     *
+     * The output of this test is the time used to trigger those watches on
+     * all paths.
+     */
+    @Benchmark
+    @BenchmarkMode(Mode.AverageTime)
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+    @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+    public void testTriggerConcentrateWatch(InvocationState state) throws Exception {
+        for (String path : state.paths) {
+            state.watchManager.triggerWatch(path, event);
+        }
+    }
+
+    @State(Scope.Benchmark)
+    public static class AddSparseWatchState extends InvocationState {
+
+        @Param({"10000"})
+        public int pathCount;
+
+        @Param({"10000"})
+        public int watcherCount;
+
+        long watchesAdded = 0;
+        long memWhenSetup = 0;
+
+        @Override
+        public void prepare() {
+            watchesAdded = 0;
+            memWhenSetup = getMemoryUse();
+        }
+
+        @TearDown(Level.Invocation)
+        public void tearDown() {
+            long memUsed = getMemoryUse() - memWhenSetup;
+            System.out.println("Memory used: " + watchesAdded + " " + memUsed);
+
+            double memPerMillionWatchesMB = memUsed * 1.0 / watchesAdded ;
+            System.out.println(
+                    "Memory used per million sparse watches " +
+                    String.format("%.2f", memPerMillionWatchesMB) + "MB");
+
+            // clear all the watches
+            for (String path : paths) {
+                watchManager.triggerWatch(path, event);
+            }
+        }
+    }
+
+    /**
+     * Test sparse watch case where only one watcher watches all paths, and
+     * only one path being watched by all watchers.
+     *
+     * The output of this test will be the average time used to add those
+     * sparse watches.
+     */
+    @Benchmark
+    @BenchmarkMode(Mode.AverageTime)
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+    @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+    public void testAddSparseWatch(AddSparseWatchState state) throws Exception {
+        // All watchers are watching the 1st path
+        for (Watcher watcher : state.watchers) {
+            if (state.watchManager.addWatch(state.paths[0], watcher)) {
+                state.watchesAdded++;
+            }
+        }
+        // The 1st watcher is watching all paths
+        for (String path : state.paths) {
+            if (state.watchManager.addWatch(path, state.watchers[0])) {
+                state.watchesAdded++;
+            }
+        }
+    }
+
+    @State(Scope.Benchmark)
+    public static class TriggerSparseWatchState extends InvocationState {
+
+        @Param({"10000"})
+        public int pathCount;
+
+        @Param({"10000"})
+        public int watcherCount;
+
+        @Override
+        public void prepare() {
+            // All watchers are watching the 1st path
+            for (Watcher watcher : watchers) {
+                watchManager.addWatch(paths[0], watcher);
+            }
+
+            // The 1st watcher is watching all paths
+            for (String path : paths) {
+                watchManager.addWatch(path, watchers[0]);
+            }
+        }
+    }
+
+
+    /**
+     * Test trigger watches in sparse case.
+     *
+     * The output of this test is the time used to trigger those watches on
+     * all paths.
+     */
+    @Benchmark
+    @BenchmarkMode(Mode.AverageTime)
+    @OutputTimeUnit(TimeUnit.MILLISECONDS)
+    @Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
+    @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
+    public void testTriggerSparseWatch(TriggerSparseWatchState state) throws Exception {
+        for (String path : state.paths) {
+            state.watchManager.triggerWatch(path, event);
+        }
+    }
+}

+ 1 - 0
zookeeper-contrib/zookeeper-contrib-fatjar/src/main/resources/mainClasses

@@ -8,3 +8,4 @@ quorumBench:org.apache.zookeeper.server.QuorumBenchmark:A benchmark of just the
 abBench:org.apache.zookeeper.server.quorum.AtomicBroadcastBenchmark:A benchmark of just the atomic broadcast
 ic:org.apache.zookeeper.test.system.InstanceContainer:A container that will instantiate classes as directed by an instance manager
 systest:org.apache.zookeeper.test.system.BaseSysTest:Start system test
+jmh:org.apache.zookeeper.BenchMain:Run jmh micro benchmarks

+ 96 - 0
zookeeper-docs/src/documentation/content/xdocs/zookeeperAdmin.xml

@@ -1025,6 +1025,102 @@ server.3=zoo3:2888:3888</programlisting>
             </listitem>
           </varlistentry>
 
+
+          <varlistentry>
+            <term>watchManaggerName</term>
+
+            <listitem>
+              <para>(Java system property only: <emphasis
+                    role="bold">zookeeper.watchManagerName</emphasis>)</para>
+
+              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> New watcher
+                manager WatchManagerOptimized is added to optimize the memory overhead in heavy watch use cases. This
+                config is used to define which watcher manager to be used. Currently, we only support WatchManager and
+                WatchManagerOptimized.</para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term>watcherCleanThreadsNum</term>
+
+            <listitem>
+              <para>(Java system property only: <emphasis
+                    role="bold">zookeeper.watcherCleanThreadsNum</emphasis>)</para>
+
+              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher
+                manager WatchManagerOptimized will clean up the dead watchers lazily, this config is used to decide how
+                many thread is used in the WatcherCleaner. More thread usually means larger clean up throughput. The
+                default value is 2, which is good enough even for heavy and continuous session closing/recreating cases.</para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term>watcherCleanThreshold</term>
+
+            <listitem>
+              <para>(Java system property only: <emphasis
+                    role="bold">zookeeper.watcherCleanThreshold</emphasis>)</para>
+
+              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher
+                manager WatchManagerOptimized will clean up the dead watchers lazily, the clean up process is relatively
+                heavy, batch processing will reduce the cost and improve the performance. This setting is used to decide
+                the batch size. The default one is 1000, we don't need to change it if there is no memory or clean up
+                speed issue.</para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term>watcherCleanIntervalInSeconds</term>
+
+            <listitem>
+              <para>(Java system property only: <emphasis
+                    role="bold">zookeeper.watcherCleanIntervalInSeconds</emphasis>)</para>
+
+              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> The new watcher
+                manager WatchManagerOptimized will clean up the dead watchers lazily, the clean up process is relatively
+                heavy, batch processing will reduce the cost and improve the performance. Besides watcherCleanThreshold,
+                this setting is used to clean up the dead watchers after certain time even the dead watchers are not larger
+                than watcherCleanThreshold, so that we won't leave the dead watchers there for too long. The default setting
+                is 10 minutes, which usually don't need to be changed.</para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term>maxInProcessingDeadWatchers</term>
+
+            <listitem>
+              <para>(Java system property only: <emphasis
+                    role="bold">zookeeper.maxInProcessingDeadWatchers</emphasis>)</para>
+
+              <para><emphasis role="bold">New in 3.6.0:</emphasis> Added in
+                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> This is used
+                to control how many backlog can we have in the WatcherCleaner, when it reaches this number, it will
+                slow down adding the dead watcher to WatcherCleaner, which will in turn slow down adding and closing
+                watchers, so that we can avoid OOM issue. By default there is no limit, you can set it to values like
+                watcherCleanThreshold * 1000.</para>
+            </listitem>
+          </varlistentry>
+
+          <varlistentry>
+            <term>bitHashCacheSize</term>
+
+            <listitem>
+              <para>(Java system property only: <emphasis
+                    role="bold">zookeeper.bitHashCacheSize</emphasis>)</para>
+
+              <para><emphasis role="bold">New 3.6.0:</emphasis> Added in
+                <ulink url="https://issues.apache.org/jira/browse/ZOOKEEPER-1179">ZOOKEEPER-1179</ulink> This is the
+                setting used to decide the HashSet cache size in the BitHashSet implementation. Without HashSet, we
+                need to use O(N) time to get the elements, N is the bit numbers in elementBits. But we need to
+                keep the size small to make sure it doesn't cost too much in memory, there is a trade off between memory
+                and time complexity. The default value is 10, which seems a relatively reasonable cache size.</para>
+            </listitem>
+          </varlistentry>
+
         </variablelist>
       </section>