Explorar o código

ZOOKEEPER-3267: Add watcher metrics

Author: Jie Huang <jiehuang@fb.com>

Reviewers: fangmin@apache.org, andor@apache.org

Closes #796 from jhuan31/ZOOKEEPER-3267 and squashes the following commits:

4dafb7e94 [Jie Huang] refactor/de-flaky unit tests
9bdc93a04 [Jie Huang] fix ADD_DEAD_WATCHERS_STALL_TIME measurement
8e45fc253 [Jie Huang] add unit tests for the watcher metrics
2d786e455 [Jie Huang] ZOOKEEPER-3267: Add watcher metrics
Jie Huang hai 6 anos
pai
achega
dafc40d59f

+ 17 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java

@@ -76,6 +76,23 @@ public enum ServerMetrics {
 
     BYTES_RECEIVED_COUNT(new SimpleCounter("bytes_received_count")),
 
+    /**
+     * Fired watcher stats.
+     */
+    NODE_CREATED_WATCHER(new AvgMinMaxCounter("node_created_watch_count")),
+    NODE_DELETED_WATCHER(new AvgMinMaxCounter("node_deleted_watch_count")),
+    NODE_CHANGED_WATCHER(new AvgMinMaxCounter("node_changed_watch_count")),
+    NODE_CHILDREN_WATCHER(new AvgMinMaxCounter("node_children_watch_count")),
+
+
+    /*
+     * Dead watcher cleaner stats: time callers spent stalled while adding
+     * dead watchers, dead watchers queued and cleared, and cleaner latency
+     */
+    ADD_DEAD_WATCHER_STALL_TIME(new SimpleCounter("add_dead_watcher_stall_time")),
+    DEAD_WATCHERS_QUEUED(new SimpleCounter("dead_watchers_queued")),
+    DEAD_WATCHERS_CLEARED(new SimpleCounter("dead_watchers_cleared")),
+    DEAD_WATCHERS_CLEANER_LATENCY(new AvgMinMaxPercentileCounter("dead_watchers_cleaner_latency")),
+
     RESPONSE_PACKET_CACHE_HITS(new SimpleCounter("response_packet_cache_hits")),
     RESPONSE_PACKET_CACHE_MISSING(new SimpleCounter("response_packet_cache_misses")),
     

+ 23 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java

@@ -31,6 +31,7 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.ZooTrace;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -138,6 +139,28 @@ public class WatchManager implements IWatchManager {
             }
             w.process(e);
         }
+
+        switch (type) {
+        case NodeCreated:
+            ServerMetrics.NODE_CREATED_WATCHER.add(watchers.size());
+            break;
+
+        case NodeDeleted:
+            ServerMetrics.NODE_DELETED_WATCHER.add(watchers.size());
+            break;
+
+        case NodeDataChanged:
+            ServerMetrics.NODE_CHANGED_WATCHER.add(watchers.size());
+            break;
+
+        case NodeChildrenChanged:
+            ServerMetrics.NODE_CHILDREN_WATCHER.add(watchers.size());
+            break;
+        default:
+            // Other types not logged.
+            break;
+        }
+
         return new WatcherOrBitSet(watchers);
     }
 

+ 28 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java

@@ -33,6 +33,7 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.util.BitHashSet;
 import org.apache.zookeeper.server.util.BitMap;
 import org.slf4j.Logger;
@@ -222,6 +223,8 @@ public class WatchManagerOptimized
             return null;
         }
 
+        int triggeredWatches = 0;
+
         // Avoid race condition between dead watcher cleaner in
         // WatcherCleaner and iterating here
         synchronized (watchers) {
@@ -238,9 +241,11 @@ public class WatchManagerOptimized
                 }
 
                 w.process(e);
+                triggeredWatches++;
             }
         }
 
+        updateMetrics(type, triggeredWatches);
         return new WatcherOrBitSet(watchers);
     }
 
@@ -269,6 +274,29 @@ public class WatchManagerOptimized
         }
     }
 
+    /**
+     * Records the number of watches fired by one trigger under the counter
+     * that matches the event type. Event types other than node created,
+     * deleted, data changed and children changed are not recorded.
+     *
+     * @param type the type of the event that fired the watches
+     * @param size the number of watches to record for this trigger
+     */
+    void updateMetrics(final EventType type, int size) {
+        ServerMetrics counter;
+        switch (type) {
+        case NodeCreated:
+            counter = ServerMetrics.NODE_CREATED_WATCHER;
+            break;
+        case NodeDeleted:
+            counter = ServerMetrics.NODE_DELETED_WATCHER;
+            break;
+        case NodeDataChanged:
+            counter = ServerMetrics.NODE_CHANGED_WATCHER;
+            break;
+        case NodeChildrenChanged:
+            counter = ServerMetrics.NODE_CHILDREN_WATCHER;
+            break;
+        default:
+            // Other types not logged.
+            return;
+        }
+        counter.add(size);
+    }
+
     boolean isDeadWatcher(Watcher watcher) {
         return watcher instanceof ServerCnxn && ((ServerCnxn) watcher).isStale();
     }

+ 7 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherCleaner.java

@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.server.RateLogger;
+import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.WorkerService;
 import org.apache.zookeeper.server.WorkerService.WorkRequest;
 
@@ -103,9 +104,12 @@ public class WatcherCleaner extends Thread {
                 totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
             try {
                 RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning");
+                long startTime = Time.currentElapsedTime();
                 synchronized(processingCompletedEvent) {
                     processingCompletedEvent.wait(100);
                 }
+                long latency = Time.currentElapsedTime() - startTime;
+                ServerMetrics.ADD_DEAD_WATCHER_STALL_TIME.add(latency);
             } catch (InterruptedException e) {
                 LOG.info("Got interrupted while waiting for dead watches " +
                         "queue size");
@@ -115,6 +119,7 @@ public class WatcherCleaner extends Thread {
         synchronized (this) {
             if (deadWatchers.add(watcherBit)) {
                 totalDeadWatchers.incrementAndGet();
+                ServerMetrics.DEAD_WATCHERS_QUEUED.add(1);
                 if (deadWatchers.size() >= watcherCleanThreshold) {
                     synchronized (cleanEvent) {
                         cleanEvent.notifyAll();
@@ -164,6 +169,8 @@ public class WatcherCleaner extends Thread {
                         listener.processDeadWatchers(snapshot);
                         long latency = Time.currentElapsedTime() - startTime;
                         LOG.info("Takes {} to process {} watches", latency, total);
+                        ServerMetrics.DEAD_WATCHERS_CLEANER_LATENCY.add(latency);
+                        ServerMetrics.DEAD_WATCHERS_CLEARED.add(total);
                         totalDeadWatchers.addAndGet(-total);
                         synchronized(processingCompletedEvent) {
                             processingCompletedEvent.notifyAll();

+ 70 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java

@@ -33,6 +33,10 @@ import org.apache.zookeeper.server.DumbWatcher;
 import org.apache.zookeeper.server.ServerCnxn;
 
 import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.server.metric.AvgMinMaxCounter;
+import org.apache.zookeeper.server.metric.Metric;
+import org.eclipse.jetty.util.IO;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -390,8 +394,10 @@ public class WatchManagerTest extends ZKTestCase {
         }
 
         // 4. sleep for a while to make sure all the thread exited
+        // the cleaner may wait as long as CleanerInterval+CleanerInterval/2+1,
+        // so we need to sleep at least that long
         try {
-            Thread.sleep(1000);
+            Thread.sleep(2000);
         } catch (InterruptedException e) {}
 
         // 5. make sure the dead watchers are not in the existing watchers
@@ -401,4 +407,67 @@ public class WatchManagerTest extends ZKTestCase {
                     existingWatchers.hasPaths(((ServerCnxn) w).getSessionId()));
         }
     }
+
+    /**
+     * Asserts the values currently reported by ServerMetrics for one
+     * metric, looked up under the "min_", "max_", "avg_", "cnt_" and "sum_"
+     * prefixes of the given name.
+     *
+     * @param metricName metric name without the statistic prefix
+     * @param min expected minimum
+     * @param max expected maximum
+     * @param avg expected average (compared with a 0.000001 tolerance)
+     * @param cnt expected number of samples
+     * @param sum expected sum of all samples
+     */
+    private void checkMetrics(String metricName, long min, long max, double avg, long cnt, long sum){
+        Map<String, Object> reported = ServerMetrics.getAllValues();
+
+        Object actualMin = reported.get("min_" + metricName);
+        Assert.assertEquals(min, actualMin);
+
+        Object actualMax = reported.get("max_" + metricName);
+        Assert.assertEquals(max, actualMax);
+
+        // avg is compared as a double, so unbox it and allow a tiny tolerance
+        Assert.assertEquals(avg, (Double)reported.get("avg_" + metricName), 0.000001);
+
+        Object actualCnt = reported.get("cnt_" + metricName);
+        Assert.assertEquals(cnt, actualCnt);
+
+        Object actualSum = reported.get("sum_" + metricName);
+        Assert.assertEquals(sum, actualSum);
+    }
+
+    @Test
+    public void testWatcherMetrics() throws IOException {
+        IWatchManager manager = getWatchManager();
+        ServerMetrics.resetAll();
+
+        DumbWatcher watcher1 = new DumbWatcher(1);
+        DumbWatcher watcher2 = new DumbWatcher(2);
+
+        final String path1 = "/path1";
+        final String path2 = "/path2";
+
+        final String path3 = "/path3";
+
+        // both watcher1 and watcher2 are watching path1
+        manager.addWatch(path1, watcher1);
+        manager.addWatch(path1, watcher2);
+
+        // path2 is watched by watcher1 only
+        manager.addWatch(path2, watcher1);
+
+        manager.triggerWatch(path3, EventType.NodeCreated);
+        // path3 is not being watched, so nothing is recorded and every stat is 0
+        checkMetrics("node_created_watch_count", 0L, 0L, 0D, 0L, 0L);
+
+        // path1 is watched by two watchers, so one sample of 2 is recorded
+        manager.triggerWatch(path1, EventType.NodeCreated);
+        checkMetrics("node_created_watch_count", 2L, 2L, 2D, 1L, 2L);
+
+        // path2 is watched by one watcher, so a sample of 1 is recorded; the sum is now 3
+        manager.triggerWatch(path2, EventType.NodeCreated);
+        checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L);
+
+        // the watches on path1 are expected to be gone after the trigger above, so none fire
+        manager.triggerWatch(path1, EventType.NodeDataChanged);
+        checkMetrics("node_changed_watch_count", 0L, 0L, 0D, 0L, 0L);
+
+        // register again: both watcher1 and watcher2 are watching path1
+        manager.addWatch(path1, watcher1);
+        manager.addWatch(path1, watcher2);
+
+        // register again: path2 is watched by watcher1 only
+        manager.addWatch(path2, watcher1);
+
+        manager.triggerWatch(path1, EventType.NodeDataChanged);
+        checkMetrics("node_changed_watch_count", 2L, 2L, 2D, 1L, 2L);
+
+        manager.triggerWatch(path2, EventType.NodeDeleted);
+        checkMetrics("node_deleted_watch_count", 1L, 1L, 1D, 1L, 1L);
+
+        // make sure the node-created watch count is not affected by the firing of other event types
+        checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L);
+    }
 }

+ 41 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatcherCleanerTest.java

@@ -16,6 +16,7 @@
  */
 package org.apache.zookeeper.server.watch;
 
+import java.util.Map;
 import java.util.Set;
 import java.util.HashSet;
 import java.util.concurrent.CountDownLatch;
@@ -23,10 +24,16 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.server.ServerMetrics;
 import org.junit.Test;
 import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.hamcrest.number.OrderingComparison.greaterThan;
 
 public class WatcherCleanerTest extends ZKTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(WatcherCleanerTest.class);
 
     public static class MyDeadWatcherListener implements IDeadWatcherListener {
 
@@ -124,4 +131,38 @@ public class WatcherCleanerTest extends ZKTestCase {
         Assert.assertTrue(Time.currentElapsedTime() - startTime >= delayMs);
         Assert.assertTrue(listener.wait(5000));
     }
+
+    /**
+     * Verifies the dead-watcher metrics: stall time while adding, number of
+     * dead watchers queued and cleared, and the cleaner latency stats.
+     */
+    @Test
+    public void testDeadWatcherMetrics() {
+        // ServerMetrics counters are global and shared by every test in the
+        // JVM; start from zero so that the absolute values asserted below do
+        // not depend on which tests ran before this one
+        ServerMetrics.resetAll();
+
+        MyDeadWatcherListener listener = new MyDeadWatcherListener();
+        WatcherCleaner cleaner = new WatcherCleaner(listener, 1, 1, 1, 1);
+        listener.setDelayMs(20);
+        cleaner.start();
+        listener.setCountDownLatch(new CountDownLatch(3));
+        //the dead watchers will be added one by one and cleared one by one because we set both watchCleanThreshold and
+        //maxInProcessingDeadWatchers to 1
+        cleaner.addDeadWatcher(1);
+        cleaner.addDeadWatcher(2);
+        cleaner.addDeadWatcher(3);
+
+        Assert.assertTrue(listener.wait(5000));
+
+        Map<String, Object> values = ServerMetrics.getAllValues();
+
+        Assert.assertThat("Adding dead watcher should be stalled twice",
+                          (Long)values.get("add_dead_watcher_stall_time"),
+                           greaterThan(0L));
+        Assert.assertEquals("Total dead watchers added to the queue should be 3", 3L, values.get("dead_watchers_queued"));
+        Assert.assertEquals("Total dead watchers cleared should be 3", 3L, values.get("dead_watchers_cleared"));
+
+        Assert.assertEquals(3L, values.get("cnt_dead_watchers_cleaner_latency"));
+
+        //Each latency should be a little over 20 ms, allow 5 ms deviation
+        Assert.assertEquals(20D, (Double)values.get("avg_dead_watchers_cleaner_latency"), 5);
+        Assert.assertEquals(20D, ((Long)values.get("min_dead_watchers_cleaner_latency")).doubleValue(), 5);
+        Assert.assertEquals(20D, ((Long)values.get("max_dead_watchers_cleaner_latency")).doubleValue(), 5);
+        Assert.assertEquals(20D, ((Long)values.get("p50_dead_watchers_cleaner_latency")).doubleValue(), 5);
+        Assert.assertEquals(20D, ((Long)values.get("p95_dead_watchers_cleaner_latency")).doubleValue(), 5);
+        Assert.assertEquals(20D, ((Long)values.get("p99_dead_watchers_cleaner_latency")).doubleValue(), 5);
+    }
 }