|
@@ -177,6 +177,7 @@ typedef struct _completion_list {
|
|
|
buffer_list_t *buffer;
|
|
|
struct _completion_list *next;
|
|
|
watcher_registration_t* watcher;
|
|
|
+ watcher_deregistration_t* watcher_deregistration;
|
|
|
} completion_list_t;
|
|
|
|
|
|
const char*err2string(int err);
|
|
@@ -191,9 +192,23 @@ static int deserialize_multi(zhandle_t *zh, int xid, completion_list_t *cptr, st
|
|
|
static int add_completion(zhandle_t *zh, int xid, int completion_type,
|
|
|
const void *dc, const void *data, int add_to_front,
|
|
|
watcher_registration_t* wo, completion_head_t *clist);
|
|
|
+static int add_completion_deregistration(zhandle_t *zh, int xid,
|
|
|
+ int completion_type, const void *dc, const void *data,
|
|
|
+ int add_to_front, watcher_deregistration_t* wo,
|
|
|
+ completion_head_t *clist);
|
|
|
+static int do_add_completion(zhandle_t *zh, const void *dc, completion_list_t *c,
|
|
|
+ int add_to_front);
|
|
|
+
|
|
|
static completion_list_t* create_completion_entry(zhandle_t *zh, int xid, int completion_type,
|
|
|
const void *dc, const void *data, watcher_registration_t* wo,
|
|
|
completion_head_t *clist);
|
|
|
+static completion_list_t* create_completion_entry_deregistration(zhandle_t *zh,
|
|
|
+ int xid, int completion_type, const void *dc, const void *data,
|
|
|
+ watcher_deregistration_t* wo, completion_head_t *clist);
|
|
|
+static completion_list_t* do_create_completion_entry(zhandle_t *zh,
|
|
|
+ int xid, int completion_type, const void *dc, const void *data,
|
|
|
+ watcher_registration_t* wo, completion_head_t *clist,
|
|
|
+ watcher_deregistration_t* wdo);
|
|
|
static void destroy_completion_entry(completion_list_t* c);
|
|
|
static void queue_completion_nolock(completion_head_t *list, completion_list_t *c,
|
|
|
int add_to_front);
|
|
@@ -2670,6 +2685,7 @@ int zookeeper_process(zhandle_t *zh, int events)
|
|
|
zh->last_zxid = hdr.zxid;
|
|
|
}
|
|
|
activateWatcher(zh, cptr->watcher, rc);
|
|
|
+ deactivateWatcher(zh, cptr->watcher_deregistration, rc);
|
|
|
|
|
|
if (cptr->c.void_result != SYNCHRONOUS_MARKER) {
|
|
|
if(hdr.xid == PING_XID){
|
|
@@ -2730,6 +2746,21 @@ static watcher_registration_t* create_watcher_registration(const char* path,
|
|
|
return wo;
|
|
|
}
|
|
|
|
|
|
+static watcher_deregistration_t* create_watcher_deregistration(const char* path,
|
|
|
+ watcher_fn watcher, void *watcherCtx, ZooWatcherType wtype) {
|
|
|
+ watcher_deregistration_t *wdo;
|
|
|
+
|
|
|
+ wdo = calloc(1, sizeof(watcher_deregistration_t));
|
|
|
+ if (!wdo) {
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+ wdo->path = strdup(path);
|
|
|
+ wdo->watcher = watcher;
|
|
|
+ wdo->context = watcherCtx;
|
|
|
+ wdo->type = wtype;
|
|
|
+ return wdo;
|
|
|
+}
|
|
|
+
|
|
|
static void destroy_watcher_registration(watcher_registration_t* wo){
|
|
|
if(wo!=0){
|
|
|
free((void*)wo->path);
|
|
@@ -2737,10 +2768,34 @@ static void destroy_watcher_registration(watcher_registration_t* wo){
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+static void destroy_watcher_deregistration(watcher_deregistration_t *wdo) {
|
|
|
+ if (wdo) {
|
|
|
+ free((void *)wdo->path);
|
|
|
+ free(wdo);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
static completion_list_t* create_completion_entry(zhandle_t *zh, int xid, int completion_type,
|
|
|
const void *dc, const void *data,watcher_registration_t* wo, completion_head_t *clist)
|
|
|
{
|
|
|
- completion_list_t *c = calloc(1,sizeof(completion_list_t));
|
|
|
+ return do_create_completion_entry(zh, xid, completion_type, dc, data, wo,
|
|
|
+ clist, NULL);
|
|
|
+}
|
|
|
+
|
|
|
+static completion_list_t* create_completion_entry_deregistration(zhandle_t *zh,
|
|
|
+ int xid, int completion_type, const void *dc, const void *data,
|
|
|
+ watcher_deregistration_t* wdo, completion_head_t *clist)
|
|
|
+{
|
|
|
+ return do_create_completion_entry(zh, xid, completion_type, dc, data, NULL,
|
|
|
+ clist, wdo);
|
|
|
+}
|
|
|
+
|
|
|
+static completion_list_t* do_create_completion_entry(zhandle_t *zh, int xid,
|
|
|
+ int completion_type, const void *dc, const void *data,
|
|
|
+ watcher_registration_t* wo, completion_head_t *clist,
|
|
|
+ watcher_deregistration_t* wdo)
|
|
|
+{
|
|
|
+ completion_list_t *c = calloc(1, sizeof(completion_list_t));
|
|
|
if (!c) {
|
|
|
LOG_ERROR(LOGCALLBACK(zh), "out of memory");
|
|
|
return 0;
|
|
@@ -2779,6 +2834,7 @@ static completion_list_t* create_completion_entry(zhandle_t *zh, int xid, int co
|
|
|
}
|
|
|
c->xid = xid;
|
|
|
c->watcher = wo;
|
|
|
+ c->watcher_deregistration = wdo;
|
|
|
|
|
|
return c;
|
|
|
}
|
|
@@ -2786,6 +2842,7 @@ static completion_list_t* create_completion_entry(zhandle_t *zh, int xid, int co
|
|
|
static void destroy_completion_entry(completion_list_t* c){
|
|
|
if(c!=0){
|
|
|
destroy_watcher_registration(c->watcher);
|
|
|
+ destroy_watcher_deregistration(c->watcher_deregistration);
|
|
|
if(c->buffer!=0)
|
|
|
free_buffer(c->buffer);
|
|
|
free(c);
|
|
@@ -2831,6 +2888,21 @@ static int add_completion(zhandle_t *zh, int xid, int completion_type,
|
|
|
{
|
|
|
completion_list_t *c =create_completion_entry(zh, xid, completion_type, dc,
|
|
|
data, wo, clist);
|
|
|
+ return do_add_completion(zh, dc, c, add_to_front);
|
|
|
+}
|
|
|
+
|
|
|
+static int add_completion_deregistration(zhandle_t *zh, int xid,
|
|
|
+ int completion_type, const void *dc, const void *data, int add_to_front,
|
|
|
+ watcher_deregistration_t* wdo, completion_head_t *clist)
|
|
|
+{
|
|
|
+ completion_list_t *c = create_completion_entry_deregistration(zh, xid,
|
|
|
+ completion_type, dc, data, wdo, clist);
|
|
|
+ return do_add_completion(zh, dc, c, add_to_front);
|
|
|
+}
|
|
|
+
|
|
|
+static int do_add_completion(zhandle_t *zh, const void *dc,
|
|
|
+ completion_list_t *c, int add_to_front)
|
|
|
+{
|
|
|
int rc = 0;
|
|
|
if (!c)
|
|
|
return ZSYSTEMERROR;
|
|
@@ -4374,3 +4446,94 @@ int zoo_set_acl(zhandle_t *zh, const char *path, int version,
|
|
|
free_sync_completion(sc);
|
|
|
return rc;
|
|
|
}
|
|
|
+
|
|
|
+int zoo_remove_watchers(zhandle_t *zh, const char *path, ZooWatcherType wtype,
|
|
|
+ watcher_fn watcher, void *watcherCtx, int local)
|
|
|
+{
|
|
|
+ struct sync_completion *sc;
|
|
|
+ int rc = 0;
|
|
|
+
|
|
|
+ if (!path)
|
|
|
+ return ZBADARGUMENTS;
|
|
|
+
|
|
|
+ sc = alloc_sync_completion();
|
|
|
+ if (!sc)
|
|
|
+ return ZSYSTEMERROR;
|
|
|
+
|
|
|
+ rc = zoo_aremove_watchers(zh, path, wtype, watcher, watcherCtx, local,
|
|
|
+ SYNCHRONOUS_MARKER, sc);
|
|
|
+ if (rc == ZOK) {
|
|
|
+ wait_sync_completion(sc);
|
|
|
+ rc = sc->rc;
|
|
|
+ }
|
|
|
+ free_sync_completion(sc);
|
|
|
+ return rc;
|
|
|
+}
|
|
|
+
|
|
|
+int zoo_aremove_watchers(zhandle_t *zh, const char *path, ZooWatcherType wtype,
|
|
|
+ watcher_fn watcher, void *watcherCtx, int local,
|
|
|
+ void_completion_t *completion, const void *data)
|
|
|
+{
|
|
|
+ char *server_path = prepend_string(zh, path);
|
|
|
+ int rc;
|
|
|
+ struct oarchive *oa;
|
|
|
+ struct RequestHeader h = { get_xid(), ZOO_REMOVE_WATCHES };
|
|
|
+ struct RemoveWatchesRequest req = { (char*)server_path, wtype };
|
|
|
+ watcher_deregistration_t *wdo;
|
|
|
+
|
|
|
+ if (!zh || !isValidPath(server_path, 0)) {
|
|
|
+ rc = ZBADARGUMENTS;
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!local && is_unrecoverable(zh)) {
|
|
|
+ rc = ZINVALIDSTATE;
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!pathHasWatcher(zh, server_path, wtype, watcher, watcherCtx)) {
|
|
|
+ rc = ZNOWATCHER;
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (local) {
|
|
|
+ removeWatchers(zh, server_path, wtype, watcher, watcherCtx);
|
|
|
+ notify_sync_completion((struct sync_completion *)data);
|
|
|
+ rc = ZOK;
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+
|
|
|
+ oa = create_buffer_oarchive();
|
|
|
+ rc = serialize_RequestHeader(oa, "header", &h);
|
|
|
+ rc = rc < 0 ? rc : serialize_RemoveWatchesRequest(oa, "req", &req);
|
|
|
+ if (rc < 0) {
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+
|
|
|
+ wdo = create_watcher_deregistration(server_path, watcher, watcherCtx,
|
|
|
+ wtype);
|
|
|
+ if (!wdo) {
|
|
|
+ rc = ZSYSTEMERROR;
|
|
|
+ goto done;
|
|
|
+ }
|
|
|
+
|
|
|
+ enter_critical(zh);
|
|
|
+ rc = add_completion_deregistration(zh, h.xid, COMPLETION_VOID,
|
|
|
+ completion, data, 0, wdo, 0);
|
|
|
+ rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
|
|
|
+ get_buffer_len(oa));
|
|
|
+ rc = rc < 0 ? ZMARSHALLINGERROR : ZOK;
|
|
|
+ leave_critical(zh);
|
|
|
+
|
|
|
+ /* We queued the buffer, so don't free it */
|
|
|
+ close_buffer_oarchive(&oa, 0);
|
|
|
+
|
|
|
+ LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",
|
|
|
+ h.xid, path, zoo_get_current_server(zh));
|
|
|
+
|
|
|
+ adaptor_send_queue(zh, 0);
|
|
|
+
|
|
|
+done:
|
|
|
+ free_duplicate_path(server_path, path);
|
|
|
+ return rc;
|
|
|
+}
|