|
@@ -24,7 +24,6 @@ import java.net.InetSocketAddress;
|
|
|
import java.net.SocketAddress;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
-import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
@@ -40,10 +39,7 @@ import org.apache.zookeeper.AsyncCallback.MultiCallback;
|
|
|
import org.apache.zookeeper.AsyncCallback.StatCallback;
|
|
|
import org.apache.zookeeper.AsyncCallback.StringCallback;
|
|
|
import org.apache.zookeeper.AsyncCallback.VoidCallback;
|
|
|
-import org.apache.zookeeper.KeeperException.Code;
|
|
|
-import org.apache.zookeeper.KeeperException.NoWatcherException;
|
|
|
import org.apache.zookeeper.OpResult.ErrorResult;
|
|
|
-import org.apache.zookeeper.Watcher.Event.EventType;
|
|
|
import org.apache.zookeeper.Watcher.WatcherType;
|
|
|
import org.apache.zookeeper.client.ConnectStringParser;
|
|
|
import org.apache.zookeeper.client.HostProvider;
|
|
@@ -85,7 +81,6 @@ import org.apache.zookeeper.proto.SyncRequest;
|
|
|
import org.apache.zookeeper.proto.SyncResponse;
|
|
|
import org.apache.zookeeper.server.DataTree;
|
|
|
import org.apache.zookeeper.server.EphemeralType;
|
|
|
-import org.apache.zookeeper.server.watch.PathParentIterator;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -231,8 +226,6 @@ public class ZooKeeper implements AutoCloseable {
|
|
|
return cnxn.zooKeeperSaslClient;
|
|
|
}
|
|
|
|
|
|
- protected final ZKWatchManager watchManager;
|
|
|
-
|
|
|
private final ZKClientConfig clientConfig;
|
|
|
|
|
|
public ZKClientConfig getClientConfig() {
|
|
@@ -240,407 +233,37 @@ public class ZooKeeper implements AutoCloseable {
|
|
|
}
|
|
|
|
|
|
protected List<String> getDataWatches() {
|
|
|
- synchronized (watchManager.dataWatches) {
|
|
|
- List<String> rc = new ArrayList<String>(watchManager.dataWatches.keySet());
|
|
|
- return rc;
|
|
|
- }
|
|
|
+ return getWatchManager().getDataWatchList();
|
|
|
}
|
|
|
+
|
|
|
protected List<String> getExistWatches() {
|
|
|
- synchronized (watchManager.existWatches) {
|
|
|
- List<String> rc = new ArrayList<String>(watchManager.existWatches.keySet());
|
|
|
- return rc;
|
|
|
- }
|
|
|
+ return getWatchManager().getExistWatchList();
|
|
|
}
|
|
|
+
|
|
|
protected List<String> getChildWatches() {
|
|
|
- synchronized (watchManager.childWatches) {
|
|
|
- List<String> rc = new ArrayList<String>(watchManager.childWatches.keySet());
|
|
|
- return rc;
|
|
|
- }
|
|
|
+ return getWatchManager().getChildWatchList();
|
|
|
}
|
|
|
+
|
|
|
protected List<String> getPersistentWatches() {
|
|
|
- synchronized (watchManager.persistentWatches) {
|
|
|
- List<String> rc = new ArrayList<String>(watchManager.persistentWatches.keySet());
|
|
|
- return rc;
|
|
|
- }
|
|
|
+ return getWatchManager().getPersistentWatchList();
|
|
|
}
|
|
|
+
|
|
|
protected List<String> getPersistentRecursiveWatches() {
|
|
|
- synchronized (watchManager.persistentRecursiveWatches) {
|
|
|
- List<String> rc = new ArrayList<String>(watchManager.persistentRecursiveWatches.keySet());
|
|
|
- return rc;
|
|
|
- }
|
|
|
+ return getWatchManager().getPersistentRecursiveWatchList();
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Manage watchers and handle events generated by the ClientCnxn object.
|
|
|
- *
|
|
|
- * We are implementing this as a nested class of ZooKeeper so that
|
|
|
- * the public methods will not be exposed as part of the ZooKeeper client
|
|
|
- * API.
|
|
|
- */
|
|
|
- static class ZKWatchManager implements ClientWatchManager {
|
|
|
-
|
|
|
- private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
|
|
|
- private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
|
|
|
- private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
|
|
|
- private final Map<String, Set<Watcher>> persistentWatches = new HashMap<String, Set<Watcher>>();
|
|
|
- private final Map<String, Set<Watcher>> persistentRecursiveWatches = new HashMap<String, Set<Watcher>>();
|
|
|
- private boolean disableAutoWatchReset;
|
|
|
-
|
|
|
- ZKWatchManager(boolean disableAutoWatchReset) {
|
|
|
- this.disableAutoWatchReset = disableAutoWatchReset;
|
|
|
- }
|
|
|
-
|
|
|
- protected volatile Watcher defaultWatcher;
|
|
|
-
|
|
|
- private void addTo(Set<Watcher> from, Set<Watcher> to) {
|
|
|
- if (from != null) {
|
|
|
- to.addAll(from);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public Map<EventType, Set<Watcher>> removeWatcher(
|
|
|
- String clientPath,
|
|
|
- Watcher watcher,
|
|
|
- WatcherType watcherType,
|
|
|
- boolean local,
|
|
|
- int rc) throws KeeperException {
|
|
|
- // Validate the provided znode path contains the given watcher of
|
|
|
- // watcherType
|
|
|
- containsWatcher(clientPath, watcher, watcherType);
|
|
|
-
|
|
|
- Map<EventType, Set<Watcher>> removedWatchers = new HashMap<>();
|
|
|
- HashSet<Watcher> childWatchersToRem = new HashSet<>();
|
|
|
- removedWatchers.put(EventType.ChildWatchRemoved, childWatchersToRem);
|
|
|
- HashSet<Watcher> dataWatchersToRem = new HashSet<>();
|
|
|
- removedWatchers.put(EventType.DataWatchRemoved, dataWatchersToRem);
|
|
|
- HashSet<Watcher> persistentWatchersToRem = new HashSet<>();
|
|
|
- removedWatchers.put(EventType.PersistentWatchRemoved, persistentWatchersToRem);
|
|
|
- boolean removedWatcher = false;
|
|
|
- switch (watcherType) {
|
|
|
- case Children: {
|
|
|
- synchronized (childWatches) {
|
|
|
- removedWatcher = removeWatches(childWatches, watcher, clientPath, local, rc, childWatchersToRem);
|
|
|
- }
|
|
|
- break;
|
|
|
- }
|
|
|
- case Data: {
|
|
|
- synchronized (dataWatches) {
|
|
|
- removedWatcher = removeWatches(dataWatches, watcher, clientPath, local, rc, dataWatchersToRem);
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (existWatches) {
|
|
|
- boolean removedDataWatcher = removeWatches(existWatches, watcher, clientPath, local, rc, dataWatchersToRem);
|
|
|
- removedWatcher |= removedDataWatcher;
|
|
|
- }
|
|
|
- break;
|
|
|
- }
|
|
|
- case Any: {
|
|
|
- synchronized (childWatches) {
|
|
|
- removedWatcher = removeWatches(childWatches, watcher, clientPath, local, rc, childWatchersToRem);
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (dataWatches) {
|
|
|
- boolean removedDataWatcher = removeWatches(dataWatches, watcher, clientPath, local, rc, dataWatchersToRem);
|
|
|
- removedWatcher |= removedDataWatcher;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (existWatches) {
|
|
|
- boolean removedDataWatcher = removeWatches(existWatches, watcher, clientPath, local, rc, dataWatchersToRem);
|
|
|
- removedWatcher |= removedDataWatcher;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (persistentWatches) {
|
|
|
- boolean removedPersistentWatcher = removeWatches(persistentWatches,
|
|
|
- watcher, clientPath, local, rc, persistentWatchersToRem);
|
|
|
- removedWatcher |= removedPersistentWatcher;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (persistentRecursiveWatches) {
|
|
|
- boolean removedPersistentRecursiveWatcher = removeWatches(persistentRecursiveWatches,
|
|
|
- watcher, clientPath, local, rc, persistentWatchersToRem);
|
|
|
- removedWatcher |= removedPersistentRecursiveWatcher;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- // Watcher function doesn't exists for the specified params
|
|
|
- if (!removedWatcher) {
|
|
|
- throw new KeeperException.NoWatcherException(clientPath);
|
|
|
- }
|
|
|
- return removedWatchers;
|
|
|
- }
|
|
|
-
|
|
|
- private boolean contains(String path, Watcher watcherObj, Map<String, Set<Watcher>> pathVsWatchers) {
|
|
|
- boolean watcherExists = true;
|
|
|
- if (pathVsWatchers == null || pathVsWatchers.size() == 0) {
|
|
|
- watcherExists = false;
|
|
|
- } else {
|
|
|
- Set<Watcher> watchers = pathVsWatchers.get(path);
|
|
|
- if (watchers == null) {
|
|
|
- watcherExists = false;
|
|
|
- } else if (watcherObj == null) {
|
|
|
- watcherExists = watchers.size() > 0;
|
|
|
- } else {
|
|
|
- watcherExists = watchers.contains(watcherObj);
|
|
|
- }
|
|
|
- }
|
|
|
- return watcherExists;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Validate the provided znode path contains the given watcher and
|
|
|
- * watcherType
|
|
|
- *
|
|
|
- * @param path
|
|
|
- * - client path
|
|
|
- * @param watcher
|
|
|
- * - watcher object reference
|
|
|
- * @param watcherType
|
|
|
- * - type of the watcher
|
|
|
- * @throws NoWatcherException
|
|
|
- */
|
|
|
- void containsWatcher(String path, Watcher watcher, WatcherType watcherType) throws NoWatcherException {
|
|
|
- boolean containsWatcher = false;
|
|
|
- switch (watcherType) {
|
|
|
- case Children: {
|
|
|
- synchronized (childWatches) {
|
|
|
- containsWatcher = contains(path, watcher, childWatches);
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (persistentWatches) {
|
|
|
- boolean contains_temp = contains(path, watcher,
|
|
|
- persistentWatches);
|
|
|
- containsWatcher |= contains_temp;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (persistentRecursiveWatches) {
|
|
|
- boolean contains_temp = contains(path, watcher,
|
|
|
- persistentRecursiveWatches);
|
|
|
- containsWatcher |= contains_temp;
|
|
|
- }
|
|
|
- break;
|
|
|
- }
|
|
|
- case Data: {
|
|
|
- synchronized (dataWatches) {
|
|
|
- containsWatcher = contains(path, watcher, dataWatches);
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (existWatches) {
|
|
|
- boolean contains_temp = contains(path, watcher, existWatches);
|
|
|
- containsWatcher |= contains_temp;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (persistentWatches) {
|
|
|
- boolean contains_temp = contains(path, watcher,
|
|
|
- persistentWatches);
|
|
|
- containsWatcher |= contains_temp;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (persistentRecursiveWatches) {
|
|
|
- boolean contains_temp = contains(path, watcher,
|
|
|
- persistentRecursiveWatches);
|
|
|
- containsWatcher |= contains_temp;
|
|
|
- }
|
|
|
- break;
|
|
|
- }
|
|
|
- case Any: {
|
|
|
- synchronized (childWatches) {
|
|
|
- containsWatcher = contains(path, watcher, childWatches);
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (dataWatches) {
|
|
|
- boolean contains_temp = contains(path, watcher, dataWatches);
|
|
|
- containsWatcher |= contains_temp;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (existWatches) {
|
|
|
- boolean contains_temp = contains(path, watcher, existWatches);
|
|
|
- containsWatcher |= contains_temp;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (persistentWatches) {
|
|
|
- boolean contains_temp = contains(path, watcher,
|
|
|
- persistentWatches);
|
|
|
- containsWatcher |= contains_temp;
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (persistentRecursiveWatches) {
|
|
|
- boolean contains_temp = contains(path, watcher,
|
|
|
- persistentRecursiveWatches);
|
|
|
- containsWatcher |= contains_temp;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- // Watcher function doesn't exists for the specified params
|
|
|
- if (!containsWatcher) {
|
|
|
- throw new KeeperException.NoWatcherException(path);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- protected boolean removeWatches(
|
|
|
- Map<String, Set<Watcher>> pathVsWatcher,
|
|
|
- Watcher watcher,
|
|
|
- String path,
|
|
|
- boolean local,
|
|
|
- int rc,
|
|
|
- Set<Watcher> removedWatchers) throws KeeperException {
|
|
|
- if (!local && rc != Code.OK.intValue()) {
|
|
|
- throw KeeperException.create(KeeperException.Code.get(rc), path);
|
|
|
- }
|
|
|
- boolean success = false;
|
|
|
- // When local flag is true, remove watchers for the given path
|
|
|
- // irrespective of rc. Otherwise shouldn't remove watchers locally
|
|
|
- // when sees failure from server.
|
|
|
- if (rc == Code.OK.intValue() || (local && rc != Code.OK.intValue())) {
|
|
|
- // Remove all the watchers for the given path
|
|
|
- if (watcher == null) {
|
|
|
- Set<Watcher> pathWatchers = pathVsWatcher.remove(path);
|
|
|
- if (pathWatchers != null) {
|
|
|
- // found path watchers
|
|
|
- removedWatchers.addAll(pathWatchers);
|
|
|
- success = true;
|
|
|
- }
|
|
|
- } else {
|
|
|
- Set<Watcher> watchers = pathVsWatcher.get(path);
|
|
|
- if (watchers != null) {
|
|
|
- if (watchers.remove(watcher)) {
|
|
|
- // found path watcher
|
|
|
- removedWatchers.add(watcher);
|
|
|
- // cleanup <path vs watchlist>
|
|
|
- if (watchers.size() <= 0) {
|
|
|
- pathVsWatcher.remove(path);
|
|
|
- }
|
|
|
- success = true;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return success;
|
|
|
- }
|
|
|
-
|
|
|
- /* (non-Javadoc)
|
|
|
- * @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState,
|
|
|
- * Event.EventType, java.lang.String)
|
|
|
- */
|
|
|
- @Override
|
|
|
- public Set<Watcher> materialize(
|
|
|
- Watcher.Event.KeeperState state,
|
|
|
- Watcher.Event.EventType type,
|
|
|
- String clientPath
|
|
|
- ) {
|
|
|
- final Set<Watcher> result = new HashSet<>();
|
|
|
-
|
|
|
- switch (type) {
|
|
|
- case None:
|
|
|
- if (defaultWatcher != null) {
|
|
|
- result.add(defaultWatcher);
|
|
|
- }
|
|
|
-
|
|
|
- boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected;
|
|
|
- synchronized (dataWatches) {
|
|
|
- for (Set<Watcher> ws : dataWatches.values()) {
|
|
|
- result.addAll(ws);
|
|
|
- }
|
|
|
- if (clear) {
|
|
|
- dataWatches.clear();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (existWatches) {
|
|
|
- for (Set<Watcher> ws : existWatches.values()) {
|
|
|
- result.addAll(ws);
|
|
|
- }
|
|
|
- if (clear) {
|
|
|
- existWatches.clear();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (childWatches) {
|
|
|
- for (Set<Watcher> ws : childWatches.values()) {
|
|
|
- result.addAll(ws);
|
|
|
- }
|
|
|
- if (clear) {
|
|
|
- childWatches.clear();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (persistentWatches) {
|
|
|
- for (Set<Watcher> ws: persistentWatches.values()) {
|
|
|
- result.addAll(ws);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (persistentRecursiveWatches) {
|
|
|
- for (Set<Watcher> ws: persistentRecursiveWatches.values()) {
|
|
|
- result.addAll(ws);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return result;
|
|
|
- case NodeDataChanged:
|
|
|
- case NodeCreated:
|
|
|
- synchronized (dataWatches) {
|
|
|
- addTo(dataWatches.remove(clientPath), result);
|
|
|
- }
|
|
|
- synchronized (existWatches) {
|
|
|
- addTo(existWatches.remove(clientPath), result);
|
|
|
- }
|
|
|
- addPersistentWatches(clientPath, result);
|
|
|
- break;
|
|
|
- case NodeChildrenChanged:
|
|
|
- synchronized (childWatches) {
|
|
|
- addTo(childWatches.remove(clientPath), result);
|
|
|
- }
|
|
|
- addPersistentWatches(clientPath, result);
|
|
|
- break;
|
|
|
- case NodeDeleted:
|
|
|
- synchronized (dataWatches) {
|
|
|
- addTo(dataWatches.remove(clientPath), result);
|
|
|
- }
|
|
|
- // TODO This shouldn't be needed, but just in case
|
|
|
- synchronized (existWatches) {
|
|
|
- Set<Watcher> list = existWatches.remove(clientPath);
|
|
|
- if (list != null) {
|
|
|
- addTo(list, result);
|
|
|
- LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
|
|
|
- }
|
|
|
- }
|
|
|
- synchronized (childWatches) {
|
|
|
- addTo(childWatches.remove(clientPath), result);
|
|
|
- }
|
|
|
- addPersistentWatches(clientPath, result);
|
|
|
- break;
|
|
|
- default:
|
|
|
- String errorMsg = String.format(
|
|
|
- "Unhandled watch event type %s with state %s on path %s",
|
|
|
- type,
|
|
|
- state,
|
|
|
- clientPath);
|
|
|
- LOG.error(errorMsg);
|
|
|
- throw new RuntimeException(errorMsg);
|
|
|
- }
|
|
|
-
|
|
|
- return result;
|
|
|
- }
|
|
|
-
|
|
|
- private void addPersistentWatches(String clientPath, Set<Watcher> result) {
|
|
|
- synchronized (persistentWatches) {
|
|
|
- addTo(persistentWatches.get(clientPath), result);
|
|
|
- }
|
|
|
- synchronized (persistentRecursiveWatches) {
|
|
|
- for (String path : PathParentIterator.forAll(clientPath).asIterable()) {
|
|
|
- addTo(persistentRecursiveWatches.get(path), result);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ ZKWatchManager getWatchManager() {
|
|
|
+ return cnxn.getWatcherManager();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Register a watcher for a particular path.
|
|
|
*/
|
|
|
- public abstract class WatchRegistration {
|
|
|
+ public abstract static class WatchRegistration {
|
|
|
|
|
|
private Watcher watcher;
|
|
|
private String clientPath;
|
|
|
+
|
|
|
public WatchRegistration(Watcher watcher, String clientPath) {
|
|
|
this.watcher = watcher;
|
|
|
this.clientPath = clientPath;
|
|
@@ -689,7 +312,7 @@ public class ZooKeeper implements AutoCloseable {
|
|
|
|
|
|
@Override
|
|
|
protected Map<String, Set<Watcher>> getWatches(int rc) {
|
|
|
- return rc == 0 ? watchManager.dataWatches : watchManager.existWatches;
|
|
|
+ return rc == 0 ? getWatchManager().getDataWatches() : getWatchManager().getExistWatches();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -707,7 +330,7 @@ public class ZooKeeper implements AutoCloseable {
|
|
|
|
|
|
@Override
|
|
|
protected Map<String, Set<Watcher>> getWatches(int rc) {
|
|
|
- return watchManager.dataWatches;
|
|
|
+ return getWatchManager().getDataWatches();
|
|
|
}
|
|
|
|
|
|
}
|
|
@@ -720,7 +343,7 @@ public class ZooKeeper implements AutoCloseable {
|
|
|
|
|
|
@Override
|
|
|
protected Map<String, Set<Watcher>> getWatches(int rc) {
|
|
|
- return watchManager.childWatches;
|
|
|
+ return getWatchManager().getChildWatches();
|
|
|
}
|
|
|
|
|
|
}
|
|
@@ -737,9 +360,9 @@ public class ZooKeeper implements AutoCloseable {
|
|
|
protected Map<String, Set<Watcher>> getWatches(int rc) {
|
|
|
switch (mode) {
|
|
|
case PERSISTENT:
|
|
|
- return watchManager.persistentWatches;
|
|
|
+ return getWatchManager().getPersistentWatches();
|
|
|
case PERSISTENT_RECURSIVE:
|
|
|
- return watchManager.persistentRecursiveWatches;
|
|
|
+ return getWatchManager().getPersistentRecursiveWatches();
|
|
|
}
|
|
|
throw new IllegalArgumentException("Mode not supported: " + mode);
|
|
|
}
|
|
@@ -989,7 +612,7 @@ public class ZooKeeper implements AutoCloseable {
|
|
|
* connects to one in read-only mode, i.e. read requests are
|
|
|
* allowed while write requests are not. It continues seeking for
|
|
|
* majority in the background.
|
|
|
- * @param aHostProvider
|
|
|
+ * @param hostProvider
|
|
|
* use this as HostProvider to enable custom behaviour.
|
|
|
* @param clientConfig
|
|
|
* (added in 3.5.2) passing this conf object gives each client the flexibility of
|
|
@@ -1004,49 +627,45 @@ public class ZooKeeper implements AutoCloseable {
|
|
|
int sessionTimeout,
|
|
|
Watcher watcher,
|
|
|
boolean canBeReadOnly,
|
|
|
- HostProvider aHostProvider,
|
|
|
- ZKClientConfig clientConfig) throws IOException {
|
|
|
+ HostProvider hostProvider,
|
|
|
+ ZKClientConfig clientConfig
|
|
|
+ ) throws IOException {
|
|
|
LOG.info(
|
|
|
"Initiating client connection, connectString={} sessionTimeout={} watcher={}",
|
|
|
connectString,
|
|
|
sessionTimeout,
|
|
|
watcher);
|
|
|
|
|
|
- if (clientConfig == null) {
|
|
|
- clientConfig = new ZKClientConfig();
|
|
|
- }
|
|
|
- this.clientConfig = clientConfig;
|
|
|
- watchManager = defaultWatchManager();
|
|
|
- watchManager.defaultWatcher = watcher;
|
|
|
+ this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
|
|
|
+ this.hostProvider = hostProvider;
|
|
|
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
|
|
|
- hostProvider = aHostProvider;
|
|
|
|
|
|
cnxn = createConnection(
|
|
|
connectStringParser.getChrootPath(),
|
|
|
hostProvider,
|
|
|
sessionTimeout,
|
|
|
- this,
|
|
|
- watchManager,
|
|
|
+ this.clientConfig,
|
|
|
+ watcher,
|
|
|
getClientCnxnSocket(),
|
|
|
canBeReadOnly);
|
|
|
cnxn.start();
|
|
|
}
|
|
|
|
|
|
- // @VisibleForTesting
|
|
|
- protected ClientCnxn createConnection(
|
|
|
+ ClientCnxn createConnection(
|
|
|
String chrootPath,
|
|
|
HostProvider hostProvider,
|
|
|
int sessionTimeout,
|
|
|
- ZooKeeper zooKeeper,
|
|
|
- ClientWatchManager watcher,
|
|
|
+ ZKClientConfig clientConfig,
|
|
|
+ Watcher defaultWatcher,
|
|
|
ClientCnxnSocket clientCnxnSocket,
|
|
|
- boolean canBeReadOnly) throws IOException {
|
|
|
+ boolean canBeReadOnly
|
|
|
+ ) throws IOException {
|
|
|
return new ClientCnxn(
|
|
|
chrootPath,
|
|
|
hostProvider,
|
|
|
sessionTimeout,
|
|
|
- this,
|
|
|
- watchManager,
|
|
|
+ clientConfig,
|
|
|
+ defaultWatcher,
|
|
|
clientCnxnSocket,
|
|
|
canBeReadOnly);
|
|
|
}
|
|
@@ -1383,7 +1002,7 @@ public class ZooKeeper implements AutoCloseable {
|
|
|
* connects to one in read-only mode, i.e. read requests are
|
|
|
* allowed while write requests are not. It continues seeking for
|
|
|
* majority in the background.
|
|
|
- * @param aHostProvider
|
|
|
+ * @param hostProvider
|
|
|
* use this as HostProvider to enable custom behaviour.
|
|
|
* @param clientConfig
|
|
|
* (added in 3.5.2) passing this conf object gives each client the flexibility of
|
|
@@ -1400,7 +1019,7 @@ public class ZooKeeper implements AutoCloseable {
|
|
|
long sessionId,
|
|
|
byte[] sessionPasswd,
|
|
|
boolean canBeReadOnly,
|
|
|
- HostProvider aHostProvider,
|
|
|
+ HostProvider hostProvider,
|
|
|
ZKClientConfig clientConfig) throws IOException {
|
|
|
LOG.info(
|
|
|
"Initiating client connection, connectString={} "
|
|
@@ -1411,22 +1030,16 @@ public class ZooKeeper implements AutoCloseable {
|
|
|
Long.toHexString(sessionId),
|
|
|
(sessionPasswd == null ? "<null>" : "<hidden>"));
|
|
|
|
|
|
- if (clientConfig == null) {
|
|
|
- clientConfig = new ZKClientConfig();
|
|
|
- }
|
|
|
- this.clientConfig = clientConfig;
|
|
|
- watchManager = defaultWatchManager();
|
|
|
- watchManager.defaultWatcher = watcher;
|
|
|
-
|
|
|
+ this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
|
|
|
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
|
|
|
- hostProvider = aHostProvider;
|
|
|
+ this.hostProvider = hostProvider;
|
|
|
|
|
|
cnxn = new ClientCnxn(
|
|
|
connectStringParser.getChrootPath(),
|
|
|
hostProvider,
|
|
|
sessionTimeout,
|
|
|
- this,
|
|
|
- watchManager,
|
|
|
+ this.clientConfig,
|
|
|
+ watcher,
|
|
|
getClientCnxnSocket(),
|
|
|
sessionId,
|
|
|
sessionPasswd,
|
|
@@ -1523,11 +1136,6 @@ public class ZooKeeper implements AutoCloseable {
|
|
|
return new ZooKeeperTestable(cnxn);
|
|
|
}
|
|
|
|
|
|
- /* Useful for testing watch handling behavior */
|
|
|
- protected ZKWatchManager defaultWatchManager() {
|
|
|
- return new ZKWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* The session id for this ZooKeeper client instance. The value returned is
|
|
|
* not valid until the client connects to a server and may change after a
|
|
@@ -1582,11 +1190,9 @@ public class ZooKeeper implements AutoCloseable {
|
|
|
/**
|
|
|
* Specify the default watcher for the connection (overrides the one
|
|
|
* specified during construction).
|
|
|
- *
|
|
|
- * @param watcher
|
|
|
*/
|
|
|
public synchronized void register(Watcher watcher) {
|
|
|
- watchManager.defaultWatcher = watcher;
|
|
|
+ getWatchManager().setDefaultWatcher(watcher);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -3213,9 +2819,11 @@ public class ZooKeeper implements AutoCloseable {
|
|
|
* error code.
|
|
|
* @since 3.6.0
|
|
|
*/
|
|
|
- public void addWatch(String basePath, AddWatchMode mode)
|
|
|
- throws KeeperException, InterruptedException {
|
|
|
- addWatch(basePath, watchManager.defaultWatcher, mode);
|
|
|
+ public void addWatch(
|
|
|
+ String basePath,
|
|
|
+ AddWatchMode mode
|
|
|
+ ) throws KeeperException, InterruptedException {
|
|
|
+ addWatch(basePath, getWatchManager().getDefaultWatcher(), mode);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -3229,8 +2837,12 @@ public class ZooKeeper implements AutoCloseable {
|
|
|
* @throws IllegalArgumentException if an invalid path is specified
|
|
|
* @since 3.6.0
|
|
|
*/
|
|
|
- public void addWatch(String basePath, Watcher watcher, AddWatchMode mode,
|
|
|
- VoidCallback cb, Object ctx) {
|
|
|
+ public void addWatch(
|
|
|
+ String basePath,
|
|
|
+ Watcher watcher, AddWatchMode mode,
|
|
|
+ VoidCallback cb,
|
|
|
+ Object ctx
|
|
|
+ ) {
|
|
|
PathUtils.validatePath(basePath);
|
|
|
String serverPath = prependChroot(basePath);
|
|
|
|
|
@@ -3251,9 +2863,8 @@ public class ZooKeeper implements AutoCloseable {
|
|
|
* @throws IllegalArgumentException if an invalid path is specified
|
|
|
* @since 3.6.0
|
|
|
*/
|
|
|
- public void addWatch(String basePath, AddWatchMode mode,
|
|
|
- VoidCallback cb, Object ctx) {
|
|
|
- addWatch(basePath, watchManager.defaultWatcher, mode, cb, ctx);
|
|
|
+ public void addWatch(String basePath, AddWatchMode mode, VoidCallback cb, Object ctx) {
|
|
|
+ addWatch(basePath, getWatchManager().getDefaultWatcher(), mode, cb, ctx);
|
|
|
}
|
|
|
|
|
|
private void validateWatcher(Watcher watcher) {
|
|
@@ -3271,7 +2882,7 @@ public class ZooKeeper implements AutoCloseable {
|
|
|
PathUtils.validatePath(path);
|
|
|
final String clientPath = path;
|
|
|
final String serverPath = prependChroot(clientPath);
|
|
|
- WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, watchManager);
|
|
|
+ WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, getWatchManager());
|
|
|
|
|
|
RequestHeader h = new RequestHeader();
|
|
|
h.setType(opCode);
|
|
@@ -3294,7 +2905,7 @@ public class ZooKeeper implements AutoCloseable {
|
|
|
PathUtils.validatePath(path);
|
|
|
final String clientPath = path;
|
|
|
final String serverPath = prependChroot(clientPath);
|
|
|
- WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, watchManager);
|
|
|
+ WatchDeregistration wcb = new WatchDeregistration(clientPath, watcher, watcherType, local, getWatchManager());
|
|
|
|
|
|
RequestHeader h = new RequestHeader();
|
|
|
h.setType(opCode);
|
|
@@ -3424,8 +3035,9 @@ public class ZooKeeper implements AutoCloseable {
|
|
|
*/
|
|
|
private Watcher getDefaultWatcher(boolean required) {
|
|
|
if (required) {
|
|
|
- if (watchManager.defaultWatcher != null) {
|
|
|
- return watchManager.defaultWatcher;
|
|
|
+ final Watcher defaultWatcher = getWatchManager().getDefaultWatcher();
|
|
|
+ if (defaultWatcher != null) {
|
|
|
+ return defaultWatcher;
|
|
|
} else {
|
|
|
throw new IllegalStateException("Default watcher is required, but it is null.");
|
|
|
}
|