Ver Fonte

ZOOKEEPER-1919 Update the C implementation of removeWatches to have it match ZOOKEEPER-1910

Existing patch on Jira has been rebased and applied to master.

Replaces #511

Author: Andor Molnar <andor@cloudera.com>

Reviewers: phunt@apache.org

Closes #522 from anmolnar/ZOOKEEPER-1919 and squashes the following commits:

f81a7608a [Andor Molnar] ZOOKEEPER-1919. Fixed indentation
d6455b062 [Andor Molnar] ZOOKEEPER-1919. Test fixes
769677693 [Andor Molnar] ZOOKEEPER-1919. Changes in C code

Change-Id: I2597854e4b28196a5dc5dc89553ced41537663eb
Andor Molnar há 7 anos atrás
pai
commit
9c4bd4d335
5 ficheiros alterados com 187 adições e 72 exclusões
  1. 2 1
      src/c/include/proto.h
  2. 54 14
      src/c/include/zookeeper.h
  3. 6 6
      src/c/src/zk_hashtable.c
  4. 74 14
      src/c/src/zookeeper.c
  5. 51 37
      src/c/tests/TestClient.cc

+ 2 - 1
src/c/include/proto.h

@@ -38,7 +38,8 @@ extern "C" {
 #define ZOO_MULTI_OP 14
 #define ZOO_CREATE2_OP 15
 #define ZOO_RECONFIG_OP 16
-#define ZOO_REMOVE_WATCHES 17
+#define ZOO_CHECK_WATCHES 17
+#define ZOO_REMOVE_WATCHES 18
 #define ZOO_CLOSE_OP -11
 #define ZOO_SETAUTH_OP 100
 #define ZOO_SETWATCHES_OP 101

+ 54 - 14
src/c/include/zookeeper.h

@@ -1511,24 +1511,24 @@ ZOOAPI void zoo_set_log_callback(zhandle_t *zh, log_callback_fn callback);
 ZOOAPI void zoo_deterministic_conn_order(int yesOrNo);
 
 /**
- * Type of watchers: used to select which type of watchers should be removed
+ * Type of watches: used to select which type of watches should be removed
  */
 typedef enum {
-  ZWATCHERTYPE_CHILDREN = 1,
-  ZWATCHERTYPE_DATA = 2,
-  ZWATCHERTYPE_ANY = 3
+  ZWATCHTYPE_CHILD = 1,
+  ZWATCHTYPE_DATA = 2,
+  ZWATCHTYPE_ANY = 3	
 } ZooWatcherType;
 
 /**
- * \brief removes the watchers for the given path and watcher type.
+ * \brief removes the watches for the given path and watcher type.
  *
  * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
- * \param path the path for which watchers will be removed
+ * \param path the path for which watches will be removed
  * \param wtype the watcher type to be removed
- * \param watcher the watcher to be removed, if null all watchers for that
+ * \param watcher the watcher to be removed, if null all watches for that
  * path (and watcher type) will be removed
  * \param watcherCtx the contex associated with the watcher to be removed
- * \param local whether the watchers will be removed locally even if there is
+ * \param local whether the watches will be removed locally even if there is
  * no server connection
  * \return the return code for the function call.
  * ZOK - operation completed successfully
@@ -1539,10 +1539,50 @@ typedef enum {
  * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
  * ZSYSTEMERROR - a system error occured
  */
-ZOOAPI int zoo_aremove_watchers(zhandle_t *zh, const char *path,
+ZOOAPI int zoo_aremove_watches(zhandle_t *zh, const char *path,
         ZooWatcherType wtype, watcher_fn watcher, void *watcherCtx, int local,
         void_completion_t *completion, const void *data);
 
+/**
+ * \brief removes all the watches for the given path and watcher type.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the path for which watches will be removed
+ * \param wtype the watcher type to be removed
+ * \param local whether the watches will be removed locally even if there is
+ * no server connection
+ * \return the return code for the function call.
+ * ZOK - operation completed successfully
+ * ZNOWATCHER - the watcher couldn't be found.
+ * ZINVALIDSTATE - if !local, zhandle state is either ZOO_SESSION_EXPIRED_STATE
+ * or ZOO_AUTH_FAILED_STATE
+ * ZBADARGUMENTS - invalid input parameters
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ * ZSYSTEMERROR - a system error occured
+ */
+ZOOAPI int zoo_remove_all_watches(zhandle_t *zh, const char *path,
+        ZooWatcherType wtype, int local);
+
+/**
+ * \brief removes all the watches for the given path and watcher type.
+ *
+ * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
+ * \param path the path for which watches will be removed
+ * \param wtype the watcher type to be removed
+ * \param local whether the watches will be removed locally even if there is
+ * no server connection
+ * \return the return code for the function call.
+ * ZOK - operation completed successfully
+ * ZNOWATCHER - the watcher couldn't be found.
+ * ZINVALIDSTATE - if !local, zhandle state is either ZOO_SESSION_EXPIRED_STATE
+ * or ZOO_AUTH_FAILED_STATE
+ * ZBADARGUMENTS - invalid input parameters
+ * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
+ * ZSYSTEMERROR - a system error occured
+*/
+ZOOAPI int zoo_aremove_all_watches(zhandle_t *zh, const char *path,
+        ZooWatcherType wtype, int local, void_completion_t *completion,
+        const void *data);
 
 #ifdef THREADED
 /**
@@ -2043,15 +2083,15 @@ ZOOAPI int zoo_set_acl(zhandle_t *zh, const char *path, int version,
 ZOOAPI int zoo_multi(zhandle_t *zh, int count, const zoo_op_t *ops, zoo_op_result_t *results);
 
 /**
- * \brief removes the watchers for the given path and watcher type.
+ * \brief removes the watches for the given path and watcher type.
  *
  * \param zh the zookeeper handle obtained by a call to \ref zookeeper_init
- * \param path the path for which watchers will be removed
+ * \param path the path for which watches will be removed
  * \param wtype the watcher type to be removed
- * \param watcher the watcher to be removed, if null all watchers for that
+ * \param watcher the watcher to be removed, if null all watches for that
  * path (and watcher type) will be removed
  * \param watcherCtx the contex associated with the watcher to be removed
- * \param local whether the watchers will be removed locally even if there is
+ * \param local whether the watches will be removed locally even if there is
  * no server connection
  * \return the return code for the function call.
  * ZOK - operation completed successfully
@@ -2062,7 +2102,7 @@ ZOOAPI int zoo_multi(zhandle_t *zh, int count, const zoo_op_t *ops, zoo_op_resul
  * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
  * ZSYSTEMERROR - a system error occured
  */
-ZOOAPI int zoo_remove_watchers(zhandle_t *zh, const char *path,
+ZOOAPI int zoo_remove_watches(zhandle_t *zh, const char *path,
         ZooWatcherType wtype, watcher_fn watcher, void *watcherCtx, int local);
 #endif
 #ifdef __cplusplus

+ 6 - 6
src/c/src/zk_hashtable.c

@@ -425,14 +425,14 @@ void removeWatchers(zhandle_t *zh, const char* path, ZooWatcherType type,
         watcher_fn watcher, void *watcherCtx)
 {
     switch (type) {
-    case ZWATCHERTYPE_CHILDREN:
+    case ZWATCHTYPE_CHILD:
         removeWatcher(zh->active_child_watchers, path, watcher, watcherCtx);
         break;
-    case ZWATCHERTYPE_DATA:
+    case ZWATCHTYPE_DATA:
         removeWatcher(zh->active_node_watchers, path, watcher, watcherCtx);
         removeWatcher(zh->active_exist_watchers, path, watcher, watcherCtx);
         break;
-    case ZWATCHERTYPE_ANY:
+    case ZWATCHTYPE_ANY:
         removeWatcher(zh->active_child_watchers, path, watcher, watcherCtx);
         removeWatcher(zh->active_node_watchers, path, watcher, watcherCtx);
         removeWatcher(zh->active_exist_watchers, path, watcher, watcherCtx);
@@ -446,11 +446,11 @@ int pathHasWatcher(zhandle_t *zh, const char *path, int wtype,
     int watcher_found = 0;
 
     switch (wtype) {
-    case ZWATCHERTYPE_CHILDREN:
+    case ZWATCHTYPE_CHILD:
         watcher_found = containsWatcher(zh->active_child_watchers,
                                         path, watcher, watcherCtx);
         break;
-    case ZWATCHERTYPE_DATA:
+    case ZWATCHTYPE_DATA:
         watcher_found = containsWatcher(zh->active_node_watchers, path,
                                         watcher, watcherCtx);
         if (!watcher_found) {
@@ -458,7 +458,7 @@ int pathHasWatcher(zhandle_t *zh, const char *path, int wtype,
                                             watcher, watcherCtx);
         }
         break;
-    case ZWATCHERTYPE_ANY:
+    case ZWATCHTYPE_ANY:
         watcher_found = containsWatcher(zh->active_child_watchers, path,
                                         watcher, watcherCtx);
         if (!watcher_found) {

+ 74 - 14
src/c/src/zookeeper.c

@@ -258,11 +258,20 @@ static __attribute__((unused)) void print_completion_queue(zhandle_t *zh);
 static void *SYNCHRONOUS_MARKER = (void*)&SYNCHRONOUS_MARKER;
 static int isValidPath(const char* path, const int flags);
 
+static int aremove_watches(
+    zhandle_t *zh, const char *path, ZooWatcherType wtype,
+    watcher_fn watcher, void *watcherCtx, int local,
+    void_completion_t *completion, const void *data, int all);
+
 #ifdef THREADED
 static void process_sync_completion(zhandle_t *zh,
         completion_list_t *cptr,
         struct sync_completion *sc,
         struct iarchive *ia);
+
+static int remove_watches(
+    zhandle_t *zh, const char *path, ZooWatcherType wtype,
+    watcher_fn watcher, void *watcherCtx, int local, int all);
 #endif
 
 #ifdef _WIN32
@@ -4018,16 +4027,26 @@ int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops,
     return (rc < 0) ? ZMARSHALLINGERROR : ZOK;
 }
 
+typedef union WatchesRequest WatchesRequest;
 
-int zoo_aremove_watchers(zhandle_t *zh, const char *path, ZooWatcherType wtype,
+union WatchesRequest {
+    struct CheckWatchesRequest check;
+    struct RemoveWatchesRequest remove;
+};
+
+static int aremove_watches(
+        zhandle_t *zh, const char *path, ZooWatcherType wtype,
         watcher_fn watcher, void *watcherCtx, int local,
-        void_completion_t *completion, const void *data)
+        void_completion_t *completion, const void *data, int all)
 {
     char *server_path = prepend_string(zh, path);
     int rc;
     struct oarchive *oa;
-    struct RequestHeader h = { get_xid(), ZOO_REMOVE_WATCHES };
-    struct RemoveWatchesRequest req =  { (char*)server_path, wtype };
+    struct RequestHeader h = { 
+        get_xid(), 
+        all ? ZOO_REMOVE_WATCHES : ZOO_CHECK_WATCHES 
+    };
+    WatchesRequest req;
     watcher_deregistration_t *wdo;
 
     if (!zh || !isValidPath(server_path, 0)) {
@@ -4056,21 +4075,32 @@ int zoo_aremove_watchers(zhandle_t *zh, const char *path, ZooWatcherType wtype,
 
     oa = create_buffer_oarchive();
     rc = serialize_RequestHeader(oa, "header", &h);
-    rc = rc < 0 ? rc : serialize_RemoveWatchesRequest(oa, "req", &req);
+
+    if (all) {
+       req.remove.path = (char*)server_path;
+       req.remove.type = wtype;
+       rc = rc < 0 ? rc : serialize_RemoveWatchesRequest(oa, "req", &req.remove);
+    } else {
+        req.check.path = (char*)server_path;
+        req.check.type = wtype;
+        rc = rc < 0 ? rc : serialize_CheckWatchesRequest(oa, "req", &req.check);
+    }
+
     if (rc < 0) {
         goto done;
     }
 
-    wdo = create_watcher_deregistration(server_path, watcher, watcherCtx,
-                                        wtype);
+    wdo = create_watcher_deregistration(
+        server_path, watcher, watcherCtx, wtype);
+
     if (!wdo) {
         rc = ZSYSTEMERROR;
         goto done;
     }
 
     enter_critical(zh);
-    rc = add_completion_deregistration(zh, h.xid, COMPLETION_VOID,
-                                       completion, data, 0, wdo, 0);
+    rc = add_completion_deregistration(
+        zh, h.xid, COMPLETION_VOID, completion, data, 0, wdo, 0);
     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
             get_buffer_len(oa));
     rc = rc < 0 ? ZMARSHALLINGERROR : ZOK;
@@ -4816,11 +4846,12 @@ int zoo_set_acl(zhandle_t *zh, const char *path, int version,
     return rc;
 }
 
-int zoo_remove_watchers(zhandle_t *zh, const char *path, ZooWatcherType wtype,
-         watcher_fn watcher, void *watcherCtx, int local)
+static int remove_watches(
+    zhandle_t *zh, const char *path, ZooWatcherType wtype,
+    watcher_fn watcher, void *wctx, int local, int all)
 {
-    struct sync_completion *sc;
     int rc = 0;
+    struct sync_completion *sc;
 
     if (!path)
         return ZBADARGUMENTS;
@@ -4829,8 +4860,8 @@ int zoo_remove_watchers(zhandle_t *zh, const char *path, ZooWatcherType wtype,
     if (!sc)
         return ZSYSTEMERROR;
 
-    rc = zoo_aremove_watchers(zh, path, wtype, watcher, watcherCtx, local,
-                              SYNCHRONOUS_MARKER, sc);
+    rc = aremove_watches(zh, path, wtype, watcher, wctx, local,
+                              SYNCHRONOUS_MARKER, sc, all);
     if (rc == ZOK) {
         wait_sync_completion(sc);
         rc = sc->rc;
@@ -4857,4 +4888,33 @@ int zoo_multi(zhandle_t *zh, int count, const zoo_op_t *ops, zoo_op_result_t *re
 
     return rc;
 }
+
+int zoo_remove_watches(zhandle_t *zh, const char *path, ZooWatcherType wtype,
+         watcher_fn watcher, void *watcherCtx, int local)
+{
+    return remove_watches(zh, path, wtype, watcher, watcherCtx, local, 0);
+}
+
+int zoo_remove_all_watches(
+        zhandle_t *zh, const char *path, ZooWatcherType wtype, int local)
+{
+    return remove_watches(zh, path, wtype, NULL, NULL, local, 1);
+
+}
 #endif
+
+int zoo_aremove_watches(zhandle_t *zh, const char *path, ZooWatcherType wtype,
+        watcher_fn watcher, void *watcherCtx, int local,
+        void_completion_t *completion, const void *data)
+{
+    return aremove_watches(
+        zh, path, wtype, watcher, watcherCtx, local, completion, data, 0);
+}
+
+int zoo_aremove_all_watches(zhandle_t *zh, const char *path,
+        ZooWatcherType wtype, int local, void_completion_t *completion,
+        const void *data)
+{
+    return aremove_watches(
+        zh, path, wtype, NULL, NULL, local, completion, data, 1);
+}

+ 51 - 37
src/c/tests/TestClient.cc

@@ -1318,70 +1318,84 @@ public:
       CPPUNIT_ASSERT_EQUAL(updated, (int64_t) zk2->last_zxid);
     }
 
-    static void watcher_remove_watchers(zhandle_t *zh, int type,
-                                    int state, const char *path,void *watcherCtx) {
-        count++;
+	static void watcher_rw(zhandle_t *zh,
+		                   int type,
+		                   int state,
+		                   const char *path,
+		                   void *ctx) {
+		count++;
+	}
+
+	static void watcher_rw2(zhandle_t *zh,
+		                    int type,
+		                    int state,
+		                    const char *path,
+		                    void *ctx) {
+		count++;
     }
 
     void testRemoveWatchers() {
+      char *path = "/something";
+      char buf[1024];
+      int blen = sizeof(buf);		
       int rc;
       watchctx_t ctx;
-      zhandle_t *zk = createClient(&ctx);
-      CPPUNIT_ASSERT(zk);
+			zhandle_t *zk;
 
-      count = 0;
+			/* setup path */
+      zk = createClient(&ctx);
+      CPPUNIT_ASSERT(zk);
 
-      rc = zoo_create(zk, "/something", "", 0,
-                      &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
+      rc = zoo_create(zk, path, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
       CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
 
       rc = zoo_create(zk, "/something2", "", 0,
                       &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
       CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
 
-      char buf[1024];
-      int blen = sizeof(buf);
-      rc = zoo_get(zk, "/something", 1, buf, &blen, NULL);
-
       /* remove all watchers */
-      rc = zoo_remove_watchers(zk, "/something", ZWATCHERTYPE_DATA,
-                               NULL, NULL, 0);
-      CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
-
+      count = 0;
+      rc = zoo_wget(zk, path, watcher_rw, NULL, buf, &blen, NULL);
+      rc = zoo_wget(zk, path, watcher_rw2, NULL, buf, &blen, NULL);
+      rc = zoo_remove_all_watches(zk, path, ZWATCHTYPE_ANY, 0);
+			CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
+			rc = zoo_set(zk, path, "nowatch", 7, -1);
+      CPPUNIT_ASSERT(count == 0);
+			
       /* remove a specific watcher before it's added (should fail) */
-      rc = zoo_remove_watchers(zk, "/something", ZWATCHERTYPE_DATA,
-                               watcher_remove_watchers, NULL, 0);
+      rc = zoo_remove_watches(zk, path, ZWATCHTYPE_DATA,
+                               watcher_rw, NULL, 0);
       CPPUNIT_ASSERT_EQUAL((int)ZNOWATCHER, rc);
 
       /* now add a specific watcher and then remove it */
-      rc = zoo_wget(zk, "/something", watcher_remove_watchers, NULL,
+      rc = zoo_wget(zk, path, watcher_rw, NULL,
                     buf, &blen, NULL);
       CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
-      rc = zoo_remove_watchers(zk, "/something", ZWATCHERTYPE_DATA,
-                               watcher_remove_watchers, NULL, 0);
+      rc = zoo_remove_watches(zk, path, ZWATCHTYPE_DATA,
+                               watcher_rw, NULL, 0);
       CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
 
       /* ditto for children watcher */
-      rc = zoo_remove_watchers(zk, "/something", ZWATCHERTYPE_CHILDREN,
-                               watcher_remove_watchers, NULL, 0);
+      rc = zoo_remove_watches(zk, path, ZWATCHTYPE_CHILD,
+                               watcher_rw, NULL, 0);
       CPPUNIT_ASSERT_EQUAL((int)ZNOWATCHER, rc);
 
       struct String_vector str_vec = {0, NULL};
-      rc = zoo_wget_children(zk, "/something", watcher_remove_watchers, NULL,
+      rc = zoo_wget_children(zk, path, watcher_rw, NULL,
                              &str_vec);
       CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
-      rc = zoo_remove_watchers(zk, "/something", ZWATCHERTYPE_CHILDREN,
-                               watcher_remove_watchers, NULL, 0);
+      rc = zoo_remove_watches(zk, path, ZWATCHTYPE_CHILD,
+                               watcher_rw, NULL, 0);
       CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
 
       /* add a watch, stop the server, and have remove fail */
-      rc = zoo_wget(zk, "/something", watcher_remove_watchers, NULL,
+      rc = zoo_wget(zk, path, watcher_rw, NULL,
                     buf, &blen, NULL);
       CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
       stopServer();
       ctx.waitForDisconnected(zk);
-      rc = zoo_remove_watchers(zk, "/something", ZWATCHERTYPE_DATA,
-                               watcher_remove_watchers, NULL, 0);
+      rc = zoo_remove_watches(zk, path, ZWATCHTYPE_DATA,
+                               watcher_rw, NULL, 0);
       CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS, rc);
 
       /* bring the server back */
@@ -1392,25 +1406,25 @@ public:
       void* ctx1=(void*)0x1;
       void* ctx2=(void*)0x2;
 
-      rc = zoo_wget(zk, "/something", watcher_remove_watchers, ctx1,
+      rc = zoo_wget(zk, path, watcher_rw, ctx1,
                     buf, &blen, NULL);
       CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
 
-      rc = zoo_wget(zk, "/something2", watcher_remove_watchers, ctx2,
+      rc = zoo_wget(zk, "/something2", watcher_rw, ctx2,
                          buf, &blen, NULL);
       CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
 
       stopServer();
-      rc = zoo_remove_watchers(zk, "/something", ZWATCHERTYPE_DATA,
-                               watcher_remove_watchers, ctx1, 1);
+      rc = zoo_remove_watches(zk, path, ZWATCHTYPE_DATA,
+                               watcher_rw, ctx1, 1);
       CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
 
-      rc = zoo_remove_watchers(zk, "/something", ZWATCHERTYPE_DATA,
-                                    watcher_remove_watchers, ctx1, 1);
+      rc = zoo_remove_watches(zk, path, ZWATCHTYPE_DATA,
+                                    watcher_rw, ctx1, 1);
       CPPUNIT_ASSERT_EQUAL((int)ZNOWATCHER, rc);
 
-      rc = zoo_remove_watchers(zk, "/something2", ZWATCHERTYPE_DATA,
-                                          watcher_remove_watchers, ctx2, 1);
+      rc = zoo_remove_watches(zk, "/something2", ZWATCHTYPE_DATA,
+                                          watcher_rw, ctx2, 1);
       CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
     }
 };