|
@@ -19,6 +19,7 @@
|
|
package org.apache.zookeeper;
|
|
package org.apache.zookeeper;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
|
+import java.util.ArrayList;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
@@ -99,7 +100,20 @@ public class ZooKeeper {
|
|
|
|
|
|
private final ZKWatchManager watchManager = new ZKWatchManager();
|
|
private final ZKWatchManager watchManager = new ZKWatchManager();
|
|
|
|
|
|
- /**
|
|
|
|
|
|
+ List<String> getDataWatches() {
|
|
|
|
+ List<String> rc = new ArrayList<String>(watchManager.dataWatches.keySet());
|
|
|
|
+ return rc;
|
|
|
|
+ }
|
|
|
|
+ List<String> getExistWatches() {
|
|
|
|
+ List<String> rc = new ArrayList<String>(watchManager.existWatches.keySet());
|
|
|
|
+ return rc;
|
|
|
|
+ }
|
|
|
|
+ List<String> getChildWatches() {
|
|
|
|
+ List<String> rc = new ArrayList<String>(watchManager.childWatches.keySet());
|
|
|
|
+ return rc;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+/**
|
|
* Manage watchers & handle events generated by the ClientCnxn object.
|
|
* Manage watchers & handle events generated by the ClientCnxn object.
|
|
*
|
|
*
|
|
* We are implementing this as a nested class of ZooKeeper so that
|
|
* We are implementing this as a nested class of ZooKeeper so that
|
|
@@ -109,11 +123,19 @@ public class ZooKeeper {
|
|
private class ZKWatchManager implements ClientWatchManager {
|
|
private class ZKWatchManager implements ClientWatchManager {
|
|
private final Map<String, Set<Watcher>> dataWatches =
|
|
private final Map<String, Set<Watcher>> dataWatches =
|
|
new HashMap<String, Set<Watcher>>();
|
|
new HashMap<String, Set<Watcher>>();
|
|
|
|
+ private final Map<String, Set<Watcher>> existWatches =
|
|
|
|
+ new HashMap<String, Set<Watcher>>();
|
|
private final Map<String, Set<Watcher>> childWatches =
|
|
private final Map<String, Set<Watcher>> childWatches =
|
|
new HashMap<String, Set<Watcher>>();
|
|
new HashMap<String, Set<Watcher>>();
|
|
-
|
|
|
|
|
|
+
|
|
private volatile Watcher defaultWatcher;
|
|
private volatile Watcher defaultWatcher;
|
|
|
|
|
|
|
|
+ final private void addTo(Set<Watcher> from, Set<Watcher> to) {
|
|
|
|
+ if (from != null) {
|
|
|
|
+ to.addAll(from);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
/* (non-Javadoc)
|
|
/* (non-Javadoc)
|
|
* @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, Event.EventType, java.lang.String)
|
|
* @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, Event.EventType, java.lang.String)
|
|
*/
|
|
*/
|
|
@@ -121,58 +143,60 @@ public class ZooKeeper {
|
|
Watcher.Event.EventType type, String path) {
|
|
Watcher.Event.EventType type, String path) {
|
|
Set<Watcher> result = new HashSet<Watcher>();
|
|
Set<Watcher> result = new HashSet<Watcher>();
|
|
|
|
|
|
- // clear the watches if we are not connected
|
|
|
|
|
|
+ switch (type) {
|
|
|
|
+ case None:
|
|
|
|
+ result.add(defaultWatcher);
|
|
|
|
+ for(Set<Watcher> ws: dataWatches.values()) {
|
|
|
|
+ result.addAll(ws);
|
|
|
|
+ }
|
|
|
|
+ for(Set<Watcher> ws: existWatches.values()) {
|
|
|
|
+ result.addAll(ws);
|
|
|
|
+ }
|
|
|
|
+ for(Set<Watcher> ws: childWatches.values()) {
|
|
|
|
+ result.addAll(ws);
|
|
|
|
+ }
|
|
|
|
|
|
- if (state != Watcher.Event.KeeperState.SyncConnected) {
|
|
|
|
- synchronized (dataWatches) {
|
|
|
|
- for (Set<Watcher> watchers : dataWatches.values()) {
|
|
|
|
- for (Watcher watcher : watchers) {
|
|
|
|
- result.add(watcher);
|
|
|
|
- }
|
|
|
|
|
|
+ // clear the watches if auto watch reset is not enabled
|
|
|
|
+ if (ClientCnxn.disableAutoWatchReset &&
|
|
|
|
+ state != Watcher.Event.KeeperState.SyncConnected)
|
|
|
|
+ {
|
|
|
|
+ synchronized(dataWatches) {
|
|
|
|
+ dataWatches.clear();
|
|
}
|
|
}
|
|
- dataWatches.clear();
|
|
|
|
- }
|
|
|
|
- synchronized (childWatches) {
|
|
|
|
- for (Set<Watcher> watchers : childWatches.values()) {
|
|
|
|
- for (Watcher watcher : watchers) {
|
|
|
|
- result.add(watcher);
|
|
|
|
- }
|
|
|
|
|
|
+ synchronized(existWatches) {
|
|
|
|
+ existWatches.clear();
|
|
|
|
+ }
|
|
|
|
+ synchronized(childWatches) {
|
|
|
|
+ childWatches.clear();
|
|
}
|
|
}
|
|
- childWatches.clear();
|
|
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
|
|
- Set<Watcher> watchers = null;
|
|
|
|
-
|
|
|
|
- switch (type) {
|
|
|
|
- case None:
|
|
|
|
- result.add(defaultWatcher);
|
|
|
|
return result;
|
|
return result;
|
|
case NodeDataChanged:
|
|
case NodeDataChanged:
|
|
case NodeCreated:
|
|
case NodeCreated:
|
|
synchronized (dataWatches) {
|
|
synchronized (dataWatches) {
|
|
- watchers = dataWatches.remove(path);
|
|
|
|
|
|
+ addTo(dataWatches.remove(path), result);
|
|
|
|
+ }
|
|
|
|
+ synchronized (existWatches) {
|
|
|
|
+ addTo(existWatches.remove(path), result);
|
|
}
|
|
}
|
|
break;
|
|
break;
|
|
case NodeChildrenChanged:
|
|
case NodeChildrenChanged:
|
|
synchronized (childWatches) {
|
|
synchronized (childWatches) {
|
|
- watchers = childWatches.remove(path);
|
|
|
|
|
|
+ addTo(childWatches.remove(path), result);
|
|
}
|
|
}
|
|
break;
|
|
break;
|
|
case NodeDeleted:
|
|
case NodeDeleted:
|
|
synchronized (dataWatches) {
|
|
synchronized (dataWatches) {
|
|
- watchers = dataWatches.remove(path);
|
|
|
|
|
|
+ addTo(dataWatches.remove(path), result);
|
|
}
|
|
}
|
|
- Set<Watcher> cwatches;
|
|
|
|
- synchronized (childWatches) {
|
|
|
|
- cwatches = childWatches.remove(path);
|
|
|
|
|
|
+ // XXX This shouldn't be needed, but just in case
|
|
|
|
+ synchronized (existWatches) {
|
|
|
|
+ addTo(existWatches.remove(path), result);
|
|
|
|
+ LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
|
|
}
|
|
}
|
|
- if (cwatches != null) {
|
|
|
|
- if (watchers == null) {
|
|
|
|
- watchers = cwatches;
|
|
|
|
- } else {
|
|
|
|
- watchers.addAll(cwatches);
|
|
|
|
- }
|
|
|
|
|
|
+ synchronized (childWatches) {
|
|
|
|
+ addTo(childWatches.remove(path), result);
|
|
}
|
|
}
|
|
break;
|
|
break;
|
|
default:
|
|
default:
|
|
@@ -182,26 +206,24 @@ public class ZooKeeper {
|
|
throw new RuntimeException(msg);
|
|
throw new RuntimeException(msg);
|
|
}
|
|
}
|
|
|
|
|
|
- result.addAll(watchers);
|
|
|
|
return result;
|
|
return result;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Register a watcher for a particular path.
|
|
* Register a watcher for a particular path.
|
|
*/
|
|
*/
|
|
- class WatchRegistration {
|
|
|
|
- private Map<String, Set<Watcher>> watches;
|
|
|
|
|
|
+ abstract class WatchRegistration {
|
|
private Watcher watcher;
|
|
private Watcher watcher;
|
|
private String path;
|
|
private String path;
|
|
- public WatchRegistration(Map<String, Set<Watcher>> watches,
|
|
|
|
- Watcher watcher, String path)
|
|
|
|
|
|
+ public WatchRegistration(Watcher watcher, String path)
|
|
{
|
|
{
|
|
- this.watches = watches;
|
|
|
|
this.watcher = watcher;
|
|
this.watcher = watcher;
|
|
this.path = path;
|
|
this.path = path;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ abstract protected Map<String, Set<Watcher>> getWatches(int rc);
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Register the watcher with the set of watches on path.
|
|
* Register the watcher with the set of watches on path.
|
|
* @param rc the result code of the operation that attempted to
|
|
* @param rc the result code of the operation that attempted to
|
|
@@ -209,6 +231,7 @@ public class ZooKeeper {
|
|
*/
|
|
*/
|
|
public void register(int rc) {
|
|
public void register(int rc) {
|
|
if (shouldAddWatch(rc)) {
|
|
if (shouldAddWatch(rc)) {
|
|
|
|
+ Map<String, Set<Watcher>> watches = getWatches(rc);
|
|
synchronized(watches) {
|
|
synchronized(watches) {
|
|
Set<Watcher> watchers = watches.get(path);
|
|
Set<Watcher> watchers = watches.get(path);
|
|
if (watchers == null) {
|
|
if (watchers == null) {
|
|
@@ -234,17 +257,43 @@ public class ZooKeeper {
|
|
* even in the case where NONODE result code is returned.
|
|
* even in the case where NONODE result code is returned.
|
|
*/
|
|
*/
|
|
class ExistsWatchRegistration extends WatchRegistration {
|
|
class ExistsWatchRegistration extends WatchRegistration {
|
|
- public ExistsWatchRegistration(Map<String, Set<Watcher>> watches,
|
|
|
|
- Watcher watcher, String path)
|
|
|
|
- {
|
|
|
|
- super(watches, watcher, path);
|
|
|
|
|
|
+ public ExistsWatchRegistration(Watcher watcher, String path) {
|
|
|
|
+ super(watcher, path);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ protected Map<String, Set<Watcher>> getWatches(int rc) {
|
|
|
|
+ return rc == 0 ? watchManager.dataWatches : watchManager.existWatches;
|
|
}
|
|
}
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
protected boolean shouldAddWatch(int rc) {
|
|
protected boolean shouldAddWatch(int rc) {
|
|
return rc == 0 || rc == KeeperException.Code.NoNode;
|
|
return rc == 0 || rc == KeeperException.Code.NoNode;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ class DataWatchRegistration extends WatchRegistration {
|
|
|
|
+ public DataWatchRegistration(Watcher watcher, String path) {
|
|
|
|
+ super(watcher, path);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ protected Map<String, Set<Watcher>> getWatches(int rc) {
|
|
|
|
+ return watchManager.dataWatches;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ class ChildWatchRegistration extends WatchRegistration {
|
|
|
|
+ public ChildWatchRegistration(Watcher watcher, String path) {
|
|
|
|
+ super(watcher, path);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ protected Map<String, Set<Watcher>> getWatches(int rc) {
|
|
|
|
+ return watchManager.childWatches;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
public enum States {
|
|
public enum States {
|
|
CONNECTING, ASSOCIATING, CONNECTED, CLOSED, AUTH_FAILED;
|
|
CONNECTING, ASSOCIATING, CONNECTED, CLOSED, AUTH_FAILED;
|
|
|
|
|
|
@@ -534,8 +583,7 @@ public class ZooKeeper {
|
|
SetDataResponse response = new SetDataResponse();
|
|
SetDataResponse response = new SetDataResponse();
|
|
WatchRegistration wcb = null;
|
|
WatchRegistration wcb = null;
|
|
if (watcher != null) {
|
|
if (watcher != null) {
|
|
- wcb = new ExistsWatchRegistration(watchManager.dataWatches, watcher,
|
|
|
|
- path);
|
|
|
|
|
|
+ wcb = new ExistsWatchRegistration(watcher, path);
|
|
}
|
|
}
|
|
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
|
|
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
|
|
if (r.getErr() != 0) {
|
|
if (r.getErr() != 0) {
|
|
@@ -589,8 +637,7 @@ public class ZooKeeper {
|
|
SetDataResponse response = new SetDataResponse();
|
|
SetDataResponse response = new SetDataResponse();
|
|
WatchRegistration wcb = null;
|
|
WatchRegistration wcb = null;
|
|
if (watcher != null) {
|
|
if (watcher != null) {
|
|
- wcb = new ExistsWatchRegistration(watchManager.dataWatches, watcher,
|
|
|
|
- path);
|
|
|
|
|
|
+ wcb = new ExistsWatchRegistration(watcher, path);
|
|
}
|
|
}
|
|
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
|
|
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
|
|
ctx, wcb);
|
|
ctx, wcb);
|
|
@@ -634,8 +681,7 @@ public class ZooKeeper {
|
|
GetDataResponse response = new GetDataResponse();
|
|
GetDataResponse response = new GetDataResponse();
|
|
WatchRegistration wcb = null;
|
|
WatchRegistration wcb = null;
|
|
if (watcher != null) {
|
|
if (watcher != null) {
|
|
- wcb = new WatchRegistration(watchManager.dataWatches, watcher,
|
|
|
|
- path);
|
|
|
|
|
|
+ wcb = new DataWatchRegistration(watcher, path);
|
|
}
|
|
}
|
|
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
|
|
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
|
|
if (r.getErr() != 0) {
|
|
if (r.getErr() != 0) {
|
|
@@ -685,8 +731,7 @@ public class ZooKeeper {
|
|
GetDataResponse response = new GetDataResponse();
|
|
GetDataResponse response = new GetDataResponse();
|
|
WatchRegistration wcb = null;
|
|
WatchRegistration wcb = null;
|
|
if (watcher != null) {
|
|
if (watcher != null) {
|
|
- wcb = new WatchRegistration(watchManager.dataWatches, watcher,
|
|
|
|
- path);
|
|
|
|
|
|
+ wcb = new DataWatchRegistration(watcher, path);
|
|
}
|
|
}
|
|
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
|
|
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
|
|
ctx, wcb);
|
|
ctx, wcb);
|
|
@@ -899,8 +944,7 @@ public class ZooKeeper {
|
|
GetChildrenResponse response = new GetChildrenResponse();
|
|
GetChildrenResponse response = new GetChildrenResponse();
|
|
WatchRegistration wcb = null;
|
|
WatchRegistration wcb = null;
|
|
if (watcher != null) {
|
|
if (watcher != null) {
|
|
- wcb = new WatchRegistration(watchManager.childWatches, watcher,
|
|
|
|
- path);
|
|
|
|
|
|
+ wcb = new ChildWatchRegistration(watcher, path);
|
|
}
|
|
}
|
|
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
|
|
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
|
|
if (r.getErr() != 0) {
|
|
if (r.getErr() != 0) {
|
|
@@ -950,8 +994,7 @@ public class ZooKeeper {
|
|
GetChildrenResponse response = new GetChildrenResponse();
|
|
GetChildrenResponse response = new GetChildrenResponse();
|
|
WatchRegistration wcb = null;
|
|
WatchRegistration wcb = null;
|
|
if (watcher != null) {
|
|
if (watcher != null) {
|
|
- wcb = new WatchRegistration(watchManager.childWatches, watcher,
|
|
|
|
- path);
|
|
|
|
|
|
+ wcb = new ChildWatchRegistration(watcher, path);
|
|
}
|
|
}
|
|
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
|
|
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
|
|
ctx, wcb);
|
|
ctx, wcb);
|