|
@@ -22,6 +22,7 @@ import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
@@ -29,11 +30,13 @@ import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
+import org.apache.commons.collections.CollectionUtils;
|
|
|
import org.apache.zookeeper.KeeperException.Code;
|
|
|
import org.apache.zookeeper.KeeperException.NoWatcherException;
|
|
|
import org.apache.zookeeper.Watcher.Event.EventType;
|
|
|
import org.apache.zookeeper.Watcher.WatcherType;
|
|
|
import org.apache.zookeeper.ZooDefs.Ids;
|
|
|
+import org.apache.zookeeper.server.ServerCnxn;
|
|
|
import org.apache.zookeeper.test.ClientBase;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Test;
|
|
@@ -139,14 +142,18 @@ public class RemoveWatchesTest extends ClientBase {
|
|
|
Assert.assertNotNull("Didn't set data watches",
|
|
|
zk2.exists("/node2", w2));
|
|
|
removeWatches(zk2, "/node1", w1, WatcherType.Data, false, Code.OK);
|
|
|
+ Assert.assertEquals("Didn't find data watcher", 1,
|
|
|
+ zk2.getDataWatches().size());
|
|
|
+ Assert.assertEquals("Didn't find data watcher", "/node2",
|
|
|
+ zk2.getDataWatches().get(0));
|
|
|
+ removeWatches(zk2, "/node2", w2, WatcherType.Any, false, Code.OK);
|
|
|
+ Assert.assertTrue("Didn't remove data watcher", w2.matches());
|
|
|
// closing session should remove ephemeral nodes and trigger data
|
|
|
// watches if any
|
|
|
if (zk1 != null) {
|
|
|
zk1.close();
|
|
|
zk1 = null;
|
|
|
}
|
|
|
- Assert.assertTrue("Didn't remove data watcher", w1.matches());
|
|
|
- Assert.assertFalse("Should have removed data watcher", w2.matches());
|
|
|
|
|
|
List<EventType> events = w1.getEventsAfterWatchRemoval();
|
|
|
Assert.assertFalse(
|
|
@@ -176,14 +183,18 @@ public class RemoveWatchesTest extends ClientBase {
|
|
|
Assert.assertNotNull("Didn't set data watches",
|
|
|
zk2.exists("/node1", w2));
|
|
|
removeWatches(zk2, "/node1", w2, WatcherType.Data, false, Code.OK);
|
|
|
+ Assert.assertEquals("Didn't find data watcher", 1,
|
|
|
+ zk2.getDataWatches().size());
|
|
|
+ Assert.assertEquals("Didn't find data watcher", "/node1",
|
|
|
+ zk2.getDataWatches().get(0));
|
|
|
+ removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK);
|
|
|
+ Assert.assertTrue("Didn't remove data watcher", w2.matches());
|
|
|
// closing session should remove ephemeral nodes and trigger data
|
|
|
// watches if any
|
|
|
if (zk1 != null) {
|
|
|
zk1.close();
|
|
|
zk1 = null;
|
|
|
}
|
|
|
- Assert.assertTrue("Didn't remove data watcher", w2.matches());
|
|
|
- Assert.assertFalse("Should have removed data watcher", w1.matches());
|
|
|
|
|
|
List<EventType> events = w2.getEventsAfterWatchRemoval();
|
|
|
Assert.assertEquals(
|
|
@@ -209,7 +220,10 @@ public class RemoveWatchesTest extends ClientBase {
|
|
|
zk2.getChildren("/node1", w2);
|
|
|
removeWatches(zk2, "/node1", w2, WatcherType.Children, false, Code.OK);
|
|
|
Assert.assertTrue("Didn't remove child watcher", w2.matches());
|
|
|
- Assert.assertFalse("Should have removed child watcher", w1.matches());
|
|
|
+ Assert.assertEquals("Didn't find child watcher", 1, zk2
|
|
|
+ .getChildWatches().size());
|
|
|
+ removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK);
|
|
|
+ Assert.assertTrue("Didn't remove child watcher", w1.matches());
|
|
|
// create child to see NodeChildren notification
|
|
|
zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE,
|
|
|
CreateMode.PERSISTENT);
|
|
@@ -447,7 +461,12 @@ public class RemoveWatchesTest extends ClientBase {
|
|
|
zk2.getChildren("/node1", w2);
|
|
|
removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK);
|
|
|
Assert.assertTrue("Didn't remove data watcher", w1.matches());
|
|
|
- Assert.assertFalse("Shouldn't remove child watcher", w2.matches());
|
|
|
+ Assert.assertEquals("Didn't find child watcher", 1, zk2
|
|
|
+ .getChildWatches().size());
|
|
|
+ Assert.assertEquals("Didn't find data watcher", 1, zk2
|
|
|
+ .getDataWatches().size());
|
|
|
+ removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK);
|
|
|
+ Assert.assertTrue("Didn't remove child watcher", w2.matches());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -472,7 +491,12 @@ public class RemoveWatchesTest extends ClientBase {
|
|
|
zk2.getChildren("/node1", w1);
|
|
|
removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK);
|
|
|
Assert.assertTrue("Didn't remove child watcher", w2.matches());
|
|
|
- Assert.assertFalse("Shouldn't remove data watcher", w1.matches());
|
|
|
+ Assert.assertEquals("Didn't find child watcher", 1, zk2
|
|
|
+ .getChildWatches().size());
|
|
|
+ Assert.assertEquals("Didn't find data watcher", 1, zk2
|
|
|
+ .getDataWatches().size());
|
|
|
+ removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK);
|
|
|
+ Assert.assertTrue("Didn't remove watchers", w1.matches());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -708,7 +732,10 @@ public class RemoveWatchesTest extends ClientBase {
|
|
|
zk2.getChildren("/node1", w1);
|
|
|
removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK);
|
|
|
Assert.assertTrue("Didn't remove child watcher", w1.matches());
|
|
|
- Assert.assertFalse("Shouldn't remove data watcher", w2.matches());
|
|
|
+ Assert.assertEquals("Didn't find child watcher", 1, zk2
|
|
|
+ .getChildWatches().size());
|
|
|
+ removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK);
|
|
|
+ Assert.assertTrue("Didn't remove child watcher", w2.matches());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -930,16 +957,16 @@ public class RemoveWatchesTest extends ClientBase {
|
|
|
Assert.assertNotNull("Didn't set data watches",
|
|
|
zk2.exists("/node1", w2));
|
|
|
|
|
|
+ Assert.assertTrue("Server session is not a watcher",
|
|
|
+ isServerSessionWatcher(zk2.getSessionId(), "/node1",
|
|
|
+ WatcherType.Data));
|
|
|
removeAllWatches(zk2, "/node1", WatcherType.Data, false, Code.OK);
|
|
|
Assert.assertTrue("Didn't remove data watcher",
|
|
|
rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
|
|
|
|
|
|
- zk1.setData("/node1", "test".getBytes(), -1);
|
|
|
- LOG.info("Waiting for data watchers notification after watch removal");
|
|
|
- Assert.assertFalse("Received data watch notification!",
|
|
|
- dWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
|
|
|
- Assert.assertEquals("Received watch notification after removal!", 2,
|
|
|
- dWatchCount.getCount());
|
|
|
+ Assert.assertFalse("Server session is still a watcher after removal",
|
|
|
+ isServerSessionWatcher(zk2.getSessionId(), "/node1",
|
|
|
+ WatcherType.Data));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -991,17 +1018,16 @@ public class RemoveWatchesTest extends ClientBase {
|
|
|
Assert.assertEquals("Didn't set child watches", 0,
|
|
|
zk2.getChildren("/node1", w2).size());
|
|
|
|
|
|
+ Assert.assertTrue("Server session is not a watcher",
|
|
|
+ isServerSessionWatcher(zk2.getSessionId(), "/node1",
|
|
|
+ WatcherType.Children));
|
|
|
removeAllWatches(zk2, "/node1", WatcherType.Children, false, Code.OK);
|
|
|
Assert.assertTrue("Didn't remove child watcher",
|
|
|
rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
|
|
|
|
|
|
- zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE,
|
|
|
- CreateMode.PERSISTENT);
|
|
|
- LOG.info("Waiting for child watchers to be notified");
|
|
|
- Assert.assertFalse("Didn't get child watch notification!",
|
|
|
- cWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
|
|
|
- Assert.assertEquals("Received watch notification after removal!", 2,
|
|
|
- cWatchCount.getCount());
|
|
|
+ Assert.assertFalse("Server session is still a watcher after removal",
|
|
|
+ isServerSessionWatcher(zk2.getSessionId(), "/node1",
|
|
|
+ WatcherType.Children));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1067,17 +1093,15 @@ public class RemoveWatchesTest extends ClientBase {
|
|
|
Assert.assertNotNull("Didn't set data watches",
|
|
|
zk2.exists("/node1", w2));
|
|
|
|
|
|
+ Assert.assertTrue("Server session is not a watcher",
|
|
|
+ isServerSessionWatcher(zk2.getSessionId(), "/node1",
|
|
|
+ WatcherType.Data));
|
|
|
removeAllWatches(zk2, "/node1", WatcherType.Any, false, Code.OK);
|
|
|
Assert.assertTrue("Didn't remove data watcher",
|
|
|
rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
|
|
|
-
|
|
|
- zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE,
|
|
|
- CreateMode.PERSISTENT);
|
|
|
- zk1.setData("/node1", "test".getBytes(), -1);
|
|
|
-
|
|
|
- LOG.info("Waiting for child/data watchers notification after watch removal");
|
|
|
- Assert.assertFalse("Received watch notification after removal!",
|
|
|
- watchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
|
|
|
+ Assert.assertFalse("Server session is still a watcher after removal",
|
|
|
+ isServerSessionWatcher(zk2.getSessionId(), "/node1",
|
|
|
+ WatcherType.Data));
|
|
|
Assert.assertEquals("Received watch notification after removal!", 2,
|
|
|
watchCount.getCount());
|
|
|
}
|
|
@@ -1146,6 +1170,14 @@ public class RemoveWatchesTest extends ClientBase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Returns true if the watcher was triggered. Try to avoid using this
|
|
|
+ * method with assertFalse statements. A false return depends on a timed
|
|
|
+ * out wait on a latch, which makes tests run long.
|
|
|
+ *
|
|
|
+ * @return true if the watcher was triggered, false otherwise
|
|
|
+ * @throws InterruptedException if interrupted while waiting on latch
|
|
|
+ */
|
|
|
public boolean matches() throws InterruptedException {
|
|
|
if (!latch.await(CONNECTION_TIMEOUT/5, TimeUnit.MILLISECONDS)) {
|
|
|
LOG.error("Failed waiting to remove the watches");
|
|
@@ -1181,6 +1213,14 @@ public class RemoveWatchesTest extends ClientBase {
|
|
|
this.latch.countDown();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Returns true if the callback was triggered. Try to avoid using this
|
|
|
+ * method with assertFalse statements. A false return depends on a timed
|
|
|
+ * out wait on a latch, which makes tests run long.
|
|
|
+ *
|
|
|
+ * @return true if the watcher was triggered, false otherwise
|
|
|
+ * @throws InterruptedException if interrupted while waiting on latch
|
|
|
+ */
|
|
|
public boolean matches() throws InterruptedException {
|
|
|
if (!latch.await(CONNECTION_TIMEOUT/5, TimeUnit.MILLISECONDS)) {
|
|
|
return false;
|
|
@@ -1188,4 +1228,25 @@ public class RemoveWatchesTest extends ClientBase {
|
|
|
return path.equals(eventPath) && rc == eventRc;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Checks if a session is registered with the server as a watcher.
|
|
|
+ *
|
|
|
+ * @param long sessionId the session ID to check
|
|
|
+ * @param path the path to check for watchers
|
|
|
+ * @param type the type of watcher
|
|
|
+ * @return true if the client session is a watcher on path for the type
|
|
|
+ */
|
|
|
+ private boolean isServerSessionWatcher(long sessionId, String path,
|
|
|
+ WatcherType type) {
|
|
|
+ Set<ServerCnxn> cnxns = new HashSet<>();
|
|
|
+ CollectionUtils.addAll(cnxns, serverFactory.getConnections().iterator());
|
|
|
+ for (ServerCnxn cnxn : cnxns) {
|
|
|
+ if (cnxn.getSessionId() == sessionId) {
|
|
|
+ return getServer(serverFactory).getZKDatabase().getDataTree()
|
|
|
+ .containsWatcher(path, type, cnxn);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
}
|