|
@@ -43,9 +43,6 @@ import org.junit.Test;
|
|
|
public class ClientTest extends ClientBase {
|
|
|
protected static final Logger LOG = Logger.getLogger(ClientTest.class);
|
|
|
|
|
|
- LinkedBlockingQueue<WatcherEvent> events =
|
|
|
- new LinkedBlockingQueue<WatcherEvent>();
|
|
|
-
|
|
|
@Override
|
|
|
protected void tearDown() throws Exception {
|
|
|
super.tearDown();
|
|
@@ -145,7 +142,10 @@ public class ClientTest extends ClientBase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- protected class MyWatcher extends CountdownWatcher {
|
|
|
+ private class MyWatcher extends CountdownWatcher {
|
|
|
+ LinkedBlockingQueue<WatcherEvent> events =
|
|
|
+ new LinkedBlockingQueue<WatcherEvent>();
|
|
|
+
|
|
|
public void process(WatcherEvent event) {
|
|
|
super.process(event);
|
|
|
if (event.getType() != Event.EventNone) {
|
|
@@ -157,13 +157,132 @@ public class ClientTest extends ClientBase {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Register multiple watchers and verify that they all get notified and
|
|
|
+ * in the right order.
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testMutipleWatcherObjs()
|
|
|
+ throws IOException, InterruptedException, KeeperException
|
|
|
+ {
|
|
|
+ ZooKeeper zk = createClient(new CountdownWatcher(), hostPort);
|
|
|
+ try {
|
|
|
+ MyWatcher watchers[] = new MyWatcher[100];
|
|
|
+ MyWatcher watchers2[] = new MyWatcher[watchers.length];
|
|
|
+ for (int i = 0; i < watchers.length; i++) {
|
|
|
+ watchers[i] = new MyWatcher();
|
|
|
+ watchers2[i] = new MyWatcher();
|
|
|
+ zk.create("/foo-" + i, ("foodata" + i).getBytes(),
|
|
|
+ Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
+ }
|
|
|
+ Stat stat = new Stat();
|
|
|
+
|
|
|
+ //
|
|
|
+ // test get/exists with single set of watchers
|
|
|
+ // get all, then exists all
|
|
|
+ //
|
|
|
+ for (int i = 0; i < watchers.length; i++) {
|
|
|
+ assertNotNull(zk.getData("/foo-" + i, watchers[i], stat));
|
|
|
+ }
|
|
|
+ for (int i = 0; i < watchers.length; i++) {
|
|
|
+ assertNotNull(zk.exists("/foo-" + i, watchers[i]));
|
|
|
+ }
|
|
|
+ // trigger the watches
|
|
|
+ for (int i = 0; i < watchers.length; i++) {
|
|
|
+ zk.setData("/foo-" + i, ("foodata2-" + i).getBytes(), -1);
|
|
|
+ zk.setData("/foo-" + i, ("foodata3-" + i).getBytes(), -1);
|
|
|
+ }
|
|
|
+ for (int i = 0; i < watchers.length; i++) {
|
|
|
+ WatcherEvent event =
|
|
|
+ watchers[i].events.poll(10, TimeUnit.SECONDS);
|
|
|
+ assertEquals("/foo-" + i, event.getPath());
|
|
|
+ assertEquals(Event.EventNodeDataChanged, event.getType());
|
|
|
+ assertEquals(Event.KeeperStateSyncConnected, event.getState());
|
|
|
+
|
|
|
+ // small chance that an unexpected message was delivered
|
|
|
+ // after this check, but we would catch that next time
|
|
|
+ // we check events
|
|
|
+ assertEquals(0, watchers[i].events.size());
|
|
|
+ }
|
|
|
+
|
|
|
+ //
|
|
|
+ // test get/exists with single set of watchers
|
|
|
+ // get/exists together
|
|
|
+ //
|
|
|
+ for (int i = 0; i < watchers.length; i++) {
|
|
|
+ assertNotNull(zk.getData("/foo-" + i, watchers[i], stat));
|
|
|
+ assertNotNull(zk.exists("/foo-" + i, watchers[i]));
|
|
|
+ }
|
|
|
+ // trigger the watches
|
|
|
+ for (int i = 0; i < watchers.length; i++) {
|
|
|
+ zk.setData("/foo-" + i, ("foodata4-" + i).getBytes(), -1);
|
|
|
+ zk.setData("/foo-" + i, ("foodata5-" + i).getBytes(), -1);
|
|
|
+ }
|
|
|
+ for (int i = 0; i < watchers.length; i++) {
|
|
|
+ WatcherEvent event =
|
|
|
+ watchers[i].events.poll(10, TimeUnit.SECONDS);
|
|
|
+ assertEquals("/foo-" + i, event.getPath());
|
|
|
+ assertEquals(Event.EventNodeDataChanged, event.getType());
|
|
|
+ assertEquals(Event.KeeperStateSyncConnected, event.getState());
|
|
|
+
|
|
|
+ // small chance that an unexpected message was delivered
|
|
|
+ // after this check, but we would catch that next time
|
|
|
+ // we check events
|
|
|
+ assertEquals(0, watchers[i].events.size());
|
|
|
+ }
|
|
|
+
|
|
|
+ //
|
|
|
+ // test get/exists with two sets of watchers
|
|
|
+ //
|
|
|
+ for (int i = 0; i < watchers.length; i++) {
|
|
|
+ assertNotNull(zk.getData("/foo-" + i, watchers[i], stat));
|
|
|
+ assertNotNull(zk.exists("/foo-" + i, watchers2[i]));
|
|
|
+ }
|
|
|
+ // trigger the watches
|
|
|
+ for (int i = 0; i < watchers.length; i++) {
|
|
|
+ zk.setData("/foo-" + i, ("foodata6-" + i).getBytes(), -1);
|
|
|
+ zk.setData("/foo-" + i, ("foodata7-" + i).getBytes(), -1);
|
|
|
+ }
|
|
|
+ for (int i = 0; i < watchers.length; i++) {
|
|
|
+ WatcherEvent event =
|
|
|
+ watchers[i].events.poll(10, TimeUnit.SECONDS);
|
|
|
+ assertEquals("/foo-" + i, event.getPath());
|
|
|
+ assertEquals(Event.EventNodeDataChanged, event.getType());
|
|
|
+ assertEquals(Event.KeeperStateSyncConnected, event.getState());
|
|
|
+
|
|
|
+ // small chance that an unexpected message was delivered
|
|
|
+ // after this check, but we would catch that next time
|
|
|
+ // we check events
|
|
|
+ assertEquals(0, watchers[i].events.size());
|
|
|
+
|
|
|
+ // watchers2
|
|
|
+ WatcherEvent event2 =
|
|
|
+ watchers2[i].events.poll(10, TimeUnit.SECONDS);
|
|
|
+ assertEquals("/foo-" + i, event2.getPath());
|
|
|
+ assertEquals(Event.EventNodeDataChanged, event2.getType());
|
|
|
+ assertEquals(Event.KeeperStateSyncConnected, event2.getState());
|
|
|
+
|
|
|
+ // small chance that an unexpected message was delivered
|
|
|
+ // after this check, but we would catch that next time
|
|
|
+ // we check events
|
|
|
+ assertEquals(0, watchers2[i].events.size());
|
|
|
+ }
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ if (zk != null) {
|
|
|
+ zk.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
private void performClientTest(boolean withWatcherObj)
|
|
|
throws IOException, InterruptedException, KeeperException
|
|
|
{
|
|
|
ZooKeeper zk = null;
|
|
|
try {
|
|
|
- zk = createClient(new MyWatcher(), hostPort);
|
|
|
+ MyWatcher watcher = new MyWatcher();
|
|
|
+ zk = createClient(watcher, hostPort);
|
|
|
//LOG.info("Created client: " + zk.describeCNXN());
|
|
|
LOG.info("Before create /benwashere");
|
|
|
zk.create("/benwashere", "".getBytes(), Ids.OPEN_ACL_UNSAFE, 0);
|
|
@@ -182,7 +301,8 @@ public class ClientTest extends ClientBase {
|
|
|
zk.close();
|
|
|
//LOG.info("Closed client: " + zk.describeCNXN());
|
|
|
Thread.sleep(2000);
|
|
|
- zk = createClient(new MyWatcher(), hostPort);
|
|
|
+
|
|
|
+ zk = createClient(watcher, hostPort);
|
|
|
//LOG.info("Created a new client: " + zk.describeCNXN());
|
|
|
LOG.info("Before delete /");
|
|
|
|
|
@@ -203,9 +323,10 @@ public class ClientTest extends ClientBase {
|
|
|
String value = new String(zk.getData("/ben", false, stat));
|
|
|
assertEquals("Ben was here", value);
|
|
|
// Test stat and watch of non existent node
|
|
|
+
|
|
|
try {
|
|
|
if (withWatcherObj) {
|
|
|
- assertEquals(null, zk.exists("/frog", new MyWatcher()));
|
|
|
+ assertEquals(null, zk.exists("/frog", watcher));
|
|
|
} else {
|
|
|
assertEquals(null, zk.exists("/frog", true));
|
|
|
}
|
|
@@ -214,9 +335,10 @@ public class ClientTest extends ClientBase {
|
|
|
// OK, expected that
|
|
|
}
|
|
|
zk.create("/frog", "hi".getBytes(), Ids.OPEN_ACL_UNSAFE, 0);
|
|
|
- // the first poll is just a sesssion delivery
|
|
|
- LOG.info("Comment: checking for events length " + events.size());
|
|
|
- WatcherEvent event = events.poll(10, TimeUnit.SECONDS);
|
|
|
+ // the first poll is just a session delivery
|
|
|
+ LOG.info("Comment: checking for events length "
|
|
|
+ + watcher.events.size());
|
|
|
+ WatcherEvent event = watcher.events.poll(10, TimeUnit.SECONDS);
|
|
|
assertEquals("/frog", event.getPath());
|
|
|
assertEquals(Event.EventNodeCreated, event.getType());
|
|
|
assertEquals(Event.KeeperStateSyncConnected, event.getState());
|
|
@@ -234,30 +356,30 @@ public class ClientTest extends ClientBase {
|
|
|
assertTrue("starts with -", name.startsWith(i + "-"));
|
|
|
byte b[];
|
|
|
if (withWatcherObj) {
|
|
|
- b = zk.getData("/ben/" + name, new MyWatcher(), stat);
|
|
|
+ b = zk.getData("/ben/" + name, watcher, stat);
|
|
|
} else {
|
|
|
b = zk.getData("/ben/" + name, true, stat);
|
|
|
}
|
|
|
assertEquals(Integer.toString(i), new String(b));
|
|
|
zk.setData("/ben/" + name, "new".getBytes(), stat.getVersion());
|
|
|
if (withWatcherObj) {
|
|
|
- stat = zk.exists("/ben/" + name, new MyWatcher());
|
|
|
+ stat = zk.exists("/ben/" + name, watcher);
|
|
|
} else {
|
|
|
stat = zk.exists("/ben/" + name, true);
|
|
|
}
|
|
|
zk.delete("/ben/" + name, stat.getVersion());
|
|
|
}
|
|
|
- event = events.poll(10, TimeUnit.SECONDS);
|
|
|
+ event = watcher.events.poll(10, TimeUnit.SECONDS);
|
|
|
assertEquals("/ben", event.getPath());
|
|
|
assertEquals(Event.EventNodeChildrenChanged, event.getType());
|
|
|
assertEquals(Event.KeeperStateSyncConnected, event.getState());
|
|
|
for (int i = 0; i < 10; i++) {
|
|
|
- event = events.poll(10, TimeUnit.SECONDS);
|
|
|
+ event = watcher.events.poll(10, TimeUnit.SECONDS);
|
|
|
final String name = children.get(i);
|
|
|
assertEquals("/ben/" + name, event.getPath());
|
|
|
assertEquals(Event.EventNodeDataChanged, event.getType());
|
|
|
assertEquals(Event.KeeperStateSyncConnected, event.getState());
|
|
|
- event = events.poll(10, TimeUnit.SECONDS);
|
|
|
+ event = watcher.events.poll(10, TimeUnit.SECONDS);
|
|
|
assertEquals("/ben/" + name, event.getPath());
|
|
|
assertEquals(Event.EventNodeDeleted, event.getType());
|
|
|
assertEquals(Event.KeeperStateSyncConnected, event.getState());
|