Sfoglia il codice sorgente

ZOOKEEPER-4655: Communicate the Zxid that triggered a WatchEvent to fire (#1950)

* Fix a race condition in WatcherCleanerTest.testDeadWatcherMetrics

Because the metrics were updated _after_ the listener is invoked, the listener does not always see
the fresh metric value. This fixes it so that the test waits for the value to become what we expect.

* Leverage an existing method and refactor the rest of the code to match

Since there was an existing waitFor method in ZKTestCase, along with an existing implementation of a
waitForMetric LearnerMetricsTest, this commit moves waitForMetric to ZKTestCase and refactors the
metric-related usages of waitFor.

* Communicate the Zxid that triggered a WatchEvent to fire

With the recent addition of persistent watches, many doors have opened up to significantly more
performant and intuitive local caches of remote state, but the actual implementation can be
difficult because to cache data locally, one needs to execute the following steps:

1. Set the watch
2. Bootstrap the watched subtree
3. Catch up on the events that fired during the bootstrap

The issue is it's now very difficult to deduplicate and sanely resolve the remote state during step
3 because it's unknown whether an event arrived during the bootstrap or after. For example,
imagine that between steps 1 and 2, a node /a was deleted then re-created. By the time step 3 is
executed, there will be a NodeDeleted event queued up followed by a NodeCreated, causing at best a
double read (one from the bootstrap, one from the NodeCreated) or at worst some data inconsistencies
in the local cache.

This change sets the Zxid in the response header whenever the watch event type is NodeCreated,
NodeDeleted, NodeDataChanged or NodeChildrenChanged.
Paul Chesnais 1 anno fa
parent
commit
880f606418
22 ha cambiato i file con 360 aggiunte e 205 eliminazioni
  1. 3 3
      zookeeper-it/src/main/java/org/apache/zookeeper/server/watch/WatchBench.java
  2. 1 1
      zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
  3. 32 7
      zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java
  4. 5 0
      zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java
  5. 6 6
      zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
  6. 19 0
      zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java
  7. 1 1
      zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
  8. 1 1
      zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
  9. 4 2
      zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java
  10. 4 4
      zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java
  11. 4 4
      zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java
  12. 40 2
      zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
  13. 5 11
      zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java
  14. 1 2
      zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogMetricsTest.java
  15. 0 15
      zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java
  16. 28 9
      zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java
  17. 16 16
      zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatcherCleanerTest.java
  18. 82 52
      zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java
  19. 14 0
      zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java
  20. 2 2
      zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java
  21. 2 2
      zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java
  22. 90 65
      zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java

+ 3 - 3
zookeeper-it/src/main/java/org/apache/zookeeper/server/watch/WatchBench.java

@@ -191,7 +191,7 @@ public class WatchBench {
     @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
     public void testTriggerConcentrateWatch(InvocationState state) throws Exception {
         for (String path : state.paths) {
-            state.watchManager.triggerWatch(path, event);
+            state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID);
         }
     }
 
@@ -225,7 +225,7 @@ public class WatchBench {
 
             // clear all the watches
             for (String path : paths) {
-                watchManager.triggerWatch(path, event);
+                watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID);
             }
         }
     }
@@ -294,7 +294,7 @@ public class WatchBench {
     @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
     public void testTriggerSparseWatch(TriggerSparseWatchState state) throws Exception {
         for (String path : state.paths) {
-            state.watchManager.triggerWatch(path, event);
+            state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID);
         }
     }
 }

+ 1 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java

@@ -902,7 +902,7 @@ public class ClientCnxn {
                     event.setPath(clientPath);
                 }
 
-                WatchedEvent we = new WatchedEvent(event);
+                WatchedEvent we = new WatchedEvent(event, replyHdr.getZxid());
                 LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
                 eventThread.queueEvent(we);
                 return;

+ 32 - 7
zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java

@@ -31,27 +31,38 @@ import org.apache.zookeeper.proto.WatcherEvent;
  */
 @InterfaceAudience.Public
 public class WatchedEvent {
+    public static final long NO_ZXID = -1L;
 
     private final KeeperState keeperState;
     private final EventType eventType;
-    private String path;
+    private final String path;
+    private final long zxid;
 
     /**
-     * Create a WatchedEvent with specified type, state and path
+     * Create a WatchedEvent with specified type, state, path and zxid
      */
-    public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
+    public WatchedEvent(EventType eventType, KeeperState keeperState, String path, long zxid) {
         this.keeperState = keeperState;
         this.eventType = eventType;
         this.path = path;
+        this.zxid = zxid;
     }
 
     /**
-     * Convert a WatcherEvent sent over the wire into a full-fledged WatcherEvent
+     * Create a WatchedEvent with specified type, state and path
      */
-    public WatchedEvent(WatcherEvent eventMessage) {
+    public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
+        this(eventType, keeperState, path, NO_ZXID);
+    }
+
+    /**
+     * Convert a WatcherEvent sent over the wire into a full-fledged WatchedEvent
+     */
+    public WatchedEvent(WatcherEvent eventMessage, long zxid) {
         keeperState = KeeperState.fromInt(eventMessage.getState());
         eventType = EventType.fromInt(eventMessage.getType());
         path = eventMessage.getPath();
+        this.zxid = zxid;
     }
 
     public KeeperState getState() {
@@ -66,9 +77,24 @@ public class WatchedEvent {
         return path;
     }
 
+    /**
+     * Returns the zxid of the transaction that triggered this watch if it is
+     * of one of the following types:<ul>
+     *   <li>{@link EventType#NodeCreated}</li>
+     *   <li>{@link EventType#NodeDeleted}</li>
+     *   <li>{@link EventType#NodeDataChanged}</li>
+     *   <li>{@link EventType#NodeChildrenChanged}</li>
+     * </ul>
+     * Otherwise, returns {@value #NO_ZXID}. Note that {@value #NO_ZXID} is also
+     * returned by old servers that do not support this feature.
+     */
+    public long getZxid() {
+        return zxid;
+    }
+
     @Override
     public String toString() {
-        return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path;
+        return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path + " zxid: " + zxid;
     }
 
     /**
@@ -77,5 +103,4 @@ public class WatchedEvent {
     public WatcherEvent getWrapper() {
         return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path);
     }
-
 }

+ 5 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java

@@ -26,6 +26,11 @@ import org.apache.yetus.audience.InterfaceAudience;
  * server it connects to. An application using such a client handles these
  * events by registering a callback object with the client. The callback object
  * is expected to be an instance of a class that implements Watcher interface.
+ * When {@link #process} is triggered by a watch firing, such as
+ * {@link Event.EventType#NodeDataChanged}, {@link WatchedEvent#getZxid()} will
+ * return the zxid of the transaction that caused said watch to fire. If
+ * {@value WatchedEvent#NO_ZXID} is returned then the server must be updated to
+ * support this feature.
  *
  */
 @InterfaceAudience.Public

+ 6 - 6
zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java

@@ -518,8 +518,8 @@ public class DataTree {
             updateQuotaStat(lastPrefix, bytes, 1);
         }
         updateWriteStat(path, bytes);
-        dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
-        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged);
+        dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid);
+        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid);
     }
 
     /**
@@ -615,9 +615,9 @@ public class DataTree {
                 "childWatches.triggerWatch " + parentName);
         }
 
-        WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted);
-        childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
-        childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged);
+        WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid);
+        childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed);
+        childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid);
     }
 
     public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException {
@@ -649,7 +649,7 @@ public class DataTree {
         nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastData));
 
         updateWriteStat(path, dataBytes);
-        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
+        dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid);
         return s;
     }
 

+ 19 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java

@@ -33,6 +33,9 @@ import org.apache.zookeeper.proto.ReplyHeader;
 public class DumbWatcher extends ServerCnxn {
 
     private long sessionId;
+    private String mostRecentPath;
+    private Event.EventType mostRecentEventType;
+    private long mostRecentZxid = WatchedEvent.NO_ZXID;
 
     public DumbWatcher() {
         this(0);
@@ -49,8 +52,24 @@ public class DumbWatcher extends ServerCnxn {
 
     @Override
     public void process(WatchedEvent event) {
+        mostRecentEventType = event.getType();
+        mostRecentZxid = event.getZxid();
+        mostRecentPath = event.getPath();
     }
 
+    public String getMostRecentPath() {
+        return mostRecentPath;
+    }
+
+    public Event.EventType getMostRecentEventType() {
+        return mostRecentEventType;
+    }
+
+    public long getMostRecentZxid() {
+        return mostRecentZxid;
+    }
+
+
     @Override
     int getSessionTimeout() {
         return 0;

+ 1 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java

@@ -705,7 +705,7 @@ public class NIOServerCnxn extends ServerCnxn {
      */
     @Override
     public void process(WatchedEvent event) {
-        ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
+        ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);
         if (LOG.isTraceEnabled()) {
             ZooTrace.logTraceMessage(
                 LOG,

+ 1 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java

@@ -162,7 +162,7 @@ public class NettyServerCnxn extends ServerCnxn {
 
     @Override
     public void process(WatchedEvent event) {
-        ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
+        ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);
         if (LOG.isTraceEnabled()) {
             ZooTrace.logTraceMessage(
                 LOG,

+ 4 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java

@@ -113,10 +113,11 @@ public interface IWatchManager {
      *
      * @param path znode path
      * @param type the watch event type
+     * @param zxid the zxid for the corresponding change that triggered this event
      *
      * @return the watchers have been notified
      */
-    WatcherOrBitSet triggerWatch(String path, EventType type);
+    WatcherOrBitSet triggerWatch(String path, EventType type, long zxid);
 
     /**
      * Distribute the watch event for the given path, but ignore those
@@ -124,11 +125,12 @@ public interface IWatchManager {
      *
      * @param path znode path
      * @param type the watch event type
+     * @param zxid the zxid for the corresponding change that triggered this event
      * @param suppress the suppressed watcher set
      *
      * @return the watchers have been notified
      */
-    WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress);
+    WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress);
 
     /**
      * Get the size of watchers.

+ 4 - 4
zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java

@@ -129,13 +129,13 @@ public class WatchManager implements IWatchManager {
     }
 
     @Override
-    public WatcherOrBitSet triggerWatch(String path, EventType type) {
-        return triggerWatch(path, type, null);
+    public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) {
+        return triggerWatch(path, type, zxid, null);
     }
 
     @Override
-    public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
-        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
+    public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet supress) {
+        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);
         Set<Watcher> watchers = new HashSet<>();
         synchronized (this) {
             PathParentIterator pathParentIterator = getPathParentIterator(path);

+ 4 - 4
zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java

@@ -202,13 +202,13 @@ public class WatchManagerOptimized implements IWatchManager, IDeadWatcherListene
     }
 
     @Override
-    public WatcherOrBitSet triggerWatch(String path, EventType type) {
-        return triggerWatch(path, type, null);
+    public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) {
+        return triggerWatch(path, type, zxid, null);
     }
 
     @Override
-    public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress) {
-        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
+    public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress) {
+        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);
 
         BitHashSet watchers = remove(path);
         if (watchers == null) {

+ 40 - 2
zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java

@@ -22,7 +22,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import java.io.File;
 import java.time.Instant;
+import org.apache.zookeeper.metrics.MetricsUtils;
 import org.apache.zookeeper.util.ServiceUtils;
+import org.hamcrest.CustomMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.StringDescription;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -40,6 +45,7 @@ public class ZKTestCase {
 
     protected static final File testBaseDir = new File(System.getProperty("build.test.dir", "build"));
     private static final Logger LOG = LoggerFactory.getLogger(ZKTestCase.class);
+    public static final int DEFAULT_METRIC_TIMEOUT = 30;
 
     static {
         // Disable System.exit in tests.
@@ -103,7 +109,7 @@ public class ZKTestCase {
      * @param timeout   timeout in seconds
      * @throws InterruptedException
      */
-    public void waitFor(String msg, WaitForCondition condition, int timeout) throws InterruptedException {
+    public static void waitFor(String msg, WaitForCondition condition, int timeout) throws InterruptedException {
         final Instant deadline = Instant.now().plusSeconds(timeout);
         while (Instant.now().isBefore(deadline)) {
             if (condition.evaluate()) {
@@ -114,4 +120,36 @@ public class ZKTestCase {
         fail(msg);
     }
 
-}
+    public static <T> void waitForMetric(String metricKey, Matcher<T> matcher) throws InterruptedException {
+        waitForMetric(metricKey, matcher, DEFAULT_METRIC_TIMEOUT);
+    }
+
+    public static <T> void waitForMetric(String metricKey, Matcher<T> matcher, int timeoutInSeconds) throws InterruptedException {
+        String errorMessage = String.format("metric \"%s\" failed to match after %d seconds",
+            metricKey, timeoutInSeconds);
+        waitFor(errorMessage, () -> {
+            @SuppressWarnings("unchecked")
+            T actual = (T) MetricsUtils.currentServerMetrics().get(metricKey);
+            if (!matcher.matches(actual)) {
+                Description description = new StringDescription();
+                matcher.describeMismatch(actual, description);
+                LOG.info("match failed for metric {}: {}", metricKey, description);
+                return false;
+            }
+            return true;
+        }, timeoutInSeconds);
+    }
+
+    /**
+     * Functionally identical to {@link org.hamcrest.Matchers#closeTo} except that it accepts all numerical types
+     * instead of failing if the value is not a {@link Double}.
+     */
+    public static Matcher<Number> closeTo(double operand, double error) {
+        return new CustomMatcher<Number>(String.format("A number within %s of %s", error, operand)) {
+            @Override
+            public boolean matches(Object actual) {
+                return Math.abs(operand - ((Number) actual).doubleValue()) <= error;
+            }
+        };
+    }
+}

+ 5 - 11
zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java

@@ -19,6 +19,8 @@
 package org.apache.zookeeper.server;
 
 import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.io.File;
@@ -75,10 +77,6 @@ public class RequestThrottlerTest extends ZKTestCase {
     ZooKeeper zk = null;
     int connectionLossCount = 0;
 
-    private long getCounterMetric(String name) {
-        return (long) MetricsUtils.currentServerMetrics().get(name);
-    }
-
     @BeforeEach
     public void setup() throws Exception {
         // start a server and create a client
@@ -222,11 +220,8 @@ public class RequestThrottlerTest extends ZKTestCase {
         submitted.await(5, TimeUnit.SECONDS);
 
         // but only two requests can get into the pipeline because of the throttler
-        WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") == 2;
-        waitFor("request not queued", requestQueued, 5);
-
-        WaitForCondition throttleWait = () -> getCounterMetric("request_throttle_wait_count") >= 1;
-        waitFor("no throttle wait", throttleWait, 5);
+        waitForMetric("prep_processor_request_queued", is(2L));
+        waitForMetric("request_throttle_wait_count", greaterThanOrEqualTo(1L));
 
         // let the requests go through the pipeline and the throttler will be waken up to allow more requests
         // to enter the pipeline
@@ -387,8 +382,7 @@ public class RequestThrottlerTest extends ZKTestCase {
             // be GLOBAL_OUTSTANDING_LIMIT + 2.
             //
             // But due to leak of consistent view of number of outstanding requests, the number could be larger.
-            WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") >= Integer.parseInt(GLOBAL_OUTSTANDING_LIMIT) + 2;
-            waitFor("no enough requests queued", requestQueued, 5);
+            waitForMetric("prep_processor_request_queued", greaterThanOrEqualTo(Long.parseLong(GLOBAL_OUTSTANDING_LIMIT) + 2));
 
             resumeProcess.countDown();
         } catch (Exception e) {

+ 1 - 2
zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogMetricsTest.java

@@ -79,8 +79,7 @@ public class FileTxnSnapLogMetricsTest extends ZKTestCase {
         }
 
         // It is possible that above writes will trigger more than one snapshot due to randomization.
-        WaitForCondition newSnapshot = () -> (long) MetricsUtils.currentServerMetrics().get("cnt_snapshottime") >= 2L;
-        waitFor("no snapshot in 10s", newSnapshot, 10);
+        waitForMetric("cnt_snapshottime", greaterThanOrEqualTo(2L), 10);
 
         // Pauses snapshot and logs more txns.
         cnxnFactory.getZooKeeperServer().getTxnLogFactory().snapLog.close();

+ 0 - 15
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java

@@ -26,10 +26,8 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.metrics.MetricsUtils;
 import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.test.ClientBase;
-import org.hamcrest.Matcher;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
@@ -38,7 +36,6 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 public class LearnerMetricsTest extends QuorumPeerTestBase {
 
-    private static final int TIMEOUT_SECONDS = 30;
     private static final int SERVER_COUNT = 4; // 1 observer, 3 participants
     private final QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[SERVER_COUNT];
     private ZooKeeper zk_client;
@@ -113,18 +110,6 @@ public class LearnerMetricsTest extends QuorumPeerTestBase {
         waitForMetric("min_commit_propagation_latency", greaterThanOrEqualTo(0L));
     }
 
-    private void waitForMetric(final String metricKey, final Matcher<Long> matcher) throws InterruptedException {
-        final String errorMessage = String.format("unable to match on metric: %s", metricKey);
-        waitFor(errorMessage, () -> {
-            long actual = (long) MetricsUtils.currentServerMetrics().get(metricKey);
-            if (!matcher.matches(actual)) {
-                LOG.info("match failed on {}, actual value: {}", metricKey, actual);
-                return false;
-            }
-            return true;
-        }, TIMEOUT_SECONDS);
-    }
-
     @AfterEach
     public void tearDown() throws Exception {
         zk_client.close();

+ 28 - 9
zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java

@@ -133,7 +133,7 @@ public class WatchManagerTest extends ZKTestCase {
         public void run() {
             while (!stopped) {
                 String path = PATH_PREFIX + r.nextInt(paths);
-                WatcherOrBitSet s = manager.triggerWatch(path, EventType.NodeDeleted);
+                WatcherOrBitSet s = manager.triggerWatch(path, EventType.NodeDeleted, -1);
                 if (s != null) {
                     triggeredCount.addAndGet(s.size());
                 }
@@ -729,6 +729,12 @@ public class WatchManagerTest extends ZKTestCase {
         assertEquals(sum, values.get("sum_" + metricName));
     }
 
+    private void checkMostRecentWatchedEvent(DumbWatcher watcher, String path, EventType eventType, long zxid) {
+        assertEquals(path, watcher.getMostRecentPath());
+        assertEquals(eventType, watcher.getMostRecentEventType());
+        assertEquals(zxid, watcher.getMostRecentZxid());
+    }
+
     @ParameterizedTest
     @MethodSource("data")
     public void testWatcherMetrics(String className) throws IOException {
@@ -743,41 +749,54 @@ public class WatchManagerTest extends ZKTestCase {
 
         final String path3 = "/path3";
 
-        //both wather1 and wather2 are watching path1
+        //both watcher1 and watcher2 are watching path1
         manager.addWatch(path1, watcher1);
         manager.addWatch(path1, watcher2);
 
         //path2 is watched by watcher1
         manager.addWatch(path2, watcher1);
 
-        manager.triggerWatch(path3, EventType.NodeCreated);
+        manager.triggerWatch(path3, EventType.NodeCreated, 1);
         //path3 is not being watched so metric is 0
         checkMetrics("node_created_watch_count", 0L, 0L, 0D, 0L, 0L);
+        // Watchers shouldn't have received any events yet so the zxid should be -1.
+        checkMostRecentWatchedEvent(watcher1, null, null, -1);
+        checkMostRecentWatchedEvent(watcher2, null, null, -1);
 
         //path1 is watched by two watchers so two fired
-        manager.triggerWatch(path1, EventType.NodeCreated);
+        manager.triggerWatch(path1, EventType.NodeCreated, 2);
         checkMetrics("node_created_watch_count", 2L, 2L, 2D, 1L, 2L);
+        checkMostRecentWatchedEvent(watcher1, path1, EventType.NodeCreated, 2);
+        checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeCreated, 2);
 
         //path2 is watched by one watcher so one fired now total is 3
-        manager.triggerWatch(path2, EventType.NodeCreated);
+        manager.triggerWatch(path2, EventType.NodeCreated, 3);
         checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L);
+        checkMostRecentWatchedEvent(watcher1, path2, EventType.NodeCreated, 3);
+        checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeCreated, 2);
 
         //watches on path1 are no longer there so zero fired
-        manager.triggerWatch(path1, EventType.NodeDataChanged);
+        manager.triggerWatch(path1, EventType.NodeDataChanged, 4);
         checkMetrics("node_changed_watch_count", 0L, 0L, 0D, 0L, 0L);
+        checkMostRecentWatchedEvent(watcher1, path2, EventType.NodeCreated, 3);
+        checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeCreated, 2);
 
-        //both wather1 and wather2 are watching path1
+        //both watcher and watcher are watching path1
         manager.addWatch(path1, watcher1);
         manager.addWatch(path1, watcher2);
 
         //path2 is watched by watcher1
         manager.addWatch(path2, watcher1);
 
-        manager.triggerWatch(path1, EventType.NodeDataChanged);
+        manager.triggerWatch(path1, EventType.NodeDataChanged, 5);
         checkMetrics("node_changed_watch_count", 2L, 2L, 2D, 1L, 2L);
+        checkMostRecentWatchedEvent(watcher1, path1, EventType.NodeDataChanged, 5);
+        checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeDataChanged, 5);
 
-        manager.triggerWatch(path2, EventType.NodeDeleted);
+        manager.triggerWatch(path2, EventType.NodeDeleted, 6);
         checkMetrics("node_deleted_watch_count", 1L, 1L, 1D, 1L, 1L);
+        checkMostRecentWatchedEvent(watcher1, path2, EventType.NodeDeleted, 6);
+        checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeDataChanged, 5);
 
         //make sure that node created watch count is not impacted by the fire of other event types
         checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L);

+ 16 - 16
zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatcherCleanerTest.java

@@ -17,8 +17,8 @@
 
 package org.apache.zookeeper.server.watch;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.number.OrderingComparison.greaterThan;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -140,7 +140,7 @@ public class WatcherCleanerTest extends ZKTestCase {
     }
 
     @Test
-    public void testDeadWatcherMetrics() {
+    public void testDeadWatcherMetrics() throws InterruptedException {
         ServerMetrics.getMetrics().resetAll();
         MyDeadWatcherListener listener = new MyDeadWatcherListener();
         WatcherCleaner cleaner = new WatcherCleaner(listener, 1, 1, 1, 1);
@@ -156,19 +156,19 @@ public class WatcherCleanerTest extends ZKTestCase {
         assertTrue(listener.wait(5000));
 
         Map<String, Object> values = MetricsUtils.currentServerMetrics();
-        assertThat("Adding dead watcher should be stalled twice", (Long) values.get("add_dead_watcher_stall_time"), greaterThan(0L));
-        assertEquals(3L, values.get("dead_watchers_queued"), "Total dead watchers added to the queue should be 3");
-        assertEquals(3L, values.get("dead_watchers_cleared"), "Total dead watchers cleared should be 3");
-
-        assertEquals(3L, values.get("cnt_dead_watchers_cleaner_latency"));
-
-        //Each latency should be a little over 20 ms, allow 20 ms deviation
-        assertEquals(20D, (Double) values.get("avg_dead_watchers_cleaner_latency"), 20);
-        assertEquals(20D, ((Long) values.get("min_dead_watchers_cleaner_latency")).doubleValue(), 20);
-        assertEquals(20D, ((Long) values.get("max_dead_watchers_cleaner_latency")).doubleValue(), 20);
-        assertEquals(20D, ((Long) values.get("p50_dead_watchers_cleaner_latency")).doubleValue(), 20);
-        assertEquals(20D, ((Long) values.get("p95_dead_watchers_cleaner_latency")).doubleValue(), 20);
-        assertEquals(20D, ((Long) values.get("p99_dead_watchers_cleaner_latency")).doubleValue(), 20);
+        // Adding dead watcher should be stalled twice
+        waitForMetric("add_dead_watcher_stall_time", greaterThan(0L));
+        waitForMetric("dead_watchers_queued", is(3L));
+        waitForMetric("dead_watchers_cleared", is(3L));
+        waitForMetric("cnt_dead_watchers_cleaner_latency", is(3L));
+
+        //Each latency should be a little over 20 ms, allow 5 ms deviation
+        waitForMetric("avg_dead_watchers_cleaner_latency", closeTo(20, 5));
+        waitForMetric("min_dead_watchers_cleaner_latency", closeTo(20, 5));
+        waitForMetric("max_dead_watchers_cleaner_latency", closeTo(20, 5));
+        waitForMetric("p50_dead_watchers_cleaner_latency", closeTo(20, 5));
+        waitForMetric("p95_dead_watchers_cleaner_latency", closeTo(20, 5));
+        waitForMetric("p99_dead_watchers_cleaner_latency", closeTo(20, 5));
     }
 
 }

+ 82 - 52
zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java

@@ -20,7 +20,6 @@ package org.apache.zookeeper.test;
 
 import static org.apache.zookeeper.AddWatchMode.PERSISTENT;
 import static org.apache.zookeeper.AddWatchMode.PERSISTENT_RECURSIVE;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.io.IOException;
@@ -33,8 +32,11 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
@@ -81,21 +83,28 @@ public class PersistentRecursiveWatcherTest extends ClientBase {
 
     private void internalTestBasic(ZooKeeper zk) throws KeeperException, InterruptedException {
         zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        zk.setData("/a/b/c/d/e", new byte[0], -1);
+
+        Stat stat = new Stat();
+        zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+        assertEvent(events, EventType.NodeCreated, "/a/b", stat);
+
+        zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+        assertEvent(events, EventType.NodeCreated, "/a/b/c", stat);
+
+        zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+        assertEvent(events, EventType.NodeCreated, "/a/b/c/d", stat);
+
+        zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+        assertEvent(events, EventType.NodeCreated, "/a/b/c/d/e", stat);
+
+        stat = zk.setData("/a/b/c/d/e", new byte[0], -1);
+        assertEvent(events, EventType.NodeDataChanged, "/a/b/c/d/e", stat);
+
         zk.delete("/a/b/c/d/e", -1);
-        zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
-        assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
-        assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c");
-        assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d");
-        assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e");
-        assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c/d/e");
-        assertEvent(events, Watcher.Event.EventType.NodeDeleted, "/a/b/c/d/e");
-        assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e");
+        assertEvent(events, EventType.NodeDeleted, "/a/b/c/d/e", zk.exists("/a/b/c/d", false).getPzxid());
+
+        zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+        assertEvent(events, EventType.NodeCreated, "/a/b/c/d/e", stat);
     }
 
     @Test
@@ -104,14 +113,15 @@ public class PersistentRecursiveWatcherTest extends ClientBase {
         try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
             zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE);
             zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
-            assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c");
+            Stat stat = new Stat();
+            zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+            assertEvent(events, EventType.NodeCreated, "/a/b", stat);
+            zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+            assertEvent(events, EventType.NodeCreated, "/a/b/c", stat);
 
             zk.removeWatches("/a/b", persistentWatcher, Watcher.WatcherType.Any, false);
             zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            assertEvent(events, Watcher.Event.EventType.PersistentWatchRemoved, "/a/b");
+            assertEvent(events, EventType.PersistentWatchRemoved, "/a/b", WatchedEvent.NO_ZXID);
         }
     }
 
@@ -125,13 +135,15 @@ public class PersistentRecursiveWatcherTest extends ClientBase {
             BlockingQueue<WatchedEvent> childEvents = new LinkedBlockingQueue<>();
             zk.getChildren("/a", childEvents::add);
 
-            zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            Stat createABStat = new Stat();
+            zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createABStat);
+            Stat createABCStat = new Stat();
+            zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createABCStat);
 
-            assertEvent(childEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a");
+            assertEvent(childEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a", createABStat.getPzxid());
 
-            assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
-            assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c");
+            assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b", createABStat);
+            assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c", createABCStat);
             assertTrue(events.isEmpty());
         }
     }
@@ -141,9 +153,9 @@ public class PersistentRecursiveWatcherTest extends ClientBase {
         try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
             zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE);
             stopServer();
-            assertEvent(events, Watcher.Event.EventType.None, null);
+            assertEvent(events, EventType.None, KeeperState.Disconnected, null, WatchedEvent.NO_ZXID);
             startServer();
-            assertEvent(events, Watcher.Event.EventType.None, null);
+            assertEvent(events, EventType.None, KeeperState.SyncConnected, null, WatchedEvent.NO_ZXID);
             internalTestBasic(zk);
         }
     }
@@ -158,17 +170,15 @@ public class PersistentRecursiveWatcherTest extends ClientBase {
             zk1.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
             zk1.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE);
-            zk1.setData("/a/b/c", "one".getBytes(), -1);
-            Thread.sleep(1000); // give some time for the event to arrive
-
-            zk2.setData("/a/b/c", "two".getBytes(), -1);
-            zk2.setData("/a/b/c", "three".getBytes(), -1);
-            zk2.setData("/a/b/c", "four".getBytes(), -1);
-
-            assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
-            assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
-            assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
-            assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
+            Stat stat = zk1.setData("/a/b/c", "one".getBytes(), -1);
+            assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid());
+
+            stat = zk2.setData("/a/b/c", "two".getBytes(), -1);
+            assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid());
+            stat = zk2.setData("/a/b/c", "three".getBytes(), -1);
+            assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid());
+            stat = zk2.setData("/a/b/c", "four".getBytes(), -1);
+            assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid());
         }
     }
 
@@ -224,22 +234,42 @@ public class PersistentRecursiveWatcherTest extends ClientBase {
             throws IOException, InterruptedException, KeeperException {
         try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) {
             zk.addWatch("/", persistentWatcher, PERSISTENT_RECURSIVE);
-            zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zk.create("/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a");
-            assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
-            assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b");
-            assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b/c");
+            Stat stat = new Stat();
+
+            zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+            assertEvent(events, EventType.NodeCreated, "/a", stat.getMzxid());
+
+            zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+            assertEvent(events, EventType.NodeCreated, "/a/b", stat.getMzxid());
+
+            zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+            assertEvent(events, EventType.NodeCreated, "/b", stat.getMzxid());
+
+            zk.create("/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+            assertEvent(events, EventType.NodeCreated, "/b/c", stat.getMzxid());
         }
     }
 
-    private void assertEvent(BlockingQueue<WatchedEvent> events, Watcher.Event.EventType eventType, String path)
-            throws InterruptedException {
-        WatchedEvent event = events.poll(5, TimeUnit.SECONDS);
-        assertNotNull(event);
-        assertEquals(eventType, event.getType());
-        assertEquals(path, event.getPath());
+    private void assertEvent(BlockingQueue<WatchedEvent> events, EventType eventType, String path, Stat stat)
+        throws InterruptedException {
+        assertEvent(events, eventType, path, stat.getMzxid());
+    }
+
+    private void assertEvent(BlockingQueue<WatchedEvent> events, EventType eventType, String path, long zxid)
+        throws InterruptedException {
+        assertEvent(events, eventType, KeeperState.SyncConnected, path, zxid);
+    }
+
+    private void assertEvent(BlockingQueue<WatchedEvent> events, EventType eventType, KeeperState keeperState,
+        String path, long zxid) throws InterruptedException {
+        WatchedEvent actualEvent = events.poll(5, TimeUnit.SECONDS);
+        assertNotNull(actualEvent);
+        WatchedEvent expectedEvent = new WatchedEvent(
+            eventType,
+            keeperState,
+            path,
+            zxid
+        );
+        TestUtils.assertWatchedEventEquals(expectedEvent, actualEvent);
     }
 }

+ 14 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java

@@ -18,8 +18,10 @@
 
 package org.apache.zookeeper.test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 import java.io.File;
+import org.apache.zookeeper.WatchedEvent;
 
 /**
  * This class contains test utility methods
@@ -57,4 +59,16 @@ public class TestUtils {
         return deleteFileRecursively(file, false);
     }
 
+    /**
+     * Asserts that the given {@link WatchedEvent} are semantically equal, i.e. they have the same EventType, path and
+     * zxid.
+     */
+    public static void assertWatchedEventEquals(WatchedEvent expected, WatchedEvent actual) {
+        // TODO: .hashCode and .equals cannot be added to WatchedEvent without potentially breaking consumers. This
+        //  can be changed to `assertEquals(expected, actual)` once WatchedEvent has those methods. Until then,
+        //  compare the lists manually.
+        assertEquals(expected.getType(), actual.getType());
+        assertEquals(expected.getPath(), actual.getPath());
+        assertEquals(expected.getZxid(), actual.getZxid());
+    }
 }

+ 2 - 2
zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java

@@ -59,12 +59,12 @@ public class UnsupportedAddWatcherTest extends ClientBase {
         }
 
         @Override
-        public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type) {
+        public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, long zxid) {
             return new WatcherOrBitSet(Collections.emptySet());
         }
 
         @Override
-        public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, WatcherOrBitSet suppress) {
+        public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, long zxid, WatcherOrBitSet suppress) {
             return new WatcherOrBitSet(Collections.emptySet());
         }
 

+ 2 - 2
zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java

@@ -61,7 +61,7 @@ public class WatchedEventTest extends ZKTestCase {
         for (EventType et : allTypes) {
             for (KeeperState ks : allStates) {
                 wep = new WatcherEvent(et.getIntValue(), ks.getIntValue(), "blah");
-                we = new WatchedEvent(wep);
+                we = new WatchedEvent(wep, WatchedEvent.NO_ZXID);
                 assertEquals(et, we.getType());
                 assertEquals(ks, we.getState());
                 assertEquals("blah", we.getPath());
@@ -75,7 +75,7 @@ public class WatchedEventTest extends ZKTestCase {
 
         try {
             WatcherEvent wep = new WatcherEvent(-2342, -252352, "foo");
-            new WatchedEvent(wep);
+            new WatchedEvent(wep, WatchedEvent.NO_ZXID);
             fail("Was able to create WatchedEvent from bad wrapper");
         } catch (RuntimeException re) {
             // we're good

+ 90 - 65
zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java

@@ -37,6 +37,7 @@ import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -68,14 +69,17 @@ public class WatcherFuncTest extends ClientBase {
                 assertTrue(false, "interruption unexpected");
             }
         }
-        public void verify(List<EventType> expected) throws InterruptedException {
+
+        public void verify(List<WatchedEvent> expected) throws InterruptedException {
+            List<WatchedEvent> actual = new ArrayList<>();
             WatchedEvent event;
-            int count = 0;
-            while (count < expected.size() && (event = events.poll(30, TimeUnit.SECONDS)) != null) {
-                assertEquals(expected.get(count), event.getType());
-                count++;
+            while (actual.size() < expected.size() && (event = events.poll(30, TimeUnit.SECONDS)) != null) {
+                actual.add(event);
+            }
+            assertEquals(expected.size(), actual.size());
+            for (int i = 0; i < expected.size(); i++) {
+                TestUtils.assertWatchedEventEquals(expected.get(i), actual.get(i));
             }
-            assertEquals(expected.size(), count);
             events.clear();
         }
 
@@ -88,7 +92,7 @@ public class WatcherFuncTest extends ClientBase {
     private volatile CountDownLatch lsnr_latch;
     private ZooKeeper lsnr;
 
-    private List<EventType> expected;
+    private List<WatchedEvent> expected;
 
     @BeforeEach
     @Override
@@ -127,15 +131,34 @@ public class WatcherFuncTest extends ClientBase {
         expected.clear();
     }
 
+    private void addEvent(List<WatchedEvent> events, EventType eventType, String path, Stat stat) {
+        addEvent(events, eventType, path, stat.getMzxid());
+    }
+
+    private void addEvent(List<WatchedEvent> events, EventType eventType, String path, long zxid) {
+        events.add(new WatchedEvent(eventType, KeeperState.SyncConnected, path, zxid));
+    }
+
+    private long delete(String path) throws InterruptedException, KeeperException {
+        client.delete(path, -1);
+        int lastSlash = path.lastIndexOf('/');
+        String parent = (lastSlash == 0)
+            ? "/"
+            : path.substring(0, lastSlash);
+        // the deletion's zxid will be reflected in the parent's Pzxid
+        return client.exists(parent, false).getPzxid();
+    }
+
     @Test
     public void testExistsSync() throws IOException, InterruptedException, KeeperException {
         assertNull(lsnr.exists("/foo", true));
         assertNull(lsnr.exists("/foo/bar", true));
 
-        client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        expected.add(EventType.NodeCreated);
-        client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        expected.add(EventType.NodeCreated);
+        Stat stat = new Stat();
+        client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+        addEvent(expected, EventType.NodeCreated, "/foo", stat);
+        client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+        addEvent(expected, EventType.NodeCreated, "/foo/bar", stat);
 
         verify();
 
@@ -160,20 +183,20 @@ public class WatcherFuncTest extends ClientBase {
             assertEquals("/foo/car", e.getPath());
         }
 
-        client.setData("/foo", "parent".getBytes(), -1);
-        expected.add(EventType.NodeDataChanged);
-        client.setData("/foo/bar", "child".getBytes(), -1);
-        expected.add(EventType.NodeDataChanged);
+        stat = client.setData("/foo", "parent".getBytes(), -1);
+        addEvent(expected, EventType.NodeDataChanged, "/foo", stat);
+        stat = client.setData("/foo/bar", "child".getBytes(), -1);
+        addEvent(expected, EventType.NodeDataChanged, "/foo/bar", stat);
 
         verify();
 
         assertNotNull(lsnr.exists("/foo", true));
         assertNotNull(lsnr.exists("/foo/bar", true));
 
-        client.delete("/foo/bar", -1);
-        expected.add(EventType.NodeDeleted);
-        client.delete("/foo", -1);
-        expected.add(EventType.NodeDeleted);
+        long deleteZxid = delete("/foo/bar");
+        addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid);
+        deleteZxid = delete("/foo");
+        addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid);
 
         verify();
     }
@@ -200,20 +223,20 @@ public class WatcherFuncTest extends ClientBase {
         client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         assertNotNull(lsnr.getData("/foo/bar", true, null));
 
-        client.setData("/foo", "parent".getBytes(), -1);
-        expected.add(EventType.NodeDataChanged);
-        client.setData("/foo/bar", "child".getBytes(), -1);
-        expected.add(EventType.NodeDataChanged);
+        Stat stat = client.setData("/foo", "parent".getBytes(), -1);
+        addEvent(expected, EventType.NodeDataChanged, "/foo", stat);
+        stat = client.setData("/foo/bar", "child".getBytes(), -1);
+        addEvent(expected, EventType.NodeDataChanged, "/foo/bar", stat);
 
         verify();
 
         assertNotNull(lsnr.getData("/foo", true, null));
         assertNotNull(lsnr.getData("/foo/bar", true, null));
 
-        client.delete("/foo/bar", -1);
-        expected.add(EventType.NodeDeleted);
-        client.delete("/foo", -1);
-        expected.add(EventType.NodeDeleted);
+        long deleteZxid = delete("/foo/bar");
+        addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid);
+        deleteZxid = delete("/foo");
+        addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid);
 
         verify();
     }
@@ -238,8 +261,9 @@ public class WatcherFuncTest extends ClientBase {
         client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         assertNotNull(lsnr.getChildren("/foo", true));
 
-        client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        expected.add(EventType.NodeChildrenChanged); // /foo
+        Stat stat = new Stat();
+        client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+        addEvent(expected, EventType.NodeChildrenChanged, "/foo", stat); // /foo
         assertNotNull(lsnr.getChildren("/foo/bar", true));
 
         client.setData("/foo", "parent".getBytes(), -1);
@@ -250,11 +274,11 @@ public class WatcherFuncTest extends ClientBase {
         assertNotNull(lsnr.getChildren("/foo", true));
         assertNotNull(lsnr.getChildren("/foo/bar", true));
 
-        client.delete("/foo/bar", -1);
-        expected.add(EventType.NodeDeleted); // /foo/bar childwatch
-        expected.add(EventType.NodeChildrenChanged); // /foo
-        client.delete("/foo", -1);
-        expected.add(EventType.NodeDeleted);
+        long deleteZxid = delete("/foo/bar");
+        addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid); // /foo/bar childwatch
+        addEvent(expected, EventType.NodeChildrenChanged, "/foo", deleteZxid); // /foo
+        deleteZxid = delete("/foo");
+        addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid);
 
         verify();
     }
@@ -266,7 +290,7 @@ public class WatcherFuncTest extends ClientBase {
         SimpleWatcher w3 = new SimpleWatcher(null);
         SimpleWatcher w4 = new SimpleWatcher(null);
 
-        List<EventType> e2 = new ArrayList<>();
+        List<WatchedEvent> e2 = new ArrayList<>();
 
         assertNull(lsnr.exists("/foo", true));
         assertNull(lsnr.exists("/foo", w1));
@@ -276,10 +300,11 @@ public class WatcherFuncTest extends ClientBase {
         assertNull(lsnr.exists("/foo/bar", w3));
         assertNull(lsnr.exists("/foo/bar", w4));
 
-        client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        expected.add(EventType.NodeCreated);
-        client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        e2.add(EventType.NodeCreated);
+        Stat stat = new Stat();
+        client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+        addEvent(expected, EventType.NodeCreated, "/foo", stat);
+        client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+        addEvent(e2, EventType.NodeCreated, "/foo/bar", stat);
 
         lsnr_dwatch.verify(expected);
         w1.verify(expected);
@@ -297,10 +322,10 @@ public class WatcherFuncTest extends ClientBase {
         assertNotNull(lsnr.exists("/foo/bar", w4));
         assertNotNull(lsnr.exists("/foo/bar", w4));
 
-        client.setData("/foo", "parent".getBytes(), -1);
-        expected.add(EventType.NodeDataChanged);
-        client.setData("/foo/bar", "child".getBytes(), -1);
-        e2.add(EventType.NodeDataChanged);
+        stat = client.setData("/foo", "parent".getBytes(), -1);
+        addEvent(expected, EventType.NodeDataChanged, "/foo", stat);
+        stat = client.setData("/foo/bar", "child".getBytes(), -1);
+        addEvent(e2, EventType.NodeDataChanged, "/foo/bar", stat);
 
         lsnr_dwatch.verify(new ArrayList<>()); // not reg so should = 0
         w1.verify(expected);
@@ -319,10 +344,10 @@ public class WatcherFuncTest extends ClientBase {
         assertNotNull(lsnr.exists("/foo/bar", w3));
         assertNotNull(lsnr.exists("/foo/bar", w4));
 
-        client.delete("/foo/bar", -1);
-        expected.add(EventType.NodeDeleted);
-        client.delete("/foo", -1);
-        e2.add(EventType.NodeDeleted);
+        long deleteZxid = delete("/foo/bar");
+        addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid);
+        deleteZxid = delete("/foo");
+        addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid);
 
         lsnr_dwatch.verify(expected);
         w1.verify(expected);
@@ -331,7 +356,6 @@ public class WatcherFuncTest extends ClientBase {
         w4.verify(e2);
         expected.clear();
         e2.clear();
-
     }
 
     @Test
@@ -341,7 +365,7 @@ public class WatcherFuncTest extends ClientBase {
         SimpleWatcher w3 = new SimpleWatcher(null);
         SimpleWatcher w4 = new SimpleWatcher(null);
 
-        List<EventType> e2 = new ArrayList<>();
+        List<WatchedEvent> e2 = new ArrayList<>();
 
         try {
             lsnr.getData("/foo", w1, null);
@@ -367,10 +391,10 @@ public class WatcherFuncTest extends ClientBase {
         assertNotNull(lsnr.getData("/foo/bar", w4, null));
         assertNotNull(lsnr.getData("/foo/bar", w4, null));
 
-        client.setData("/foo", "parent".getBytes(), -1);
-        expected.add(EventType.NodeDataChanged);
-        client.setData("/foo/bar", "child".getBytes(), -1);
-        e2.add(EventType.NodeDataChanged);
+        Stat stat = client.setData("/foo", "parent".getBytes(), -1);
+        addEvent(expected, EventType.NodeDataChanged, "/foo", stat);
+        stat = client.setData("/foo/bar", "child".getBytes(), -1);
+        addEvent(e2, EventType.NodeDataChanged, "/foo/bar", stat);
 
         lsnr_dwatch.verify(expected);
         w1.verify(expected);
@@ -387,10 +411,10 @@ public class WatcherFuncTest extends ClientBase {
         assertNotNull(lsnr.getData("/foo/bar", w3, null));
         assertNotNull(lsnr.getData("/foo/bar", w4, null));
 
-        client.delete("/foo/bar", -1);
-        expected.add(EventType.NodeDeleted);
-        client.delete("/foo", -1);
-        e2.add(EventType.NodeDeleted);
+        long deleteZxid = delete("/foo/bar");
+        addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid);
+        deleteZxid = delete("/foo");
+        addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid);
 
         lsnr_dwatch.verify(expected);
         w1.verify(expected);
@@ -408,7 +432,7 @@ public class WatcherFuncTest extends ClientBase {
         SimpleWatcher w3 = new SimpleWatcher(null);
         SimpleWatcher w4 = new SimpleWatcher(null);
 
-        List<EventType> e2 = new ArrayList<>();
+        List<WatchedEvent> e2 = new ArrayList<>();
 
         try {
             lsnr.getChildren("/foo", true);
@@ -429,8 +453,9 @@ public class WatcherFuncTest extends ClientBase {
         assertNotNull(lsnr.getChildren("/foo", true));
         assertNotNull(lsnr.getChildren("/foo", w1));
 
-        client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        expected.add(EventType.NodeChildrenChanged); // /foo
+        Stat stat = new Stat();
+        client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+        addEvent(expected, EventType.NodeChildrenChanged, "/foo", stat); // /foo
         assertNotNull(lsnr.getChildren("/foo/bar", w2));
         assertNotNull(lsnr.getChildren("/foo/bar", w2));
         assertNotNull(lsnr.getChildren("/foo/bar", w3));
@@ -451,11 +476,11 @@ public class WatcherFuncTest extends ClientBase {
         assertNotNull(lsnr.getChildren("/foo/bar", w4));
         assertNotNull(lsnr.getChildren("/foo/bar", w4));
 
-        client.delete("/foo/bar", -1);
-        e2.add(EventType.NodeDeleted); // /foo/bar childwatch
-        expected.add(EventType.NodeChildrenChanged); // /foo
-        client.delete("/foo", -1);
-        expected.add(EventType.NodeDeleted);
+        long deleteZxid = delete("/foo/bar");
+        addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid);
+        addEvent(expected, EventType.NodeChildrenChanged, "/foo", deleteZxid); // /foo
+        deleteZxid = delete("/foo");
+        addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid);
 
         lsnr_dwatch.verify(expected);
         w1.verify(expected);