|
@@ -106,37 +106,35 @@ import org.apache.zookeeper.server.DataTree;
|
|
|
public class ZooKeeper {
|
|
|
private static final Logger LOG = Logger.getLogger(ZooKeeper.class);
|
|
|
|
|
|
- private volatile Watcher defaultWatcher;
|
|
|
-
|
|
|
- private final Map<String, Set<Watcher>> dataWatches =
|
|
|
- new HashMap<String, Set<Watcher>>();
|
|
|
- private final Map<String, Set<Watcher>> childWatches =
|
|
|
- new HashMap<String, Set<Watcher>>();
|
|
|
-
|
|
|
+ private final ZKWatchManager watchManager = new ZKWatchManager();
|
|
|
+
|
|
|
/**
|
|
|
- * Process watch events generated by the ClientCnxn object.
|
|
|
+ * Manage watchers & handle events generated by the ClientCnxn object.
|
|
|
*
|
|
|
* We are implementing this as a nested class of ZooKeeper so that
|
|
|
- * the public Watcher.process(event) method will not be exposed as part
|
|
|
- * of the ZooKeeper client API.
|
|
|
+ * the public methods will not be exposed as part of the ZooKeeper client
|
|
|
+ * API.
|
|
|
*/
|
|
|
- private class ZKWatcher implements Watcher {
|
|
|
- /**
|
|
|
- * Process a WatchEvent.
|
|
|
- *
|
|
|
- * Looks up the watch in the set of watches, processes the event
|
|
|
- * if found, otw uses the default watcher (registered during instance
|
|
|
- * creation) to process the watch.
|
|
|
- *
|
|
|
- * @param event the event to process.
|
|
|
+ private class ZKWatchManager implements ClientWatchManager {
|
|
|
+ private final Map<String, Set<Watcher>> dataWatches =
|
|
|
+ new HashMap<String, Set<Watcher>>();
|
|
|
+ private final Map<String, Set<Watcher>> childWatches =
|
|
|
+ new HashMap<String, Set<Watcher>>();
|
|
|
+
|
|
|
+ private volatile Watcher defaultWatcher;
|
|
|
+
|
|
|
+ /* (non-Javadoc)
|
|
|
+ * @see org.apache.zookeeper.ClientWatchManager#materialize(int, int, java.lang.String)
|
|
|
*/
|
|
|
- public void process(WatcherEvent event) {
|
|
|
+ public Set<Watcher> materialize(int state, int type, String path) {
|
|
|
+ Set<Watcher> result = new HashSet<Watcher>();
|
|
|
+
|
|
|
// clear the watches if we are not connected
|
|
|
- if (event.getState() != Watcher.Event.KeeperStateSyncConnected) {
|
|
|
+ if (state != Watcher.Event.KeeperStateSyncConnected) {
|
|
|
synchronized (dataWatches) {
|
|
|
for (Set<Watcher> watchers : dataWatches.values()) {
|
|
|
for (Watcher watcher : watchers) {
|
|
|
- watcher.process(event);
|
|
|
+ result.add(watcher);
|
|
|
}
|
|
|
}
|
|
|
dataWatches.clear();
|
|
@@ -144,7 +142,7 @@ public class ZooKeeper {
|
|
|
synchronized (childWatches) {
|
|
|
for (Set<Watcher> watchers : childWatches.values()) {
|
|
|
for (Watcher watcher : watchers) {
|
|
|
- watcher.process(event);
|
|
|
+ result.add(watcher);
|
|
|
}
|
|
|
}
|
|
|
childWatches.clear();
|
|
@@ -153,28 +151,28 @@ public class ZooKeeper {
|
|
|
|
|
|
Set<Watcher> watchers = null;
|
|
|
|
|
|
- switch (event.getType()) {
|
|
|
+ switch (type) {
|
|
|
case Watcher.Event.EventNone:
|
|
|
- defaultWatcher.process(event);
|
|
|
- return;
|
|
|
+ result.add(defaultWatcher);
|
|
|
+ return result;
|
|
|
case Watcher.Event.EventNodeDataChanged:
|
|
|
case Watcher.Event.EventNodeCreated:
|
|
|
synchronized (dataWatches) {
|
|
|
- watchers = dataWatches.remove(event.getPath());
|
|
|
+ watchers = dataWatches.remove(path);
|
|
|
}
|
|
|
break;
|
|
|
case Watcher.Event.EventNodeChildrenChanged:
|
|
|
synchronized (childWatches) {
|
|
|
- watchers = childWatches.remove(event.getPath());
|
|
|
+ watchers = childWatches.remove(path);
|
|
|
}
|
|
|
break;
|
|
|
case Watcher.Event.EventNodeDeleted:
|
|
|
synchronized (dataWatches) {
|
|
|
- watchers = dataWatches.remove(event.getPath());
|
|
|
+ watchers = dataWatches.remove(path);
|
|
|
}
|
|
|
Set<Watcher> cwatches;
|
|
|
synchronized (childWatches) {
|
|
|
- cwatches = childWatches.remove(event.getPath());
|
|
|
+ cwatches = childWatches.remove(path);
|
|
|
}
|
|
|
if (cwatches != null) {
|
|
|
if (watchers == null) {
|
|
@@ -185,16 +183,14 @@ public class ZooKeeper {
|
|
|
}
|
|
|
break;
|
|
|
default:
|
|
|
- String msg = "Unhandled watch event type " + event.getType();
|
|
|
+ String msg = "Unhandled watch event type " + type
|
|
|
+ + " with state " + state + " on path " + path;
|
|
|
LOG.error(msg);
|
|
|
throw new RuntimeException(msg);
|
|
|
}
|
|
|
|
|
|
- if (watchers != null) {
|
|
|
- for (Watcher watcher : watchers) {
|
|
|
- watcher.process(event);
|
|
|
- }
|
|
|
- }
|
|
|
+ result.addAll(watchers);
|
|
|
+ return result;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -270,14 +266,14 @@ public class ZooKeeper {
|
|
|
|
|
|
public ZooKeeper(String host, int sessionTimeout, Watcher watcher)
|
|
|
throws IOException {
|
|
|
- this.defaultWatcher = watcher;
|
|
|
- cnxn = new ClientCnxn(host, sessionTimeout, this, new ZKWatcher());
|
|
|
+ watchManager.defaultWatcher = watcher;
|
|
|
+ cnxn = new ClientCnxn(host, sessionTimeout, this, watchManager);
|
|
|
}
|
|
|
|
|
|
public ZooKeeper(String host, int sessionTimeout, Watcher watcher,
|
|
|
long sessionId, byte[] sessionPasswd) throws IOException {
|
|
|
- this.defaultWatcher = watcher;
|
|
|
- cnxn = new ClientCnxn(host, sessionTimeout, this, new ZKWatcher(),
|
|
|
+ watchManager.defaultWatcher = watcher;
|
|
|
+ cnxn = new ClientCnxn(host, sessionTimeout, this, watchManager,
|
|
|
sessionId, sessionPasswd);
|
|
|
}
|
|
|
|
|
@@ -301,7 +297,7 @@ public class ZooKeeper {
|
|
|
}
|
|
|
|
|
|
public synchronized void register(Watcher watcher) {
|
|
|
- this.defaultWatcher = watcher;
|
|
|
+ watchManager.defaultWatcher = watcher;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -503,7 +499,8 @@ public class ZooKeeper {
|
|
|
SetDataResponse response = new SetDataResponse();
|
|
|
WatchRegistration wcb = null;
|
|
|
if (watcher != null) {
|
|
|
- wcb = new ExistsWatchRegistration(dataWatches, watcher, path);
|
|
|
+ wcb = new ExistsWatchRegistration(watchManager.dataWatches, watcher,
|
|
|
+ path);
|
|
|
}
|
|
|
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
|
|
|
if (r.getErr() != 0) {
|
|
@@ -537,7 +534,7 @@ public class ZooKeeper {
|
|
|
public Stat exists(String path, boolean watch) throws KeeperException,
|
|
|
InterruptedException
|
|
|
{
|
|
|
- return exists(path, watch ? defaultWatcher : null);
|
|
|
+ return exists(path, watch ? watchManager.defaultWatcher : null);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -557,7 +554,8 @@ public class ZooKeeper {
|
|
|
SetDataResponse response = new SetDataResponse();
|
|
|
WatchRegistration wcb = null;
|
|
|
if (watcher != null) {
|
|
|
- wcb = new ExistsWatchRegistration(dataWatches, watcher, path);
|
|
|
+ wcb = new ExistsWatchRegistration(watchManager.dataWatches, watcher,
|
|
|
+ path);
|
|
|
}
|
|
|
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
|
|
|
ctx, wcb);
|
|
@@ -570,7 +568,7 @@ public class ZooKeeper {
|
|
|
* @see #exists(String, boolean)
|
|
|
*/
|
|
|
public void exists(String path, boolean watch, StatCallback cb, Object ctx) {
|
|
|
- exists(path, watch ? defaultWatcher : null, cb, ctx);
|
|
|
+ exists(path, watch ? watchManager.defaultWatcher : null, cb, ctx);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -601,7 +599,8 @@ public class ZooKeeper {
|
|
|
GetDataResponse response = new GetDataResponse();
|
|
|
WatchRegistration wcb = null;
|
|
|
if (watcher != null) {
|
|
|
- wcb = new WatchRegistration(dataWatches, watcher, path);
|
|
|
+ wcb = new WatchRegistration(watchManager.dataWatches, watcher,
|
|
|
+ path);
|
|
|
}
|
|
|
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
|
|
|
if (r.getErr() != 0) {
|
|
@@ -633,7 +632,7 @@ public class ZooKeeper {
|
|
|
*/
|
|
|
public byte[] getData(String path, boolean watch, Stat stat)
|
|
|
throws KeeperException, InterruptedException {
|
|
|
- return getData(path, watch ? defaultWatcher : null, stat);
|
|
|
+ return getData(path, watch ? watchManager.defaultWatcher : null, stat);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -651,7 +650,8 @@ public class ZooKeeper {
|
|
|
GetDataResponse response = new GetDataResponse();
|
|
|
WatchRegistration wcb = null;
|
|
|
if (watcher != null) {
|
|
|
- wcb = new WatchRegistration(dataWatches, watcher, path);
|
|
|
+ wcb = new WatchRegistration(watchManager.dataWatches, watcher,
|
|
|
+ path);
|
|
|
}
|
|
|
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
|
|
|
ctx, wcb);
|
|
@@ -664,7 +664,7 @@ public class ZooKeeper {
|
|
|
* @see #getData(String, boolean, Stat)
|
|
|
*/
|
|
|
public void getData(String path, boolean watch, DataCallback cb, Object ctx) {
|
|
|
- getData(path, watch ? defaultWatcher : null, cb, ctx);
|
|
|
+ getData(path, watch ? watchManager.defaultWatcher : null, cb, ctx);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -862,7 +862,8 @@ public class ZooKeeper {
|
|
|
GetChildrenResponse response = new GetChildrenResponse();
|
|
|
WatchRegistration wcb = null;
|
|
|
if (watcher != null) {
|
|
|
- wcb = new WatchRegistration(childWatches, watcher, path);
|
|
|
+ wcb = new WatchRegistration(watchManager.childWatches, watcher,
|
|
|
+ path);
|
|
|
}
|
|
|
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
|
|
|
if (r.getErr() != 0) {
|
|
@@ -893,7 +894,7 @@ public class ZooKeeper {
|
|
|
*/
|
|
|
public List<String> getChildren(String path, boolean watch)
|
|
|
throws KeeperException, InterruptedException {
|
|
|
- return getChildren(path, watch ? defaultWatcher : null);
|
|
|
+ return getChildren(path, watch ? watchManager.defaultWatcher : null);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -912,7 +913,8 @@ public class ZooKeeper {
|
|
|
GetChildrenResponse response = new GetChildrenResponse();
|
|
|
WatchRegistration wcb = null;
|
|
|
if (watcher != null) {
|
|
|
- wcb = new WatchRegistration(childWatches, watcher, path);
|
|
|
+ wcb = new WatchRegistration(watchManager.childWatches, watcher,
|
|
|
+ path);
|
|
|
}
|
|
|
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
|
|
|
ctx, wcb);
|
|
@@ -926,7 +928,7 @@ public class ZooKeeper {
|
|
|
*/
|
|
|
public void getChildren(String path, boolean watch, ChildrenCallback cb,
|
|
|
Object ctx) {
|
|
|
- getChildren(path, watch ? defaultWatcher : null, cb, ctx);
|
|
|
+ getChildren(path, watch ? watchManager.defaultWatcher : null, cb, ctx);
|
|
|
}
|
|
|
|
|
|
/**
|