|
@@ -19,6 +19,7 @@
|
|
|
package org.apache.zookeeper.server.watch;
|
|
|
|
|
|
import java.io.PrintWriter;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.Iterator;
|
|
@@ -45,9 +46,9 @@ public class WatchManager implements IWatchManager {
|
|
|
|
|
|
private final Map<String, Set<Watcher>> watchTable = new HashMap<>();
|
|
|
|
|
|
- private final Map<Watcher, Set<String>> watch2Paths = new HashMap<>();
|
|
|
+ private final Map<Watcher, Map<String, WatchStats>> watch2Paths = new HashMap<>();
|
|
|
|
|
|
- private final WatcherModeManager watcherModeManager = new WatcherModeManager();
|
|
|
+ private int recursiveWatchQty = 0;
|
|
|
|
|
|
@Override
|
|
|
public synchronized int size() {
|
|
@@ -84,25 +85,34 @@ public class WatchManager implements IWatchManager {
|
|
|
}
|
|
|
list.add(watcher);
|
|
|
|
|
|
- Set<String> paths = watch2Paths.get(watcher);
|
|
|
+ Map<String, WatchStats> paths = watch2Paths.get(watcher);
|
|
|
if (paths == null) {
|
|
|
// cnxns typically have many watches, so use default cap here
|
|
|
- paths = new HashSet<>();
|
|
|
+ paths = new HashMap<>();
|
|
|
watch2Paths.put(watcher, paths);
|
|
|
}
|
|
|
|
|
|
- watcherModeManager.setWatcherMode(watcher, path, watcherMode);
|
|
|
+ WatchStats stats = paths.getOrDefault(path, WatchStats.NONE);
|
|
|
+ WatchStats newStats = stats.addMode(watcherMode);
|
|
|
|
|
|
- return paths.add(path);
|
|
|
+ if (newStats != stats) {
|
|
|
+ paths.put(path, newStats);
|
|
|
+ if (watcherMode.isRecursive()) {
|
|
|
+ ++recursiveWatchQty;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public synchronized void removeWatcher(Watcher watcher) {
|
|
|
- Set<String> paths = watch2Paths.remove(watcher);
|
|
|
+ Map<String, WatchStats> paths = watch2Paths.remove(watcher);
|
|
|
if (paths == null) {
|
|
|
return;
|
|
|
}
|
|
|
- for (String p : paths) {
|
|
|
+ for (String p : paths.keySet()) {
|
|
|
Set<Watcher> list = watchTable.get(p);
|
|
|
if (list != null) {
|
|
|
list.remove(watcher);
|
|
@@ -110,7 +120,11 @@ public class WatchManager implements IWatchManager {
|
|
|
watchTable.remove(p);
|
|
|
}
|
|
|
}
|
|
|
- watcherModeManager.removeWatcher(watcher, p);
|
|
|
+ }
|
|
|
+ for (WatchStats stats : paths.values()) {
|
|
|
+ if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
|
|
|
+ --recursiveWatchQty;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -123,8 +137,8 @@ public class WatchManager implements IWatchManager {
|
|
|
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
|
|
|
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
|
|
|
Set<Watcher> watchers = new HashSet<>();
|
|
|
- PathParentIterator pathParentIterator = getPathParentIterator(path);
|
|
|
synchronized (this) {
|
|
|
+ PathParentIterator pathParentIterator = getPathParentIterator(path);
|
|
|
for (String localPath : pathParentIterator.asIterable()) {
|
|
|
Set<Watcher> thisWatchers = watchTable.get(localPath);
|
|
|
if (thisWatchers == null || thisWatchers.isEmpty()) {
|
|
@@ -133,20 +147,23 @@ public class WatchManager implements IWatchManager {
|
|
|
Iterator<Watcher> iterator = thisWatchers.iterator();
|
|
|
while (iterator.hasNext()) {
|
|
|
Watcher watcher = iterator.next();
|
|
|
- WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);
|
|
|
- if (watcherMode.isRecursive()) {
|
|
|
- if (type != EventType.NodeChildrenChanged) {
|
|
|
- watchers.add(watcher);
|
|
|
- }
|
|
|
- } else if (!pathParentIterator.atParentPath()) {
|
|
|
+ Map<String, WatchStats> paths = watch2Paths.getOrDefault(watcher, Collections.emptyMap());
|
|
|
+ WatchStats stats = paths.get(localPath);
|
|
|
+ if (stats == null) {
|
|
|
+ LOG.warn("inconsistent watch table for watcher {}, {} not in path list", watcher, localPath);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (!pathParentIterator.atParentPath()) {
|
|
|
watchers.add(watcher);
|
|
|
- if (!watcherMode.isPersistent()) {
|
|
|
+ WatchStats newStats = stats.removeMode(WatcherMode.STANDARD);
|
|
|
+ if (newStats == WatchStats.NONE) {
|
|
|
iterator.remove();
|
|
|
- Set<String> paths = watch2Paths.get(watcher);
|
|
|
- if (paths != null) {
|
|
|
- paths.remove(localPath);
|
|
|
- }
|
|
|
+ paths.remove(localPath);
|
|
|
+ } else if (newStats != stats) {
|
|
|
+ paths.put(localPath, newStats);
|
|
|
}
|
|
|
+ } else if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
|
|
|
+ watchers.add(watcher);
|
|
|
}
|
|
|
}
|
|
|
if (thisWatchers.isEmpty()) {
|
|
@@ -199,7 +216,7 @@ public class WatchManager implements IWatchManager {
|
|
|
sb.append(watch2Paths.size()).append(" connections watching ").append(watchTable.size()).append(" paths\n");
|
|
|
|
|
|
int total = 0;
|
|
|
- for (Set<String> paths : watch2Paths.values()) {
|
|
|
+ for (Map<String, WatchStats> paths : watch2Paths.values()) {
|
|
|
total += paths.size();
|
|
|
}
|
|
|
sb.append("Total watches:").append(total);
|
|
@@ -219,10 +236,10 @@ public class WatchManager implements IWatchManager {
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
|
- for (Entry<Watcher, Set<String>> e : watch2Paths.entrySet()) {
|
|
|
+ for (Entry<Watcher, Map<String, WatchStats>> e : watch2Paths.entrySet()) {
|
|
|
pwriter.print("0x");
|
|
|
pwriter.println(Long.toHexString(((ServerCnxn) e.getKey()).getSessionId()));
|
|
|
- for (String path : e.getValue()) {
|
|
|
+ for (String path : e.getValue().keySet()) {
|
|
|
pwriter.print("\t");
|
|
|
pwriter.println(path);
|
|
|
}
|
|
@@ -232,31 +249,28 @@ public class WatchManager implements IWatchManager {
|
|
|
|
|
|
@Override
|
|
|
public synchronized boolean containsWatcher(String path, Watcher watcher) {
|
|
|
- WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, path);
|
|
|
- PathParentIterator pathParentIterator = getPathParentIterator(path);
|
|
|
- for (String localPath : pathParentIterator.asIterable()) {
|
|
|
- Set<Watcher> watchers = watchTable.get(localPath);
|
|
|
- if (!pathParentIterator.atParentPath()) {
|
|
|
- if (watchers != null) {
|
|
|
- return true; // at the leaf node, all watcher types match
|
|
|
- }
|
|
|
- }
|
|
|
- if (watcherMode.isRecursive()) {
|
|
|
- return true;
|
|
|
- }
|
|
|
- }
|
|
|
- return false;
|
|
|
+ Set<Watcher> list = watchTable.get(path);
|
|
|
+ return list != null && list.contains(watcher);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public synchronized boolean removeWatcher(String path, Watcher watcher) {
|
|
|
- Set<String> paths = watch2Paths.get(watcher);
|
|
|
- if (paths == null || !paths.remove(path)) {
|
|
|
+ Map<String, WatchStats> paths = watch2Paths.get(watcher);
|
|
|
+ if (paths == null) {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
+ WatchStats stats = paths.remove(path);
|
|
|
+ if (stats == null) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
|
|
|
+ --recursiveWatchQty;
|
|
|
+ }
|
|
|
+
|
|
|
Set<Watcher> list = watchTable.get(path);
|
|
|
if (list == null || !list.remove(watcher)) {
|
|
|
+ LOG.warn("inconsistent watch table for path {}, {} not in watcher list", path, watcher);
|
|
|
return false;
|
|
|
}
|
|
|
|
|
@@ -264,17 +278,20 @@ public class WatchManager implements IWatchManager {
|
|
|
watchTable.remove(path);
|
|
|
}
|
|
|
|
|
|
- watcherModeManager.removeWatcher(watcher, path);
|
|
|
-
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
+ // VisibleForTesting
|
|
|
+ Map<Watcher, Map<String, WatchStats>> getWatch2Paths() {
|
|
|
+ return watch2Paths;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public synchronized WatchesReport getWatches() {
|
|
|
Map<Long, Set<String>> id2paths = new HashMap<>();
|
|
|
- for (Entry<Watcher, Set<String>> e : watch2Paths.entrySet()) {
|
|
|
+ for (Entry<Watcher, Map<String, WatchStats>> e : watch2Paths.entrySet()) {
|
|
|
Long id = ((ServerCnxn) e.getKey()).getSessionId();
|
|
|
- Set<String> paths = new HashSet<>(e.getValue());
|
|
|
+ Set<String> paths = new HashSet<>(e.getValue().keySet());
|
|
|
id2paths.put(id, paths);
|
|
|
}
|
|
|
return new WatchesReport(id2paths);
|
|
@@ -296,7 +313,7 @@ public class WatchManager implements IWatchManager {
|
|
|
@Override
|
|
|
public synchronized WatchesSummary getWatchesSummary() {
|
|
|
int totalWatches = 0;
|
|
|
- for (Set<String> paths : watch2Paths.values()) {
|
|
|
+ for (Map<String, WatchStats> paths : watch2Paths.values()) {
|
|
|
totalWatches += paths.size();
|
|
|
}
|
|
|
return new WatchesSummary(watch2Paths.size(), watchTable.size(), totalWatches);
|
|
@@ -305,13 +322,13 @@ public class WatchManager implements IWatchManager {
|
|
|
@Override
|
|
|
public void shutdown() { /* do nothing */ }
|
|
|
|
|
|
- @Override
|
|
|
- public int getRecursiveWatchQty() {
|
|
|
- return watcherModeManager.getRecursiveQty();
|
|
|
+ // VisibleForTesting
|
|
|
+ synchronized int getRecursiveWatchQty() {
|
|
|
+ return recursiveWatchQty;
|
|
|
}
|
|
|
|
|
|
private PathParentIterator getPathParentIterator(String path) {
|
|
|
- if (watcherModeManager.getRecursiveQty() == 0) {
|
|
|
+ if (getRecursiveWatchQty() == 0) {
|
|
|
return PathParentIterator.forPathOnly(path);
|
|
|
}
|
|
|
return PathParentIterator.forAll(path);
|