|
@@ -29,6 +29,8 @@
|
|
#include <proto.h>
|
|
#include <proto.h>
|
|
#include "zk_adaptor.h"
|
|
#include "zk_adaptor.h"
|
|
#include "zk_log.h"
|
|
#include "zk_log.h"
|
|
|
|
+#include "zk_hashtable.h"
|
|
|
|
+
|
|
#include <stdlib.h>
|
|
#include <stdlib.h>
|
|
#include <stdio.h>
|
|
#include <stdio.h>
|
|
#include <string.h>
|
|
#include <string.h>
|
|
@@ -55,50 +57,50 @@ const int ZOOKEEPER_READ = 1 << 1;
|
|
const int EPHEMERAL = 1 << 0;
|
|
const int EPHEMERAL = 1 << 0;
|
|
const int SEQUENCE = 1 << 1;
|
|
const int SEQUENCE = 1 << 1;
|
|
|
|
|
|
-const int EXPIRED_SESSION_STATE = -112;
|
|
|
|
-const int AUTH_FAILED_STATE = -113;
|
|
|
|
-const int CONNECTING_STATE = 1;
|
|
|
|
-const int ASSOCIATING_STATE = 2;
|
|
|
|
-const int CONNECTED_STATE = 3;
|
|
|
|
|
|
+const int EXPIRED_SESSION_STATE = EXPIRED_SESSION_STATE_DEF;
|
|
|
|
+const int AUTH_FAILED_STATE = AUTH_FAILED_STATE_DEF;
|
|
|
|
+const int CONNECTING_STATE = CONNECTING_STATE_DEF;
|
|
|
|
+const int ASSOCIATING_STATE = ASSOCIATING_STATE_DEF;
|
|
|
|
+const int CONNECTED_STATE = CONNECTED_STATE_DEF;
|
|
static __attribute__ ((unused)) const char* state2String(int state){
|
|
static __attribute__ ((unused)) const char* state2String(int state){
|
|
switch(state){
|
|
switch(state){
|
|
case 0:
|
|
case 0:
|
|
return "CLOSED_STATE";
|
|
return "CLOSED_STATE";
|
|
- case 1 /*CONNECTING_STATE*/:
|
|
|
|
|
|
+ case CONNECTING_STATE_DEF:
|
|
return "CONNECTING_STATE";
|
|
return "CONNECTING_STATE";
|
|
- case 2 /*ASSOCIATING_STATE*/:
|
|
|
|
|
|
+ case ASSOCIATING_STATE_DEF:
|
|
return "ASSOCIATING_STATE";
|
|
return "ASSOCIATING_STATE";
|
|
- case 3 /*CONNECTED_STATE*/:
|
|
|
|
|
|
+ case CONNECTED_STATE_DEF:
|
|
return "CONNECTED_STATE";
|
|
return "CONNECTED_STATE";
|
|
- case -112 /*EXPIRED_SESSION_STATE*/:
|
|
|
|
|
|
+ case EXPIRED_SESSION_STATE_DEF:
|
|
return "EXPIRED_SESSION_STATE";
|
|
return "EXPIRED_SESSION_STATE";
|
|
- case -113 /*AUTH_FAILED_STATE*/:
|
|
|
|
|
|
+ case AUTH_FAILED_STATE_DEF:
|
|
return "AUTH_FAILED_STATE";
|
|
return "AUTH_FAILED_STATE";
|
|
}
|
|
}
|
|
return "INVALID_STATE";
|
|
return "INVALID_STATE";
|
|
}
|
|
}
|
|
|
|
|
|
-const int CREATED_EVENT = 1;
|
|
|
|
-const int DELETED_EVENT = 2;
|
|
|
|
-const int CHANGED_EVENT = 3;
|
|
|
|
-const int CHILD_EVENT = 4;
|
|
|
|
-const int SESSION_EVENT = -1;
|
|
|
|
-const int NOTWATCHING_EVENT = -2;
|
|
|
|
|
|
+const int CREATED_EVENT = CREATED_EVENT_DEF;
|
|
|
|
+const int DELETED_EVENT = DELETED_EVENT_DEF;
|
|
|
|
+const int CHANGED_EVENT = CHANGED_EVENT_DEF;
|
|
|
|
+const int CHILD_EVENT = CHILD_EVENT_DEF;
|
|
|
|
+const int SESSION_EVENT = SESSION_EVENT_DEF;
|
|
|
|
+const int NOTWATCHING_EVENT = NOTWATCHING_EVENT_DEF;
|
|
static __attribute__ ((unused)) const char* watcherEvent2String(int ev){
|
|
static __attribute__ ((unused)) const char* watcherEvent2String(int ev){
|
|
switch(ev){
|
|
switch(ev){
|
|
case 0:
|
|
case 0:
|
|
return "ERROR_EVENT";
|
|
return "ERROR_EVENT";
|
|
- case 1 /*CREATED_EVENT*/:
|
|
|
|
|
|
+ case CREATED_EVENT_DEF:
|
|
return "CREATED_EVENT";
|
|
return "CREATED_EVENT";
|
|
- case 2 /*DELETED_EVENT*/:
|
|
|
|
|
|
+ case DELETED_EVENT_DEF:
|
|
return "DELETED_EVENT";
|
|
return "DELETED_EVENT";
|
|
- case 3 /*CHANGED_EVENT*/:
|
|
|
|
|
|
+ case CHANGED_EVENT_DEF:
|
|
return "CHANGED_EVENT";
|
|
return "CHANGED_EVENT";
|
|
- case 4 /*CHILD_EVENT*/:
|
|
|
|
|
|
+ case CHILD_EVENT_DEF:
|
|
return "CHILD_EVENT";
|
|
return "CHILD_EVENT";
|
|
- case -1 /*SESSION_EVENT*/:
|
|
|
|
|
|
+ case SESSION_EVENT_DEF:
|
|
return "SESSION_EVENT";
|
|
return "SESSION_EVENT";
|
|
- case -2 /*NOTWATCHING_EVENT*/:
|
|
|
|
|
|
+ case NOTWATCHING_EVENT_DEF:
|
|
return "NOTWATCHING_EVENT";
|
|
return "NOTWATCHING_EVENT";
|
|
}
|
|
}
|
|
return "INVALID_EVENT";
|
|
return "INVALID_EVENT";
|
|
@@ -126,19 +128,6 @@ struct ACL_vector CREATOR_ALL_ACL = { 1, _CREATOR_ALL_ACL_ACL};
|
|
#define COMPLETION_ACLLIST 4
|
|
#define COMPLETION_ACLLIST 4
|
|
#define COMPLETION_STRING 5
|
|
#define COMPLETION_STRING 5
|
|
|
|
|
|
-const char*err2string(int err);
|
|
|
|
-static const char* format_endpoint_info(const struct sockaddr* ep);
|
|
|
|
-static const char* format_current_endpoint_info(zhandle_t* zh);
|
|
|
|
-static int add_completion(zhandle_t *zh, int xid, int completion_type,
|
|
|
|
- const void *dc, const void *data, int add_to_front);
|
|
|
|
-static int handle_socket_error_msg(zhandle_t *zh, int line, int rc,
|
|
|
|
- const char* format,...);
|
|
|
|
-static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc);
|
|
|
|
-
|
|
|
|
-static int disable_conn_permute=0; // permute enabled by default
|
|
|
|
-
|
|
|
|
-static void *SYNCHRONOUS_MARKER = (void*)&SYNCHRONOUS_MARKER;
|
|
|
|
-
|
|
|
|
typedef struct _completion_list {
|
|
typedef struct _completion_list {
|
|
int xid;
|
|
int xid;
|
|
int completion_type; /* one of the COMPLETION_* values */
|
|
int completion_type; /* one of the COMPLETION_* values */
|
|
@@ -153,8 +142,30 @@ typedef struct _completion_list {
|
|
const void *data;
|
|
const void *data;
|
|
buffer_list_t *buffer;
|
|
buffer_list_t *buffer;
|
|
struct _completion_list *next;
|
|
struct _completion_list *next;
|
|
|
|
+ watcher_registration_t* watcher;
|
|
} completion_list_t;
|
|
} completion_list_t;
|
|
|
|
|
|
|
|
+const char*err2string(int err);
|
|
|
|
+static const char* format_endpoint_info(const struct sockaddr* ep);
|
|
|
|
+static const char* format_current_endpoint_info(zhandle_t* zh);
|
|
|
|
+
|
|
|
|
+/* completion routine forward declarations */
|
|
|
|
+static int add_completion(zhandle_t *zh, int xid, int completion_type,
|
|
|
|
+ const void *dc, const void *data, int add_to_front,watcher_registration_t* wo);
|
|
|
|
+static completion_list_t* create_completion_entry(int xid, int completion_type,
|
|
|
|
+ const void *dc, const void *data,watcher_registration_t* wo);
|
|
|
|
+static void destroy_completion_entry(completion_list_t* c);
|
|
|
|
+static void queue_completion(completion_head_t *list, completion_list_t *c,
|
|
|
|
+ int add_to_front);
|
|
|
|
+
|
|
|
|
+static int handle_socket_error_msg(zhandle_t *zh, int line, int rc,
|
|
|
|
+ const char* format,...);
|
|
|
|
+static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc);
|
|
|
|
+
|
|
|
|
+static int disable_conn_permute=0; // permute enabled by default
|
|
|
|
+
|
|
|
|
+static void *SYNCHRONOUS_MARKER = (void*)&SYNCHRONOUS_MARKER;
|
|
|
|
+
|
|
const void *zoo_get_context(zhandle_t *zh)
|
|
const void *zoo_get_context(zhandle_t *zh)
|
|
{
|
|
{
|
|
return zh->context;
|
|
return zh->context;
|
|
@@ -194,6 +205,16 @@ int is_unrecoverable(zhandle_t *zh)
|
|
{
|
|
{
|
|
return (zh->state<0)? ZINVALIDSTATE: ZOK;
|
|
return (zh->state<0)? ZINVALIDSTATE: ZOK;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+int exists_result_checker(int rc)
|
|
|
|
+{
|
|
|
|
+ return rc==ZOK ||rc == ZNONODE;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+int default_result_checker(int rc)
|
|
|
|
+{
|
|
|
|
+ return rc==ZOK;
|
|
|
|
+}
|
|
/**
|
|
/**
|
|
* Frees and closes everything associated with a handle,
|
|
* Frees and closes everything associated with a handle,
|
|
* including the handle itself.
|
|
* including the handle itself.
|
|
@@ -219,6 +240,8 @@ static void destroy(zhandle_t *zh)
|
|
zh->addrs = NULL;
|
|
zh->addrs = NULL;
|
|
}
|
|
}
|
|
free_auth_info(&zh->auth);
|
|
free_auth_info(&zh->auth);
|
|
|
|
+ destroy_zk_hashtable(zh->active_node_watchers);
|
|
|
|
+ destroy_zk_hashtable(zh->active_child_watchers);
|
|
}
|
|
}
|
|
|
|
|
|
static void setup_random()
|
|
static void setup_random()
|
|
@@ -359,7 +382,7 @@ const clientid_t *zoo_client_id(zhandle_t *zh)
|
|
return &zh->client_id;
|
|
return &zh->client_id;
|
|
}
|
|
}
|
|
|
|
|
|
-static void null_watcher_fn(zhandle_t* p1, int p2, int p3,const char* p4){}
|
|
|
|
|
|
+static void null_watcher_fn(zhandle_t* p1, int p2, int p3,const char* p4,void*p5){}
|
|
|
|
|
|
watcher_fn zoo_set_watcher(zhandle_t *zh,watcher_fn newFn)
|
|
watcher_fn zoo_set_watcher(zhandle_t *zh,watcher_fn newFn)
|
|
{
|
|
{
|
|
@@ -412,9 +435,13 @@ zhandle_t *zookeeper_init(const char *host, watcher_fn watcher,
|
|
zh->last_zxid = 0;
|
|
zh->last_zxid = 0;
|
|
zh->next_deadline.tv_sec=zh->next_deadline.tv_usec=0;
|
|
zh->next_deadline.tv_sec=zh->next_deadline.tv_usec=0;
|
|
zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
|
|
zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
|
|
|
|
+ zh->active_node_watchers=create_zk_hashtable();
|
|
|
|
+ zh->active_child_watchers=create_zk_hashtable();
|
|
|
|
+
|
|
if (adaptor_init(zh) == -1) {
|
|
if (adaptor_init(zh) == -1) {
|
|
goto abort;
|
|
goto abort;
|
|
}
|
|
}
|
|
|
|
+
|
|
return zh;
|
|
return zh;
|
|
abort:
|
|
abort:
|
|
errnosave=errno;
|
|
errnosave=errno;
|
|
@@ -675,7 +702,7 @@ void free_completions(zhandle_t *zh,int callCompletion,int rc)
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- free(cptr);
|
|
|
|
|
|
+ destroy_completion_entry(cptr);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1116,11 +1143,6 @@ static __attribute__((unused)) void print_completion_queue(zhandle_t *zh)
|
|
fprintf(LOGSTREAM,"end\n");
|
|
fprintf(LOGSTREAM,"end\n");
|
|
}
|
|
}
|
|
|
|
|
|
-static completion_list_t* create_completion_entry(int xid, int completion_type,
|
|
|
|
- const void *dc, const void *data);
|
|
|
|
-static void queue_completion(completion_head_t *list, completion_list_t *c,
|
|
|
|
- int add_to_front);
|
|
|
|
-
|
|
|
|
#ifdef THREADED
|
|
#ifdef THREADED
|
|
// IO thread queues session events to be processed by the completion thread
|
|
// IO thread queues session events to be processed by the completion thread
|
|
int queue_session_event(zhandle_t *zh, int state)
|
|
int queue_session_event(zhandle_t *zh, int state)
|
|
@@ -1141,12 +1163,7 @@ int queue_session_event(zhandle_t *zh, int state)
|
|
close_buffer_oarchive(&oa, 1);
|
|
close_buffer_oarchive(&oa, 1);
|
|
goto error;
|
|
goto error;
|
|
}
|
|
}
|
|
- if ((cptr=calloc(1,sizeof(*cptr)))==NULL) {
|
|
|
|
- LOG_ERROR(("out of memory"));
|
|
|
|
- close_buffer_oarchive(&oa, 1);
|
|
|
|
- goto error;
|
|
|
|
- }
|
|
|
|
- cptr->xid = WATCHER_EVENT_XID;
|
|
|
|
|
|
+ cptr = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0);
|
|
cptr->buffer = allocate_buffer(get_buffer(oa), get_buffer_len(oa));
|
|
cptr->buffer = allocate_buffer(get_buffer(oa), get_buffer_len(oa));
|
|
cptr->buffer->curr_offset = get_buffer_len(oa);
|
|
cptr->buffer->curr_offset = get_buffer_len(oa);
|
|
if (!cptr->buffer) {
|
|
if (!cptr->buffer) {
|
|
@@ -1180,6 +1197,8 @@ completion_list_t *dequeue_completion(completion_head_t *list)
|
|
return cptr;
|
|
return cptr;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
|
|
+/* handles async completion (both single- and multithreaded) */
|
|
void process_completions(zhandle_t *zh)
|
|
void process_completions(zhandle_t *zh)
|
|
{
|
|
{
|
|
completion_list_t *cptr;
|
|
completion_list_t *cptr;
|
|
@@ -1201,7 +1220,7 @@ void process_completions(zhandle_t *zh)
|
|
/* This is a notification so there aren't any pending requests */
|
|
/* This is a notification so there aren't any pending requests */
|
|
LOG_DEBUG(("Calling a watcher for node [%s], event=%s",
|
|
LOG_DEBUG(("Calling a watcher for node [%s], event=%s",
|
|
(evt.path==NULL?"NULL":evt.path),watcherEvent2String(type)));
|
|
(evt.path==NULL?"NULL":evt.path),watcherEvent2String(type)));
|
|
- zh->watcher(zh, type, state, evt.path);
|
|
|
|
|
|
+ deliverWatchers(zh,type,state,evt.path);
|
|
deallocate_WatcherEvent(&evt);
|
|
deallocate_WatcherEvent(&evt);
|
|
} else {
|
|
} else {
|
|
int rc = hdr.err;
|
|
int rc = hdr.err;
|
|
@@ -1271,9 +1290,9 @@ void process_completions(zhandle_t *zh)
|
|
}
|
|
}
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
- free_buffer(cptr->buffer);
|
|
|
|
- free(cptr);
|
|
|
|
|
|
+ activateWatcher(cptr->watcher,rc);
|
|
}
|
|
}
|
|
|
|
+ destroy_completion_entry(cptr);
|
|
close_buffer_iarchive(&ia);
|
|
close_buffer_iarchive(&ia);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1331,7 +1350,7 @@ int zookeeper_process(zhandle_t *zh, int events)
|
|
zh->last_zxid = hdr.zxid;
|
|
zh->last_zxid = hdr.zxid;
|
|
|
|
|
|
if (hdr.xid == WATCHER_EVENT_XID) {
|
|
if (hdr.xid == WATCHER_EVENT_XID) {
|
|
- completion_list_t *c = create_completion_entry(WATCHER_EVENT_XID,-1,0,0);
|
|
|
|
|
|
+ completion_list_t *c = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0);
|
|
c->buffer = bptr;
|
|
c->buffer = bptr;
|
|
queue_completion(&zh->completions_to_process, c, 0);
|
|
queue_completion(&zh->completions_to_process, c, 0);
|
|
} else if(hdr.xid == AUTH_XID){
|
|
} else if(hdr.xid == AUTH_XID){
|
|
@@ -1378,10 +1397,10 @@ int zookeeper_process(zhandle_t *zh, int events)
|
|
sc->rc = rc;
|
|
sc->rc = rc;
|
|
switch(cptr->completion_type) {
|
|
switch(cptr->completion_type) {
|
|
case COMPLETION_DATA:
|
|
case COMPLETION_DATA:
|
|
|
|
+ LOG_DEBUG(("Calling COMPLETION_DATA for xid=%x rc=%d",cptr->xid,rc));
|
|
if (rc==0) {
|
|
if (rc==0) {
|
|
struct GetDataResponse res;
|
|
struct GetDataResponse res;
|
|
int len;
|
|
int len;
|
|
- LOG_DEBUG(("Calling COMPLETION_DATA for xid=%x rc=%d",cptr->xid,rc));
|
|
|
|
deserialize_GetDataResponse(ia, "reply", &res);
|
|
deserialize_GetDataResponse(ia, "reply", &res);
|
|
if (res.data.len <= sc->u.data.buff_len) {
|
|
if (res.data.len <= sc->u.data.buff_len) {
|
|
len = res.data.len;
|
|
len = res.data.len;
|
|
@@ -1395,18 +1414,18 @@ int zookeeper_process(zhandle_t *zh, int events)
|
|
}
|
|
}
|
|
break;
|
|
break;
|
|
case COMPLETION_STAT:
|
|
case COMPLETION_STAT:
|
|
|
|
+ LOG_DEBUG(("Calling COMPLETION_STAT for xid=%x rc=%d",cptr->xid,rc));
|
|
if (rc == 0) {
|
|
if (rc == 0) {
|
|
struct SetDataResponse res;
|
|
struct SetDataResponse res;
|
|
- LOG_DEBUG(("Calling COMPLETION_STAT for xid=%x rc=%d",cptr->xid,rc));
|
|
|
|
deserialize_SetDataResponse(ia, "reply", &res);
|
|
deserialize_SetDataResponse(ia, "reply", &res);
|
|
sc->u.stat = res.stat;
|
|
sc->u.stat = res.stat;
|
|
deallocate_SetDataResponse(&res);
|
|
deallocate_SetDataResponse(&res);
|
|
}
|
|
}
|
|
break;
|
|
break;
|
|
case COMPLETION_STRINGLIST:
|
|
case COMPLETION_STRINGLIST:
|
|
|
|
+ LOG_DEBUG(("Calling COMPLETION_STRINGLIST for xid=%x rc=%d",cptr->xid,rc));
|
|
if (rc == 0) {
|
|
if (rc == 0) {
|
|
struct GetChildrenResponse res;
|
|
struct GetChildrenResponse res;
|
|
- LOG_DEBUG(("Calling COMPLETION_STRINGLIST for xid=%x rc=%d",cptr->xid,rc));
|
|
|
|
deserialize_GetChildrenResponse(ia, "reply", &res);
|
|
deserialize_GetChildrenResponse(ia, "reply", &res);
|
|
sc->u.strs = res.children;
|
|
sc->u.strs = res.children;
|
|
/* We don't deallocate since we are passing it back */
|
|
/* We don't deallocate since we are passing it back */
|
|
@@ -1414,10 +1433,10 @@ int zookeeper_process(zhandle_t *zh, int events)
|
|
}
|
|
}
|
|
break;
|
|
break;
|
|
case COMPLETION_STRING:
|
|
case COMPLETION_STRING:
|
|
|
|
+ LOG_DEBUG(("Calling COMPLETION_STRING for xid=%x rc=%d",cptr->xid,rc));
|
|
if (rc == 0) {
|
|
if (rc == 0) {
|
|
struct CreateResponse res;
|
|
struct CreateResponse res;
|
|
int len;
|
|
int len;
|
|
- LOG_DEBUG(("Calling COMPLETION_STRING for xid=%x rc=%d",cptr->xid,rc));
|
|
|
|
deserialize_CreateResponse(ia, "reply", &res);
|
|
deserialize_CreateResponse(ia, "reply", &res);
|
|
if (sc->u.str.str_len > strlen(res.path)) {
|
|
if (sc->u.str.str_len > strlen(res.path)) {
|
|
len = strlen(res.path);
|
|
len = strlen(res.path);
|
|
@@ -1430,9 +1449,9 @@ int zookeeper_process(zhandle_t *zh, int events)
|
|
}
|
|
}
|
|
break;
|
|
break;
|
|
case COMPLETION_ACLLIST:
|
|
case COMPLETION_ACLLIST:
|
|
|
|
+ LOG_DEBUG(("Calling COMPLETION_ACLLIST for xid=%x rc=%d",cptr->xid,rc));
|
|
if (rc == 0) {
|
|
if (rc == 0) {
|
|
struct GetACLResponse res;
|
|
struct GetACLResponse res;
|
|
- LOG_DEBUG(("Calling COMPLETION_ACLLIST for xid=%x rc=%d",cptr->xid,rc));
|
|
|
|
deserialize_GetACLResponse(ia, "reply", &res);
|
|
deserialize_GetACLResponse(ia, "reply", &res);
|
|
cptr->c.acl_result(rc, &res.acl, &res.stat, cptr->data);
|
|
cptr->c.acl_result(rc, &res.acl, &res.stat, cptr->data);
|
|
sc->u.acl.acl = res.acl;
|
|
sc->u.acl.acl = res.acl;
|
|
@@ -1445,6 +1464,7 @@ int zookeeper_process(zhandle_t *zh, int events)
|
|
LOG_DEBUG(("Calling COMPLETION_VOID for xid=%x rc=%d",cptr->xid,rc));
|
|
LOG_DEBUG(("Calling COMPLETION_VOID for xid=%x rc=%d",cptr->xid,rc));
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
|
|
+ activateWatcher(cptr->watcher,rc);
|
|
notify_sync_completion(sc);
|
|
notify_sync_completion(sc);
|
|
free_buffer(bptr);
|
|
free_buffer(bptr);
|
|
zh->outstanding_sync--;
|
|
zh->outstanding_sync--;
|
|
@@ -1468,8 +1488,30 @@ int zoo_state(zhandle_t *zh)
|
|
return 0;
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+static watcher_registration_t* create_watcher_registration(const char* path,
|
|
|
|
+ result_checker_fn checker,watcher_fn watcher,void* ctx,
|
|
|
|
+ zk_hashtable* activeMap){
|
|
|
|
+ watcher_registration_t* wo;
|
|
|
|
+ if(watcher==0)
|
|
|
|
+ return 0;
|
|
|
|
+ wo=calloc(1,sizeof(watcher_registration_t));
|
|
|
|
+ wo->path=strdup(path);
|
|
|
|
+ wo->watcher=watcher;
|
|
|
|
+ wo->context=ctx;
|
|
|
|
+ wo->checker=checker==0?default_result_checker:checker;
|
|
|
|
+ wo->activeMap=activeMap;
|
|
|
|
+ return wo;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+static void destroy_watcher_registration(watcher_registration_t* wo){
|
|
|
|
+ if(wo!=0){
|
|
|
|
+ free((void*)wo->path);
|
|
|
|
+ free(wo);
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
static completion_list_t* create_completion_entry(int xid, int completion_type,
|
|
static completion_list_t* create_completion_entry(int xid, int completion_type,
|
|
- const void *dc, const void *data)
|
|
|
|
|
|
+ const void *dc, const void *data,watcher_registration_t* wo)
|
|
{
|
|
{
|
|
completion_list_t *c = calloc(1,sizeof(completion_list_t));
|
|
completion_list_t *c = calloc(1,sizeof(completion_list_t));
|
|
if (!c) {
|
|
if (!c) {
|
|
@@ -1499,15 +1541,24 @@ static completion_list_t* create_completion_entry(int xid, int completion_type,
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
c->xid = xid;
|
|
c->xid = xid;
|
|
- c->next = 0;
|
|
|
|
|
|
+ c->watcher = wo;
|
|
|
|
|
|
return c;
|
|
return c;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+static void destroy_completion_entry(completion_list_t* c){
|
|
|
|
+ if(c!=0){
|
|
|
|
+ if(c->buffer!=0)
|
|
|
|
+ free_buffer(c->buffer);
|
|
|
|
+ destroy_watcher_registration(c->watcher);
|
|
|
|
+ free(c);
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
static void queue_completion(completion_head_t *list, completion_list_t *c,
|
|
static void queue_completion(completion_head_t *list, completion_list_t *c,
|
|
int add_to_front)
|
|
int add_to_front)
|
|
{
|
|
{
|
|
- c->next = 0;
|
|
|
|
|
|
+ c->next = 0;
|
|
/* appending a new entry to the back of the list */
|
|
/* appending a new entry to the back of the list */
|
|
lock_completion_list(list);
|
|
lock_completion_list(list);
|
|
if (list->last) {
|
|
if (list->last) {
|
|
@@ -1530,10 +1581,11 @@ static void queue_completion(completion_head_t *list, completion_list_t *c,
|
|
}
|
|
}
|
|
|
|
|
|
static int add_completion(zhandle_t *zh, int xid, int completion_type,
|
|
static int add_completion(zhandle_t *zh, int xid, int completion_type,
|
|
- const void *dc, const void *data, int add_to_front)
|
|
|
|
|
|
+ const void *dc, const void *data, int add_to_front,
|
|
|
|
+ watcher_registration_t* wo)
|
|
{
|
|
{
|
|
completion_list_t *c =create_completion_entry(xid, completion_type, dc,
|
|
completion_list_t *c =create_completion_entry(xid, completion_type, dc,
|
|
- data);
|
|
|
|
|
|
+ data,wo);
|
|
if (!c)
|
|
if (!c)
|
|
return ZSYSTEMERROR;
|
|
return ZSYSTEMERROR;
|
|
queue_completion(&zh->sent_requests, c, add_to_front);
|
|
queue_completion(&zh->sent_requests, c, add_to_front);
|
|
@@ -1544,39 +1596,39 @@ static int add_completion(zhandle_t *zh, int xid, int completion_type,
|
|
}
|
|
}
|
|
|
|
|
|
static int add_data_completion(zhandle_t *zh, int xid, data_completion_t dc,
|
|
static int add_data_completion(zhandle_t *zh, int xid, data_completion_t dc,
|
|
- const void *data)
|
|
|
|
|
|
+ const void *data,watcher_registration_t* wo)
|
|
{
|
|
{
|
|
- return add_completion(zh, xid, COMPLETION_DATA, dc, data, 0);
|
|
|
|
|
|
+ return add_completion(zh, xid, COMPLETION_DATA, dc, data, 0,wo);
|
|
}
|
|
}
|
|
|
|
|
|
static int add_stat_completion(zhandle_t *zh, int xid, stat_completion_t dc,
|
|
static int add_stat_completion(zhandle_t *zh, int xid, stat_completion_t dc,
|
|
- const void *data)
|
|
|
|
|
|
+ const void *data,watcher_registration_t* wo)
|
|
{
|
|
{
|
|
- return add_completion(zh, xid, COMPLETION_STAT, dc, data, 0);
|
|
|
|
|
|
+ return add_completion(zh, xid, COMPLETION_STAT, dc, data, 0,wo);
|
|
}
|
|
}
|
|
|
|
|
|
static int add_strings_completion(zhandle_t *zh, int xid,
|
|
static int add_strings_completion(zhandle_t *zh, int xid,
|
|
- strings_completion_t dc, const void *data)
|
|
|
|
|
|
+ strings_completion_t dc, const void *data,watcher_registration_t* wo)
|
|
{
|
|
{
|
|
- return add_completion(zh, xid, COMPLETION_STRINGLIST, dc, data, 0);
|
|
|
|
|
|
+ return add_completion(zh, xid, COMPLETION_STRINGLIST, dc, data, 0,wo);
|
|
}
|
|
}
|
|
|
|
|
|
static int add_acl_completion(zhandle_t *zh, int xid, acl_completion_t dc,
|
|
static int add_acl_completion(zhandle_t *zh, int xid, acl_completion_t dc,
|
|
const void *data)
|
|
const void *data)
|
|
{
|
|
{
|
|
- return add_completion(zh, xid, COMPLETION_ACLLIST, dc, data, 0);
|
|
|
|
|
|
+ return add_completion(zh, xid, COMPLETION_ACLLIST, dc, data, 0,0);
|
|
}
|
|
}
|
|
|
|
|
|
static int add_void_completion(zhandle_t *zh, int xid, void_completion_t dc,
|
|
static int add_void_completion(zhandle_t *zh, int xid, void_completion_t dc,
|
|
const void *data)
|
|
const void *data)
|
|
{
|
|
{
|
|
- return add_completion(zh, xid, COMPLETION_VOID, dc, data, 0);
|
|
|
|
|
|
+ return add_completion(zh, xid, COMPLETION_VOID, dc, data, 0,0);
|
|
}
|
|
}
|
|
|
|
|
|
static int add_string_completion(zhandle_t *zh, int xid,
|
|
static int add_string_completion(zhandle_t *zh, int xid,
|
|
string_completion_t dc, const void *data)
|
|
string_completion_t dc, const void *data)
|
|
{
|
|
{
|
|
- return add_completion(zh, xid, COMPLETION_STRING, dc, data, 0);
|
|
|
|
|
|
+ return add_completion(zh, xid, COMPLETION_STRING, dc, data, 0,0);
|
|
}
|
|
}
|
|
|
|
|
|
int zookeeper_close(zhandle_t *zh)
|
|
int zookeeper_close(zhandle_t *zh)
|
|
@@ -1624,10 +1676,17 @@ finish:
|
|
|
|
|
|
int zoo_aget(zhandle_t *zh, const char *path, int watch, data_completion_t dc,
|
|
int zoo_aget(zhandle_t *zh, const char *path, int watch, data_completion_t dc,
|
|
const void *data)
|
|
const void *data)
|
|
|
|
+{
|
|
|
|
+ return zoo_awget(zh,path,watch?zh->watcher:0,zh->context,dc,data);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+int zoo_awget(zhandle_t *zh, const char *path,
|
|
|
|
+ watcher_fn watcher, void* watcherCtx,
|
|
|
|
+ data_completion_t dc, const void *data)
|
|
{
|
|
{
|
|
struct oarchive *oa;
|
|
struct oarchive *oa;
|
|
struct RequestHeader h = { .xid = get_xid(), .type = GETDATA_OP};
|
|
struct RequestHeader h = { .xid = get_xid(), .type = GETDATA_OP};
|
|
- struct GetDataRequest req = { (char*)path, watch };
|
|
|
|
|
|
+ struct GetDataRequest req = { (char*)path, watcher!=0 };
|
|
int rc;
|
|
int rc;
|
|
|
|
|
|
if (zh==0 || path==0)
|
|
if (zh==0 || path==0)
|
|
@@ -1638,7 +1697,9 @@ int zoo_aget(zhandle_t *zh, const char *path, int watch, data_completion_t dc,
|
|
rc = serialize_RequestHeader(oa, "header", &h);
|
|
rc = serialize_RequestHeader(oa, "header", &h);
|
|
rc = rc < 0 ? rc : serialize_GetDataRequest(oa, "req", &req);
|
|
rc = rc < 0 ? rc : serialize_GetDataRequest(oa, "req", &req);
|
|
enter_critical(zh);
|
|
enter_critical(zh);
|
|
- rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data);
|
|
|
|
|
|
+ rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data,
|
|
|
|
+ create_watcher_registration(path,0,watcher,watcherCtx,
|
|
|
|
+ zh->active_node_watchers));
|
|
rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
|
|
rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
|
|
get_buffer_len(oa));
|
|
get_buffer_len(oa));
|
|
leave_critical(zh);
|
|
leave_critical(zh);
|
|
@@ -1673,7 +1734,7 @@ int zoo_aset(zhandle_t *zh, const char *path, const char *buffer, int buflen,
|
|
rc = serialize_RequestHeader(oa, "header", &h);
|
|
rc = serialize_RequestHeader(oa, "header", &h);
|
|
rc = rc < 0 ? rc : serialize_SetDataRequest(oa, "req", &req);
|
|
rc = rc < 0 ? rc : serialize_SetDataRequest(oa, "req", &req);
|
|
enter_critical(zh);
|
|
enter_critical(zh);
|
|
- rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, dc, data);
|
|
|
|
|
|
+ rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, dc, data,0);
|
|
rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
|
|
rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
|
|
get_buffer_len(oa));
|
|
get_buffer_len(oa));
|
|
leave_critical(zh);
|
|
leave_critical(zh);
|
|
@@ -1761,11 +1822,18 @@ int zoo_adelete(zhandle_t *zh, const char *path, int version,
|
|
}
|
|
}
|
|
|
|
|
|
int zoo_aexists(zhandle_t *zh, const char *path, int watch,
|
|
int zoo_aexists(zhandle_t *zh, const char *path, int watch,
|
|
|
|
+ stat_completion_t sc, const void *data)
|
|
|
|
+{
|
|
|
|
+ return zoo_awexists(zh,path,watch?zh->watcher:0,zh->context,sc,data);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+int zoo_awexists(zhandle_t *zh, const char *path,
|
|
|
|
+ watcher_fn watcher, void* watcherCtx,
|
|
stat_completion_t completion, const void *data)
|
|
stat_completion_t completion, const void *data)
|
|
{
|
|
{
|
|
struct oarchive *oa;
|
|
struct oarchive *oa;
|
|
struct RequestHeader h = { .xid = get_xid(), .type = EXISTS_OP };
|
|
struct RequestHeader h = { .xid = get_xid(), .type = EXISTS_OP };
|
|
- struct ExistsRequest req;
|
|
|
|
|
|
+ struct ExistsRequest req = {(char*)path, watcher!=0 };
|
|
int rc;
|
|
int rc;
|
|
|
|
|
|
if (zh==0 || path==0)
|
|
if (zh==0 || path==0)
|
|
@@ -1773,12 +1841,12 @@ int zoo_aexists(zhandle_t *zh, const char *path, int watch,
|
|
if (is_unrecoverable(zh))
|
|
if (is_unrecoverable(zh))
|
|
return ZINVALIDSTATE;
|
|
return ZINVALIDSTATE;
|
|
oa = create_buffer_oarchive();
|
|
oa = create_buffer_oarchive();
|
|
- req.path = (char*)path;
|
|
|
|
- req.watch = watch;
|
|
|
|
rc = serialize_RequestHeader(oa, "header", &h);
|
|
rc = serialize_RequestHeader(oa, "header", &h);
|
|
rc = rc < 0 ? rc : serialize_ExistsRequest(oa, "req", &req);
|
|
rc = rc < 0 ? rc : serialize_ExistsRequest(oa, "req", &req);
|
|
enter_critical(zh);
|
|
enter_critical(zh);
|
|
- rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, completion, data);
|
|
|
|
|
|
+ rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, completion, data,
|
|
|
|
+ create_watcher_registration(path,exists_result_checker,
|
|
|
|
+ watcher,watcherCtx,zh->active_node_watchers));
|
|
rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
|
|
rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
|
|
get_buffer_len(oa));
|
|
get_buffer_len(oa));
|
|
leave_critical(zh);
|
|
leave_critical(zh);
|
|
@@ -1793,11 +1861,18 @@ int zoo_aexists(zhandle_t *zh, const char *path, int watch,
|
|
}
|
|
}
|
|
|
|
|
|
int zoo_aget_children(zhandle_t *zh, const char *path, int watch,
|
|
int zoo_aget_children(zhandle_t *zh, const char *path, int watch,
|
|
- strings_completion_t completion, const void *data)
|
|
|
|
|
|
+ strings_completion_t dc, const void *data)
|
|
|
|
+{
|
|
|
|
+ return zoo_awget_children(zh,path,watch?zh->watcher:0,zh->context,dc,data);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+int zoo_awget_children(zhandle_t *zh, const char *path,
|
|
|
|
+ watcher_fn watcher, void* watcherCtx,
|
|
|
|
+ strings_completion_t dc, const void *data)
|
|
{
|
|
{
|
|
struct oarchive *oa;
|
|
struct oarchive *oa;
|
|
struct RequestHeader h = { .xid = get_xid(), .type = GETCHILDREN_OP};
|
|
struct RequestHeader h = { .xid = get_xid(), .type = GETCHILDREN_OP};
|
|
- struct GetChildrenRequest req;
|
|
|
|
|
|
+ struct GetChildrenRequest req={(char*)path, watcher!=0 };
|
|
int rc;
|
|
int rc;
|
|
|
|
|
|
if (zh==0 || path==0)
|
|
if (zh==0 || path==0)
|
|
@@ -1805,12 +1880,12 @@ int zoo_aget_children(zhandle_t *zh, const char *path, int watch,
|
|
if (is_unrecoverable(zh))
|
|
if (is_unrecoverable(zh))
|
|
return ZINVALIDSTATE;
|
|
return ZINVALIDSTATE;
|
|
oa = create_buffer_oarchive();
|
|
oa = create_buffer_oarchive();
|
|
- req.path = (char*)path;
|
|
|
|
- req.watch = watch;
|
|
|
|
rc = serialize_RequestHeader(oa, "header", &h);
|
|
rc = serialize_RequestHeader(oa, "header", &h);
|
|
rc = rc < 0 ? rc : serialize_GetChildrenRequest(oa, "req", &req);
|
|
rc = rc < 0 ? rc : serialize_GetChildrenRequest(oa, "req", &req);
|
|
enter_critical(zh);
|
|
enter_critical(zh);
|
|
- rc = rc < 0 ? rc : add_strings_completion(zh, h.xid, completion, data);
|
|
|
|
|
|
+ rc = rc < 0 ? rc : add_strings_completion(zh, h.xid, dc, data,
|
|
|
|
+ create_watcher_registration(path,0,watcher,watcherCtx,
|
|
|
|
+ zh->active_child_watchers));
|
|
rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
|
|
rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
|
|
get_buffer_len(oa));
|
|
get_buffer_len(oa));
|
|
leave_critical(zh);
|
|
leave_critical(zh);
|
|
@@ -2130,13 +2205,19 @@ int zoo_delete(zhandle_t *zh, const char *path, int version)
|
|
}
|
|
}
|
|
|
|
|
|
int zoo_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat)
|
|
int zoo_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat)
|
|
|
|
+{
|
|
|
|
+ return zoo_wexists(zh,path,watch?zh->watcher:0,zh->context,stat);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+int zoo_wexists(zhandle_t *zh, const char *path,
|
|
|
|
+ watcher_fn watcher, void* watcherCtx, struct Stat *stat)
|
|
{
|
|
{
|
|
struct sync_completion *sc = alloc_sync_completion();
|
|
struct sync_completion *sc = alloc_sync_completion();
|
|
int rc;
|
|
int rc;
|
|
if (!sc) {
|
|
if (!sc) {
|
|
return ZSYSTEMERROR;
|
|
return ZSYSTEMERROR;
|
|
}
|
|
}
|
|
- rc=zoo_aexists(zh, path, watch, SYNCHRONOUS_MARKER, sc);
|
|
|
|
|
|
+ rc=zoo_awexists(zh,path,watcher,watcherCtx,SYNCHRONOUS_MARKER, sc);
|
|
if(rc==ZOK){
|
|
if(rc==ZOK){
|
|
wait_sync_completion(sc);
|
|
wait_sync_completion(sc);
|
|
rc = sc->rc;
|
|
rc = sc->rc;
|
|
@@ -2145,11 +2226,19 @@ int zoo_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
free_sync_completion(sc);
|
|
free_sync_completion(sc);
|
|
- return rc;
|
|
|
|
|
|
+ return rc;
|
|
}
|
|
}
|
|
|
|
|
|
int zoo_get(zhandle_t *zh, const char *path, int watch, char *buffer,
|
|
int zoo_get(zhandle_t *zh, const char *path, int watch, char *buffer,
|
|
int* buffer_len, struct Stat *stat)
|
|
int* buffer_len, struct Stat *stat)
|
|
|
|
+{
|
|
|
|
+ return zoo_wget(zh,path,watch?zh->watcher:0,zh->context,
|
|
|
|
+ buffer,buffer_len,stat);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+int zoo_wget(zhandle_t *zh, const char *path,
|
|
|
|
+ watcher_fn watcher, void* watcherCtx,
|
|
|
|
+ char *buffer, int* buffer_len, struct Stat *stat)
|
|
{
|
|
{
|
|
struct sync_completion *sc;
|
|
struct sync_completion *sc;
|
|
int rc=0;
|
|
int rc=0;
|
|
@@ -2161,7 +2250,7 @@ int zoo_get(zhandle_t *zh, const char *path, int watch, char *buffer,
|
|
|
|
|
|
sc->u.data.buffer = buffer;
|
|
sc->u.data.buffer = buffer;
|
|
sc->u.data.buff_len = *buffer_len;
|
|
sc->u.data.buff_len = *buffer_len;
|
|
- rc=zoo_aget(zh, path, watch, SYNCHRONOUS_MARKER, sc);
|
|
|
|
|
|
+ rc=zoo_awget(zh, path, watcher, watcherCtx, SYNCHRONOUS_MARKER, sc);
|
|
if(rc==ZOK){
|
|
if(rc==ZOK){
|
|
wait_sync_completion(sc);
|
|
wait_sync_completion(sc);
|
|
rc = sc->rc;
|
|
rc = sc->rc;
|
|
@@ -2194,13 +2283,20 @@ int zoo_set(zhandle_t *zh, const char *path, const char *buffer, int buflen,
|
|
|
|
|
|
int zoo_get_children(zhandle_t *zh, const char *path, int watch,
|
|
int zoo_get_children(zhandle_t *zh, const char *path, int watch,
|
|
struct String_vector *strings)
|
|
struct String_vector *strings)
|
|
|
|
+{
|
|
|
|
+ return zoo_wget_children(zh,path,watch?zh->watcher:0,zh->context,strings);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+int zoo_wget_children(zhandle_t *zh, const char *path,
|
|
|
|
+ watcher_fn watcher, void* watcherCtx,
|
|
|
|
+ struct String_vector *strings)
|
|
{
|
|
{
|
|
struct sync_completion *sc = alloc_sync_completion();
|
|
struct sync_completion *sc = alloc_sync_completion();
|
|
int rc;
|
|
int rc;
|
|
if (!sc) {
|
|
if (!sc) {
|
|
return ZSYSTEMERROR;
|
|
return ZSYSTEMERROR;
|
|
}
|
|
}
|
|
- rc=zoo_aget_children(zh, path, watch, SYNCHRONOUS_MARKER, sc);
|
|
|
|
|
|
+ rc=zoo_awget_children(zh, path, watcher, watcherCtx, SYNCHRONOUS_MARKER, sc);
|
|
if(rc==ZOK){
|
|
if(rc==ZOK){
|
|
wait_sync_completion(sc);
|
|
wait_sync_completion(sc);
|
|
rc = sc->rc;
|
|
rc = sc->rc;
|