|
@@ -2099,9 +2099,11 @@ static int send_set_watches(zhandle_t *zh)
|
|
|
int rc;
|
|
|
|
|
|
req.relativeZxid = zh->last_zxid;
|
|
|
+ lock_watchers(zh);
|
|
|
req.dataWatches.data = collect_keys(zh->active_node_watchers, (int*)&req.dataWatches.count);
|
|
|
req.existWatches.data = collect_keys(zh->active_exist_watchers, (int*)&req.existWatches.count);
|
|
|
req.childWatches.data = collect_keys(zh->active_child_watchers, (int*)&req.childWatches.count);
|
|
|
+ unlock_watchers(zh);
|
|
|
|
|
|
// return if there are no pending watches
|
|
|
if (!req.dataWatches.count && !req.existWatches.count &&
|
|
@@ -3055,7 +3057,9 @@ static int queue_session_event(zhandle_t *zh, int state)
|
|
|
}
|
|
|
/* We queued the buffer, so don't free it */
|
|
|
close_buffer_oarchive(&oa, 0);
|
|
|
+ lock_watchers(zh);
|
|
|
cptr->c.watcher_result = collectWatchers(zh, ZOO_SESSION_EVENT, "");
|
|
|
+ unlock_watchers(zh);
|
|
|
queue_completion(&zh->completions_to_process, cptr, 0);
|
|
|
if (process_async(zh->outstanding_sync)) {
|
|
|
process_completions(zh);
|
|
@@ -3361,7 +3365,9 @@ int zookeeper_process(zhandle_t *zh, int events)
|
|
|
/* We are doing a notification, so there is no pending request */
|
|
|
c = create_completion_entry(zh, WATCHER_EVENT_XID,-1,0,0,0,0);
|
|
|
c->buffer = bptr;
|
|
|
+ lock_watchers(zh);
|
|
|
c->c.watcher_result = collectWatchers(zh, type, path);
|
|
|
+ unlock_watchers(zh);
|
|
|
|
|
|
// We cannot free until now, otherwise path will become invalid
|
|
|
deallocate_WatcherEvent(&evt);
|
|
@@ -3416,8 +3422,10 @@ int zookeeper_process(zhandle_t *zh, int events)
|
|
|
// Update last_zxid only when it is a request response
|
|
|
zh->last_zxid = hdr.zxid;
|
|
|
}
|
|
|
+ lock_watchers(zh);
|
|
|
activateWatcher(zh, cptr->watcher, rc);
|
|
|
deactivateWatcher(zh, cptr->watcher_deregistration, rc);
|
|
|
+ unlock_watchers(zh);
|
|
|
|
|
|
if (cptr->c.void_result != SYNCHRONOUS_MARKER) {
|
|
|
LOG_DEBUG(LOGCALLBACK(zh), "Queueing asynchronous response");
|
|
@@ -4708,19 +4716,23 @@ static int aremove_watches(
|
|
|
goto done;
|
|
|
}
|
|
|
|
|
|
+ lock_watchers(zh);
|
|
|
if (!pathHasWatcher(zh, server_path, wtype, watcher, watcherCtx)) {
|
|
|
rc = ZNOWATCHER;
|
|
|
+ unlock_watchers(zh);
|
|
|
goto done;
|
|
|
}
|
|
|
|
|
|
if (local) {
|
|
|
removeWatchers(zh, server_path, wtype, watcher, watcherCtx);
|
|
|
+ unlock_watchers(zh);
|
|
|
#ifdef THREADED
|
|
|
notify_sync_completion((struct sync_completion *)data);
|
|
|
#endif
|
|
|
rc = ZOK;
|
|
|
goto done;
|
|
|
}
|
|
|
+ unlock_watchers(zh);
|
|
|
|
|
|
oa = create_buffer_oarchive();
|
|
|
rc = serialize_RequestHeader(oa, "header", &h);
|