|
@@ -114,77 +114,86 @@ public class ZooKeeper {
|
|
|
new HashMap<String, Set<Watcher>>();
|
|
|
|
|
|
/**
|
|
|
- * Process a WatchEvent.
|
|
|
- *
|
|
|
- * Looks up the watch in the set of watches, processes the event
|
|
|
- * if found, otw uses the default watcher (registered during instance
|
|
|
- * creation) to process the watch.
|
|
|
- *
|
|
|
- * @param event the event to process.
|
|
|
+ * Process watch events generated by the ClientCnxn object.
|
|
|
+ *
|
|
|
+ * We are implementing this as a nested class of ZooKeeper so that
|
|
|
+ * the public Watcher.process(event) method will not be exposed as part
|
|
|
+ * of the ZooKeeper client API.
|
|
|
*/
|
|
|
- public void processWatchEvent(WatcherEvent event) {
|
|
|
- // clear the watches if we are not connected
|
|
|
- if (event.getState() != Watcher.Event.KeeperStateSyncConnected) {
|
|
|
- synchronized (dataWatches) {
|
|
|
- for (Set<Watcher> watchers : dataWatches.values()) {
|
|
|
- for (Watcher watcher : watchers) {
|
|
|
- watcher.process(event);
|
|
|
+ private class ZKWatcher implements Watcher {
|
|
|
+ /**
|
|
|
+ * Process a WatchEvent.
|
|
|
+ *
|
|
|
+ * Looks up the watch in the set of watches, processes the event
|
|
|
+ * if found, otw uses the default watcher (registered during instance
|
|
|
+ * creation) to process the watch.
|
|
|
+ *
|
|
|
+ * @param event the event to process.
|
|
|
+ */
|
|
|
+ public void process(WatcherEvent event) {
|
|
|
+ // clear the watches if we are not connected
|
|
|
+ if (event.getState() != Watcher.Event.KeeperStateSyncConnected) {
|
|
|
+ synchronized (dataWatches) {
|
|
|
+ for (Set<Watcher> watchers : dataWatches.values()) {
|
|
|
+ for (Watcher watcher : watchers) {
|
|
|
+ watcher.process(event);
|
|
|
+ }
|
|
|
}
|
|
|
+ dataWatches.clear();
|
|
|
}
|
|
|
- dataWatches.clear();
|
|
|
- }
|
|
|
- synchronized (childWatches) {
|
|
|
- for (Set<Watcher> watchers : childWatches.values()) {
|
|
|
- for (Watcher watcher : watchers) {
|
|
|
- watcher.process(event);
|
|
|
+ synchronized (childWatches) {
|
|
|
+ for (Set<Watcher> watchers : childWatches.values()) {
|
|
|
+ for (Watcher watcher : watchers) {
|
|
|
+ watcher.process(event);
|
|
|
+ }
|
|
|
}
|
|
|
+ childWatches.clear();
|
|
|
}
|
|
|
- childWatches.clear();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- Set<Watcher> watchers = null;
|
|
|
-
|
|
|
- switch (event.getType()) {
|
|
|
- case Watcher.Event.EventNone:
|
|
|
- defaultWatcher.process(event);
|
|
|
- return;
|
|
|
- case Watcher.Event.EventNodeDataChanged:
|
|
|
- case Watcher.Event.EventNodeCreated:
|
|
|
- synchronized (dataWatches) {
|
|
|
- watchers = dataWatches.remove(event.getPath());
|
|
|
- }
|
|
|
- break;
|
|
|
- case Watcher.Event.EventNodeChildrenChanged:
|
|
|
- synchronized (childWatches) {
|
|
|
- watchers = childWatches.remove(event.getPath());
|
|
|
}
|
|
|
- break;
|
|
|
- case Watcher.Event.EventNodeDeleted:
|
|
|
- synchronized (dataWatches) {
|
|
|
- watchers = dataWatches.remove(event.getPath());
|
|
|
- }
|
|
|
- Set<Watcher> cwatches;
|
|
|
- synchronized (childWatches) {
|
|
|
- cwatches = childWatches.remove(event.getPath());
|
|
|
- }
|
|
|
- if (cwatches != null) {
|
|
|
- if (watchers == null) {
|
|
|
- watchers = cwatches;
|
|
|
- } else {
|
|
|
- watchers.addAll(cwatches);
|
|
|
+
|
|
|
+ Set<Watcher> watchers = null;
|
|
|
+
|
|
|
+ switch (event.getType()) {
|
|
|
+ case Watcher.Event.EventNone:
|
|
|
+ defaultWatcher.process(event);
|
|
|
+ return;
|
|
|
+ case Watcher.Event.EventNodeDataChanged:
|
|
|
+ case Watcher.Event.EventNodeCreated:
|
|
|
+ synchronized (dataWatches) {
|
|
|
+ watchers = dataWatches.remove(event.getPath());
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case Watcher.Event.EventNodeChildrenChanged:
|
|
|
+ synchronized (childWatches) {
|
|
|
+ watchers = childWatches.remove(event.getPath());
|
|
|
}
|
|
|
+ break;
|
|
|
+ case Watcher.Event.EventNodeDeleted:
|
|
|
+ synchronized (dataWatches) {
|
|
|
+ watchers = dataWatches.remove(event.getPath());
|
|
|
+ }
|
|
|
+ Set<Watcher> cwatches;
|
|
|
+ synchronized (childWatches) {
|
|
|
+ cwatches = childWatches.remove(event.getPath());
|
|
|
+ }
|
|
|
+ if (cwatches != null) {
|
|
|
+ if (watchers == null) {
|
|
|
+ watchers = cwatches;
|
|
|
+ } else {
|
|
|
+ watchers.addAll(cwatches);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ String msg = "Unhandled watch event type " + event.getType();
|
|
|
+ LOG.error(msg);
|
|
|
+ throw new RuntimeException(msg);
|
|
|
}
|
|
|
- break;
|
|
|
- default:
|
|
|
- String msg = "Unhandled watch event type " + event.getType();
|
|
|
- LOG.error(msg);
|
|
|
- throw new RuntimeException(msg);
|
|
|
- }
|
|
|
-
|
|
|
- if (watchers != null) {
|
|
|
- for (Watcher watcher : watchers) {
|
|
|
- watcher.process(event);
|
|
|
+
|
|
|
+ if (watchers != null) {
|
|
|
+ for (Watcher watcher : watchers) {
|
|
|
+ watcher.process(event);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -257,19 +266,19 @@ public class ZooKeeper {
|
|
|
|
|
|
volatile States state;
|
|
|
|
|
|
- ClientCnxn cnxn;
|
|
|
+ protected ClientCnxn cnxn;
|
|
|
|
|
|
public ZooKeeper(String host, int sessionTimeout, Watcher watcher)
|
|
|
throws IOException {
|
|
|
this.defaultWatcher = watcher;
|
|
|
- cnxn = new ClientCnxn(host, sessionTimeout, this);
|
|
|
+ cnxn = new ClientCnxn(host, sessionTimeout, this, new ZKWatcher());
|
|
|
}
|
|
|
|
|
|
public ZooKeeper(String host, int sessionTimeout, Watcher watcher,
|
|
|
long sessionId, byte[] sessionPasswd) throws IOException {
|
|
|
this.defaultWatcher = watcher;
|
|
|
- cnxn = new ClientCnxn(host, sessionTimeout, this, sessionId,
|
|
|
- sessionPasswd);
|
|
|
+ cnxn = new ClientCnxn(host, sessionTimeout, this, new ZKWatcher(),
|
|
|
+ sessionId, sessionPasswd);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -291,10 +300,6 @@ public class ZooKeeper {
|
|
|
cnxn.addAuthInfo(scheme, auth);
|
|
|
}
|
|
|
|
|
|
- public String describeCNXN() {
|
|
|
- return cnxn.toString();
|
|
|
- }
|
|
|
-
|
|
|
public synchronized void register(Watcher watcher) {
|
|
|
this.defaultWatcher = watcher;
|
|
|
}
|
|
@@ -935,13 +940,4 @@ public class ZooKeeper {
|
|
|
public States getState() {
|
|
|
return state;
|
|
|
}
|
|
|
-
|
|
|
- // Everything below this line is for testing!
|
|
|
-
|
|
|
- /** Testing only!!! Really this needs to be moved into a stub in the
|
|
|
- * tests - pending JIRA for that.
|
|
|
- */
|
|
|
- public void disconnect() throws IOException {
|
|
|
- cnxn.disconnect();
|
|
|
- }
|
|
|
}
|