Bladeren bron

ZOOKEEPER-2627: Remove ZRWSERVERFOUND from C client.

JIRA: https://issues.apache.org/jira/browse/ZOOKEEPER-2627

* Remove ZRWSERVERFOUND from C client to maintain consistency of error codes definition between Java / C client.
* Make C client behavior more conforming with Java client in RO mode by having an explicit RW server address and use that address whenever it's available.

Author: Michael Han <hanm@cloudera.com>

Reviewers: breed <breed@apache.org>, rgs <rgs@apache.org>

Closes #100 from hanm/ZOOKEEPER-2627
Michael Han 8 jaren geleden
bovenliggende
commit
73d6bf5353
3 gewijzigde bestanden met toevoegingen van 25 en 15 verwijderingen
  1. 0 1
      src/c/include/zookeeper.h
  2. 1 0
      src/c/src/zk_adaptor.h
  3. 24 14
      src/c/src/zookeeper.c

+ 0 - 1
src/c/include/zookeeper.h

@@ -124,7 +124,6 @@ enum ZOO_ERRORS {
   ZNOTREADONLY = -119, /*!< state-changing request is passed to read-only server */
   ZEPHEMERALONLOCALSESSION = -120, /*!< Attempt to create ephemeral node on a local session */
   ZNOWATCHER = -121, /*!< The watcher couldn't be found */
-  ZRWSERVERFOUND = -122, /*!< r/w server found while in r/o mode */
   ZRECONFIGDISABLED = -123 /*!< Attempts to perform a reconfiguration operation when reconfiguration feature is disabled */
 };
 

+ 1 - 0
src/c/src/zk_adaptor.h

@@ -194,6 +194,7 @@ struct _zhandle {
     // Hostlist and list of addresses
     char *hostname;                     // hostname contains list of zookeeper servers to connect to
     struct sockaddr_storage addr_cur;   // address of server we're currently connecting/connected to 
+    struct sockaddr_storage addr_rw_server; // address of last known read/write server found.
 
     addrvec_t addrs;                    // current list of addresses we're connected to
     addrvec_t addrs_old;                // old list of addresses that we are no longer connected to

+ 24 - 14
src/c/src/zookeeper.c

@@ -227,6 +227,7 @@ static int handle_socket_error_msg(zhandle_t *zh, int line, int rc,
 static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc);
 
 static int disable_conn_permute=0; // permute enabled by default
+static struct sockaddr_storage *addr_rw_server = 0;
 
 static __attribute__((unused)) void print_completion_queue(zhandle_t *zh);
 
@@ -1689,7 +1690,7 @@ static int is_connected(zhandle_t* zh)
     return (zh->state==ZOO_CONNECTED_STATE || zh->state==ZOO_READONLY_STATE);
 }
 
-static void handle_error(zhandle_t *zh,int rc)
+static void cleanup(zhandle_t *zh,int rc)
 {
     close(zh->fd);
     if (is_unrecoverable(zh)) {
@@ -1705,12 +1706,6 @@ static void handle_error(zhandle_t *zh,int rc)
 
     LOG_DEBUG(LOGCALLBACK(zh), "Previous connection=[%s] delay=%d", zoo_get_current_server(zh), zh->delay);
 
-    // NOTE: If we're at the end of the list of addresses to connect to, then
-    // we want to delay the next connection attempt to avoid spinning.
-    // Then increment what host we'll connect to since we failed to connect to current
-    zh->delay = addrvec_atend(&zh->addrs);
-    addrvec_next(&zh->addrs, &zh->addr_cur);
-
     if (!is_unrecoverable(zh)) {
         zh->state = 0;
     }
@@ -1719,6 +1714,16 @@ static void handle_error(zhandle_t *zh,int rc)
     }
 }
 
+static void handle_error(zhandle_t *zh,int rc)
+{
+    cleanup(zh, rc);
+    // NOTE: If we're at the end of the list of addresses to connect to, then
+    // we want to delay the next connection attempt to avoid spinning.
+    // Then increment what host we'll connect to since we failed to connect to current
+    zh->delay = addrvec_atend(&zh->addrs);
+    addrvec_next(&zh->addrs, &zh->addr_cur);
+}
+
 static int handle_socket_error_msg(zhandle_t *zh, int line, int rc,
         const char* format, ...)
 {
@@ -2038,11 +2043,10 @@ static int ping_rw_server(zhandle_t* zh)
     socket_t sock;
     int rc;
     sendsize_t ssize;
-    struct sockaddr_storage addr;
 
-    addrvec_peek(&zh->addrs, &addr);
+    addrvec_peek(&zh->addrs, &zh->addr_rw_server);
 
-    sock = socket(addr.ss_family, SOCK_STREAM, 0);
+    sock = socket(zh->addr_rw_server.ss_family, SOCK_STREAM, 0);
     if (sock < 0) {
         return 0;
     }
@@ -2050,7 +2054,7 @@ static int ping_rw_server(zhandle_t* zh)
     zookeeper_set_sock_nodelay(zh, sock);
     zookeeper_set_sock_timeout(zh, sock, 1);
 
-    rc = zookeeper_connect(zh, &addr, sock);
+    rc = zookeeper_connect(zh, &zh->addr_rw_server, sock);
     if (rc < 0) {
         return 0;
     }
@@ -2072,6 +2076,7 @@ static int ping_rw_server(zhandle_t* zh)
 
 out:
     close(sock);
+    addr_rw_server = rc ? &zh->addr_rw_server : 0;
     return rc;
 }
 
@@ -2205,8 +2210,13 @@ int zookeeper_interest(zhandle_t *zh, socket_t *fd, int *interest,
             LOG_WARN(LOGCALLBACK(zh), "Delaying connection after exhaustively trying all servers [%s]",
                      zh->hostname);
         } else {
-            // No need to delay -- grab the next server and attempt connection
-            zoo_cycle_next_server(zh);
+            if (addr_rw_server) {
+                zh->addr_cur = *addr_rw_server;
+                addr_rw_server = 0;
+            } else {
+                // No need to delay -- grab the next server and attempt connection
+                zoo_cycle_next_server(zh);
+            }
             zh->fd = socket(zh->addr_cur.ss_family, SOCK_STREAM, 0);
             if (zh->fd < 0) {
               rc = handle_socket_error_msg(zh,
@@ -2308,7 +2318,7 @@ int zookeeper_interest(zhandle_t *zh, socket_t *fd, int *interest,
                     LOG_INFO(LOGCALLBACK(zh),
                              "r/w server found at %s",
                              format_endpoint_info(&addr));
-                    handle_error(zh, ZRWSERVERFOUND);
+                    cleanup(zh, ZOK);
                 } else {
                     addrvec_next(&zh->addrs, NULL);
                 }