|
@@ -24,25 +24,28 @@
|
|
|
#include <stdlib.h>
|
|
|
#include <assert.h>
|
|
|
|
|
|
-#ifdef THREADED
|
|
|
-#include <pthread.h>
|
|
|
-#endif
|
|
|
+typedef struct _watcher_object {
|
|
|
+ watcher_fn watcher;
|
|
|
+ void* context;
|
|
|
+ struct _watcher_object* next;
|
|
|
+} watcher_object_t;
|
|
|
+
|
|
|
|
|
|
struct _zk_hashtable {
|
|
|
struct hashtable* ht;
|
|
|
-#ifdef THREADED
|
|
|
- pthread_mutex_t lock;
|
|
|
-#endif
|
|
|
};
|
|
|
|
|
|
-hashtable_impl* getImpl(zk_hashtable* ht){
|
|
|
- return ht->ht;
|
|
|
-}
|
|
|
-
|
|
|
struct watcher_object_list {
|
|
|
watcher_object_t* head;
|
|
|
};
|
|
|
|
|
|
+/* the following functions are for testing only */
|
|
|
+typedef struct hashtable hashtable_impl;
|
|
|
+
|
|
|
+hashtable_impl* getImpl(zk_hashtable* ht){
|
|
|
+ return ht->ht;
|
|
|
+}
|
|
|
+
|
|
|
watcher_object_t* getFirstWatcher(zk_hashtable* ht,const char* path)
|
|
|
{
|
|
|
watcher_object_list_t* wl=hashtable_search(ht->ht,(void*)path);
|
|
@@ -50,6 +53,7 @@ watcher_object_t* getFirstWatcher(zk_hashtable* ht,const char* path)
|
|
|
return wl->head;
|
|
|
return 0;
|
|
|
}
|
|
|
+/* end of testing functions */
|
|
|
|
|
|
watcher_object_t* clone_watcher_object(watcher_object_t* wo)
|
|
|
{
|
|
@@ -76,7 +80,7 @@ static int string_equal(void *key1,void *key2)
|
|
|
return strcmp((const char*)key1,(const char*)key2)==0;
|
|
|
}
|
|
|
|
|
|
-watcher_object_t* create_watcher_object(watcher_fn watcher,void* ctx)
|
|
|
+static watcher_object_t* create_watcher_object(watcher_fn watcher,void* ctx)
|
|
|
{
|
|
|
watcher_object_t* wo=calloc(1,sizeof(watcher_object_t));
|
|
|
assert(wo);
|
|
@@ -110,9 +114,6 @@ zk_hashtable* create_zk_hashtable()
|
|
|
{
|
|
|
struct _zk_hashtable *ht=calloc(1,sizeof(struct _zk_hashtable));
|
|
|
assert(ht);
|
|
|
-#ifdef THREADED
|
|
|
- pthread_mutex_init(&ht->lock, 0);
|
|
|
-#endif
|
|
|
ht->ht=create_hashtable(32,string_hash_djb2,string_equal);
|
|
|
return ht;
|
|
|
}
|
|
@@ -132,25 +133,11 @@ static void do_clean_hashtable(zk_hashtable* ht)
|
|
|
free(it);
|
|
|
}
|
|
|
|
|
|
-void clean_zk_hashtable(zk_hashtable* ht)
|
|
|
-{
|
|
|
-#ifdef THREADED
|
|
|
- pthread_mutex_lock(&ht->lock);
|
|
|
-#endif
|
|
|
- do_clean_hashtable(ht);
|
|
|
-#ifdef THREADED
|
|
|
- pthread_mutex_unlock(&ht->lock);
|
|
|
-#endif
|
|
|
-}
|
|
|
-
|
|
|
void destroy_zk_hashtable(zk_hashtable* ht)
|
|
|
{
|
|
|
if(ht!=0){
|
|
|
do_clean_hashtable(ht);
|
|
|
hashtable_destroy(ht->ht,0);
|
|
|
-#ifdef THREADED
|
|
|
- pthread_mutex_destroy(&ht->lock);
|
|
|
-#endif
|
|
|
free(ht);
|
|
|
}
|
|
|
}
|
|
@@ -169,7 +156,8 @@ static watcher_object_t* search_watcher(watcher_object_list_t** wl,watcher_objec
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
-int add_to_list(watcher_object_list_t **wl, watcher_object_t *wo, int clone)
|
|
|
+static int add_to_list(watcher_object_list_t **wl, watcher_object_t *wo,
|
|
|
+ int clone)
|
|
|
{
|
|
|
if (search_watcher(wl, wo)==0) {
|
|
|
watcher_object_t* cloned=wo;
|
|
@@ -223,16 +211,11 @@ char **collect_keys(zk_hashtable *ht, int *count)
|
|
|
return list;
|
|
|
}
|
|
|
|
|
|
-int insert_watcher_object(zk_hashtable *ht, const char *path, watcher_object_t* wo)
|
|
|
+static int insert_watcher_object(zk_hashtable *ht, const char *path,
|
|
|
+ watcher_object_t* wo)
|
|
|
{
|
|
|
int res;
|
|
|
-#ifdef THREADED
|
|
|
- pthread_mutex_lock(&ht->lock);
|
|
|
-#endif
|
|
|
res=do_insert_watcher_object(ht,path,wo);
|
|
|
-#ifdef THREADED
|
|
|
- pthread_mutex_unlock(&ht->lock);
|
|
|
-#endif
|
|
|
return res;
|
|
|
}
|
|
|
|
|
@@ -260,29 +243,17 @@ static void copy_table(zk_hashtable *from, watcher_object_list_t *to) {
|
|
|
free(it);
|
|
|
}
|
|
|
|
|
|
-void collect_session_watchers(zhandle_t *zh, watcher_object_list_t **list)
|
|
|
+static void collect_session_watchers(zhandle_t *zh,
|
|
|
+ watcher_object_list_t **list)
|
|
|
{
|
|
|
-#ifdef THREADED
|
|
|
- pthread_mutex_lock(&zh->active_node_watchers->lock);
|
|
|
- pthread_mutex_lock(&zh->active_exist_watchers->lock);
|
|
|
- pthread_mutex_lock(&zh->active_child_watchers->lock);
|
|
|
-#endif
|
|
|
copy_table(zh->active_node_watchers, *list);
|
|
|
copy_table(zh->active_exist_watchers, *list);
|
|
|
copy_table(zh->active_child_watchers, *list);
|
|
|
-#ifdef THREADED
|
|
|
- pthread_mutex_unlock(&zh->active_node_watchers->lock);
|
|
|
- pthread_mutex_unlock(&zh->active_exist_watchers->lock);
|
|
|
- pthread_mutex_unlock(&zh->active_child_watchers->lock);
|
|
|
-#endif
|
|
|
}
|
|
|
|
|
|
static void add_for_event(zk_hashtable *ht, char *path, watcher_object_list_t **list)
|
|
|
{
|
|
|
watcher_object_list_t* wl;
|
|
|
-#ifdef THREADED
|
|
|
- pthread_mutex_lock(&ht->lock);
|
|
|
-#endif
|
|
|
wl = (watcher_object_list_t*)hashtable_remove(ht->ht, path);
|
|
|
if (wl) {
|
|
|
copy_watchers(wl, *list, 0);
|
|
@@ -290,9 +261,6 @@ static void add_for_event(zk_hashtable *ht, char *path, watcher_object_list_t **
|
|
|
// head pointer
|
|
|
free(wl);
|
|
|
}
|
|
|
-#ifdef THREADED
|
|
|
- pthread_mutex_unlock(&ht->lock);
|
|
|
-#endif
|
|
|
}
|
|
|
|
|
|
static void do_foreach_watcher(watcher_object_t* wo,zhandle_t* zh,
|
|
@@ -304,18 +272,9 @@ static void do_foreach_watcher(watcher_object_t* wo,zhandle_t* zh,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-int countList(watcher_object_t *wo) {
|
|
|
- int count = 0;
|
|
|
- while(wo) {
|
|
|
- count++;
|
|
|
- wo = wo->next;
|
|
|
- }
|
|
|
- return count;
|
|
|
-}
|
|
|
watcher_object_list_t *collectWatchers(zhandle_t *zh,int type, char *path)
|
|
|
{
|
|
|
struct watcher_object_list *list = create_watcher_object_list(0);
|
|
|
- int count = 0;
|
|
|
|
|
|
if(type==ZOO_SESSION_EVENT){
|
|
|
watcher_object_t defWatcher;
|
|
@@ -323,7 +282,6 @@ watcher_object_list_t *collectWatchers(zhandle_t *zh,int type, char *path)
|
|
|
defWatcher.context=zh->context;
|
|
|
add_to_list(&list, &defWatcher, 1);
|
|
|
collect_session_watchers(zh, &list);
|
|
|
- count = countList(list->head);
|
|
|
return list;
|
|
|
}
|
|
|
switch(type){
|
|
@@ -344,7 +302,6 @@ watcher_object_list_t *collectWatchers(zhandle_t *zh,int type, char *path)
|
|
|
add_for_event(zh->active_child_watchers,path,&list);
|
|
|
break;
|
|
|
}
|
|
|
- count = countList(list->head);
|
|
|
return list;
|
|
|
}
|
|
|
|
|
@@ -360,7 +317,7 @@ void activateWatcher(zhandle_t *zh, watcher_registration_t* reg, int rc)
|
|
|
{
|
|
|
if(reg){
|
|
|
/* in multithreaded lib, this code is executed
|
|
|
- * by the completion thread */
|
|
|
+ * by the IO thread */
|
|
|
zk_hashtable *ht = reg->checker(zh, rc);
|
|
|
if(ht){
|
|
|
insert_watcher_object(ht,reg->path,
|