@@ -189,10 +189,10 @@ static int deserialize_multi(int xid, completion_list_t *cptr, struct iarchive *
 /* completion routine forward declarations */
 static int add_completion(zhandle_t *zh, int xid, int completion_type,
-        const void *dc, const void *data, int add_to_front,
+        const void *dc, const void *data, int add_to_front,
         watcher_registration_t* wo, completion_head_t *clist);
 static completion_list_t* create_completion_entry(int xid, int completion_type,
-        const void *dc, const void *data, watcher_registration_t* wo,
+        const void *dc, const void *data, watcher_registration_t* wo,
         completion_head_t *clist);
 static void destroy_completion_entry(completion_list_t* c);
 static void queue_completion_nolock(completion_head_t *list, completion_list_t *c,
@@ -438,12 +438,12 @@ static void setup_random()

 #ifndef __CYGWIN__
 /**
- * get the errno from the return code
+ * get the errno from the return code
  * of get addrinfo. Errno is not set
  * with the call to getaddrinfo, so thats
  * why we have to do this.
  */
-static int getaddrinfo_errno(int rc) {
+static int getaddrinfo_errno(int rc) {
     switch(rc) {
     case EAI_NONAME:
 // ZOOKEEPER-1323 EAI_NODATA and EAI_ADDRFAMILY are deprecated in FreeBSD.
@@ -616,10 +616,10 @@ int resolve_hosts(const char *hosts_in, addrvec_t *avec)

         if ((rc = getaddrinfo(host, port_spec, &hints, &res0)) != 0) {
             //bug in getaddrinfo implementation when it returns
-            //EAI_BADFLAGS or EAI_ADDRFAMILY with AF_UNSPEC and
+            //EAI_BADFLAGS or EAI_ADDRFAMILY with AF_UNSPEC and
             // ai_flags as AI_ADDRCONFIG
 #ifdef AI_ADDRCONFIG
-            if ((hints.ai_flags == AI_ADDRCONFIG) &&
+            if ((hints.ai_flags == AI_ADDRCONFIG) &&
 // ZOOKEEPER-1323 EAI_NODATA and EAI_ADDRFAMILY are deprecated in FreeBSD.
 #ifdef EAI_ADDRFAMILY
                 ((rc ==EAI_BADFLAGS) || (rc == EAI_ADDRFAMILY))) {
@@ -707,19 +707,19 @@ fail:
  * a) the server this client is currently connected is not in new address list.
  * Otherwise (if currentHost is in the new list):
  * b) the number of servers in the cluster is increasing - in this case the load
- * on currentHost should decrease, which means that SOME of the clients
+ * on currentHost should decrease, which means that SOME of the clients
  * connected to it will migrate to the new servers. The decision whether this
- * client migrates or not is probabilistic so that the expected number of
+ * client migrates or not is probabilistic so that the expected number of
  * clients connected to each server is the same.
- *
- * If reconfig is set to true, the function sets pOld and pNew that correspond
+ *
+ * If reconfig is set to true, the function sets pOld and pNew that correspond
  * to the probability to migrate to ones of the new servers or one of the old
  * servers (migrating to one of the old servers is done only if our client's
- * currentHost is not in new list).
- *
+ * currentHost is not in new list).
+ *
  * See zoo_cycle_next_server for the selection logic.
- *
- * See {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1355} for the
+ *
+ * See {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1355} for the
  * protocol and its evaluation,
  */
 int update_addrs(zhandle_t *zh)
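A rough illustration of the rebalancing goal described in this comment (my own arithmetic, not part of the patch): if a 3-server ensemble grows to 5 servers and clients were evenly spread, each old server should drop from 1/3 to 1/5 of the clients, so each client connected to an old server would migrate with probability roughly (1/3 - 1/5) / (1/3) = 2/5, keeping the expected per-server load equal.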
@@ -741,7 +741,7 @@ int update_addrs(zhandle_t *zh)
     if (zh->hostname == NULL)
     {
         return ZSYSTEMERROR;
-    }
+    }

     // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new}
     lock_reconfig(zh);
@@ -784,7 +784,7 @@ int update_addrs(zhandle_t *zh)
             {
                 goto fail;
             }
-        }
+        }
         else {
             rc = addrvec_append(&zh->addrs_new, resolved_address);
             if (rc != ZOK)
@@ -811,12 +811,12 @@ int update_addrs(zhandle_t *zh)
                 zh->reconfig = 0;
             }
         } else {
-            // my server is not in the new config, and load on old servers must
+            // my server is not in the new config, and load on old servers must
             // be decreased, so connect to one of the new servers
             zh->pNew = 1;
             zh->pOld = 0;
         }
-    }
+    }

     // Number of servers stayed the same or decreased
     else {
@@ -833,8 +833,8 @@ int update_addrs(zhandle_t *zh)
     addrvec_free(&zh->addrs);
     zh->addrs = resolved;

-    // If we need to do a reconfig and we're currently connected to a server,
-    // then force close that connection so on next interest() call we'll make a
+    // If we need to do a reconfig and we're currently connected to a server,
+    // then force close that connection so on next interest() call we'll make a
     // new connection
     if (zh->reconfig == 1 && zh->fd != -1)
     {
@@ -847,7 +847,7 @@ fail:

     unlock_reconfig(zh);

-    // If we short-circuited out and never assigned resolved to zh->addrs then we
+    // If we short-circuited out and never assigned resolved to zh->addrs then we
     // need to free resolved to avoid a memleak.
     if (zh->addrs.data != resolved.data)
     {
@@ -1092,15 +1092,15 @@ int zoo_set_servers(zhandle_t *zh, const char *hosts)
  * we've updated the server list to connect to, and are now trying to find some
  * server to connect to. Once we get successfully connected, 'reconfig' mode is
  * set to false. Similarly, if we tried to connect to all servers in new config
- * and failed, 'reconfig' mode is set to false.
+ * and failed, 'reconfig' mode is set to false.
  *
  * While in 'reconfig' mode, we should connect to a server in the new set of
- * servers (addrs_new) with probability pNew and to servers in the old set of
+ * servers (addrs_new) with probability pNew and to servers in the old set of
  * servers (addrs_old) with probability pOld (which is just 1-pNew). If we tried
- * out all servers in either, we continue to try servers from the other set,
+ * out all servers in either, we continue to try servers from the other set,
  * regardless of pNew or pOld. If we tried all servers we give up and go back to
  * the normal round robin mode
- *
+ *
  * When called, must be protected by lock_reconfig(zh).
  */
 static int get_next_server_in_reconfig(zhandle_t *zh)
@@ -1108,16 +1108,16 @@ static int get_next_server_in_reconfig(zhandle_t *zh)
     int take_new = drand48() <= zh->pNew;

     LOG_DEBUG(("[OLD] count=%d capacity=%d next=%d hasnext=%d",
-               zh->addrs_old.count, zh->addrs_old.capacity, zh->addrs_old.next,
+               zh->addrs_old.count, zh->addrs_old.capacity, zh->addrs_old.next,
                addrvec_hasnext(&zh->addrs_old)));
     LOG_DEBUG(("[NEW] count=%d capacity=%d next=%d hasnext=%d",
-               zh->addrs_new.count, zh->addrs_new.capacity, zh->addrs_new.next,
+               zh->addrs_new.count, zh->addrs_new.capacity, zh->addrs_new.next,
                addrvec_hasnext(&zh->addrs_new)));

     // Take one of the new servers if we haven't tried them all yet
     // and either the probability tells us to connect to one of the new servers
     // or if we already tried them all then use one of the old servers
-    if (addrvec_hasnext(&zh->addrs_new)
+    if (addrvec_hasnext(&zh->addrs_new)
         && (take_new || !addrvec_hasnext(&zh->addrs_old)))
     {
         addrvec_next(&zh->addrs_new, &zh->addr_cur);
@@ -1137,14 +1137,14 @@ static int get_next_server_in_reconfig(zhandle_t *zh)
     return 1;
 }

-/**
+/**
  * Cycle through our server list to the correct 'next' server. The 'next' server
  * to connect to depends upon whether we're in a 'reconfig' mode or not. Reconfig
  * mode means we've upated the server list and are now trying to find a server
  * to connect to. Once we get connected, we are no longer in the reconfig mode.
  * Similarly, if we try to connect to all the servers in the new configuration
  * and failed, reconfig mode is set to false.
- *
+ *
  * For more algorithm details, see get_next_server_in_reconfig.
  */
 void zoo_cycle_next_server(zhandle_t *zh)
@@ -1172,7 +1172,7 @@ void zoo_cycle_next_server(zhandle_t *zh)
     return;
 }

-/**
+/**
  * Get the host:port for the server we are currently connecting to or connected
  * to. This is largely for testing purposes but is also generally useful for
  * other client software built on top of this client.
@@ -1364,9 +1364,9 @@ static int send_buffer(int fd, buffer_list_t *buff)
         if (rc == -1) {
 #ifndef _WINDOWS
             if (errno != EAGAIN) {
-#else
+#else
             if (WSAGetLastError() != WSAEWOULDBLOCK) {
-#endif
+#endif
                 return -1;
             } else {
                 return 0;
@@ -1383,9 +1383,9 @@ static int send_buffer(int fd, buffer_list_t *buff)
         if (rc == -1) {
 #ifndef _WINDOWS
             if (errno != EAGAIN) {
-#else
+#else
             if (WSAGetLastError() != WSAEWOULDBLOCK) {
-#endif
+#endif
                 return -1;
             }
         } else {
@@ -1670,7 +1670,7 @@ static int send_auth_info(zhandle_t *zh) {
 }

 static int send_last_auth_info(zhandle_t *zh)
-{
+{
     int rc = 0;
     auth_info *auth = NULL;

@@ -1724,7 +1724,7 @@ static int send_set_watches(zhandle_t *zh)
     /* add this buffer to the head of the send queue */
     rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa),
             get_buffer_len(oa));
-    /* We queued the buffer, so don't free it */
+    /* We queued the buffer, so don't free it */
     close_buffer_oarchive(&oa, 0);
     free_key_list(req.dataWatches.data, req.dataWatches.count);
     free_key_list(req.existWatches.data, req.existWatches.count);
@@ -1888,7 +1888,7 @@ int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
     gettimeofday(&now, 0);
     if(zh->next_deadline.tv_sec!=0 || zh->next_deadline.tv_usec!=0){
         int time_left = calculate_interval(&zh->next_deadline, &now);
-        int max_exceed = zh->recv_timeout / 10 > 200 ? 200 :
+        int max_exceed = zh->recv_timeout / 10 > 200 ? 200 :
                          (zh->recv_timeout / 10);
         if (time_left > max_exceed)
             LOG_WARN(("Exceeded deadline by %dms", time_left));
@@ -1907,7 +1907,7 @@ int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,

     if (*fd == -1) {

-        /*
+        /*
          * If we previously failed to connect to server pool (zh->delay == 1)
          * then we need delay our connection on this iteration 1/60 of the
          * recv timeout before trying again so we don't spin.
@@ -1921,7 +1921,7 @@ int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,

             LOG_WARN(("Delaying connection after exhaustively trying all servers [%s]",
                       zh->hostname));
-        }
+        }

         // No need to delay -- grab the next server and attempt connection
         else {
@@ -1944,7 +1944,7 @@ int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
                 LOG_WARN(("Unable to set TCP_NODELAY, operation latency may be effected"));
             }
 #ifdef WIN32
-            ioctlsocket(zh->fd, FIONBIO, &nonblocking_flag);
+            ioctlsocket(zh->fd, FIONBIO, &nonblocking_flag);
 #else
             fcntl(zh->fd, F_SETFL, O_NONBLOCK|fcntl(zh->fd, F_GETFL, 0));
 #endif
@@ -2233,7 +2233,7 @@ static void process_sync_completion(
             cptr->c.type, cptr->xid, sc->rc));

     switch(cptr->c.type) {
-    case COMPLETION_DATA:
+    case COMPLETION_DATA:
         if (sc->rc==0) {
             struct GetDataResponse res;
             int len;
@@ -2289,7 +2289,7 @@ static void process_sync_completion(
             const char * client_path;
             deserialize_CreateResponse(ia, "reply", &res);
             //ZOOKEEPER-1027
-            client_path = sub_string(zh, res.path);
+            client_path = sub_string(zh, res.path);
             len = strlen(client_path) + 1;if (len > sc->u.str.str_len) {
                 len = sc->u.str.str_len;
             }
@@ -2672,7 +2672,7 @@ int zookeeper_process(zhandle_t *zh, int events)
                         *sc = (struct sync_completion*)cptr->data;
                 sc->rc = rc;

-                process_sync_completion(cptr, sc, ia, zh);
+                process_sync_completion(cptr, sc, ia, zh);

                 notify_sync_completion(sc);
                 free_buffer(bptr);
@@ -2771,9 +2771,9 @@ static void destroy_completion_entry(completion_list_t* c){
     }
 }

-static void queue_completion_nolock(completion_head_t *list,
+static void queue_completion_nolock(completion_head_t *list,
                                     completion_list_t *c,
-                                    int add_to_front)
+                                    int add_to_front)
 {
     c->next = 0;
     /* appending a new entry to the back of the list */
@@ -2990,7 +2990,7 @@ static int isValidPath(const char* path, const int flags) {
  * REQUEST INIT HELPERS
  *---------------------------------------------------------------------------*/
 /* Common Request init helper functions to reduce code duplication */
-static int Request_path_init(zhandle_t *zh, int flags,
+static int Request_path_init(zhandle_t *zh, int flags,
         char **path_out, const char *path)
 {
     assert(path_out);
@@ -3052,7 +3052,7 @@ int zoo_awget(zhandle_t *zh, const char *path,
     rc = rc < 0 ? rc : serialize_GetDataRequest(oa, "req", &req);
     enter_critical(zh);
     rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data,
-        create_watcher_registration(server_path,data_result_checker,watcher,watcherCtx));
+        create_watcher_registration(server_path,data_result_checker,watcher,watcherCtx));
     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
             get_buffer_len(oa));
     leave_critical(zh);
@@ -3067,6 +3067,87 @@ int zoo_awget(zhandle_t *zh, const char *path,
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
 }

+int zoo_agetconfig(zhandle_t *zh, int watch, data_completion_t dc,
+        const void *data)
+{
+    return zoo_awgetconfig(zh,watch?zh->watcher:0,zh->context,dc,data);
+}
+
+int zoo_awgetconfig(zhandle_t *zh, watcher_fn watcher, void* watcherCtx,
+        data_completion_t dc, const void *data)
+{
+    struct oarchive *oa;
+    char *path = ZOO_CONFIG_NODE;
+    char *server_path = ZOO_CONFIG_NODE;
+    struct RequestHeader h = { get_xid(), ZOO_GETDATA_OP };
+    struct GetDataRequest req = { (char*)server_path, watcher!=0 };
+    int rc;
+
+    if (zh==0 || !isValidPath(server_path, 0)) {
+        free_duplicate_path(server_path, path);
+        return ZBADARGUMENTS;
+    }
+    if (is_unrecoverable(zh)) {
+        free_duplicate_path(server_path, path);
+        return ZINVALIDSTATE;
+    }
+    oa=create_buffer_oarchive();
+    rc = serialize_RequestHeader(oa, "header", &h);
+    rc = rc < 0 ? rc : serialize_GetDataRequest(oa, "req", &req);
+    enter_critical(zh);
+    rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data,
+        create_watcher_registration(server_path,data_result_checker,watcher,watcherCtx));
+    rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
+            get_buffer_len(oa));
+    leave_critical(zh);
+    free_duplicate_path(server_path, path);
+    /* We queued the buffer, so don't free it */
+    close_buffer_oarchive(&oa, 0);
+
+    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
+            zoo_get_current_server(zh)));
+    /* make a best (non-blocking) effort to send the requests asap */
+    adaptor_send_queue(zh, 0);
+    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
+}
+
+int zoo_areconfig(zhandle_t *zh, const char *joining, const char *leaving,
+        const char *members, int64_t version, data_completion_t dc, const void *data)
+{
+    struct oarchive *oa;
+    struct RequestHeader h = { get_xid(), ZOO_RECONFIG_OP };
+    struct ReconfigRequest req;
+    int rc = 0;
+
+    if (zh==0) {
+        return ZBADARGUMENTS;
+    }
+    if (is_unrecoverable(zh)) {
+        return ZINVALIDSTATE;
+    }
+
+    oa=create_buffer_oarchive();
+    req.joiningServers = (char *)joining;
+    req.leavingServers = (char *)leaving;
+    req.newMembers = (char *)members;
+    req.curConfigId = version;
+    rc = serialize_RequestHeader(oa, "header", &h);
+    rc = rc < 0 ? rc : serialize_ReconfigRequest(oa, "req", &req);
+    enter_critical(zh);
+    rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data, NULL);
+    rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
+            get_buffer_len(oa));
+    leave_critical(zh);
+    /* We queued the buffer, so don't free it */
+    close_buffer_oarchive(&oa, 0);
+
+    LOG_DEBUG(("Sending Reconfig request xid=%#x to %s",h.xid, zoo_get_current_server(zh)));
+    /* make a best (non-blocking) effort to send the requests asap */
+    adaptor_send_queue(zh, 0);
+
+    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
+}
+
 static int SetDataRequest_init(zhandle_t *zh, struct SetDataRequest *req,
         const char *path, const char *buffer, int buflen, int version)
 {
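As a rough usage sketch of the asynchronous API added above (not part of the patch; it assumes a connected zhandle_t from zookeeper_init() and the installed header path, and the helper names are mine):

#include <stdio.h>
#include <zookeeper/zookeeper.h>   /* header location assumed */

/* Illustrative only: print the dynamic config when the async read completes. */
static void config_received(int rc, const char *value, int value_len,
                            const struct Stat *stat, const void *data)
{
    if (rc == ZOK)
        printf("config (%d bytes): %.*s\n", value_len, value_len, value);
    else
        printf("getconfig failed: %s\n", zerror(rc));
}

/* Called with a zhandle_t that is already connected. */
static int request_config(zhandle_t *zh)
{
    return zoo_awgetconfig(zh, 0 /* watcher */, 0 /* watcherCtx */,
                           config_received, 0 /* completion data */);
}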
@@ -3223,7 +3304,7 @@ int zoo_acreate2(zhandle_t *zh, const char *path, const char *value,
     return (rc < 0)?ZMARSHALLINGERROR:ZOK;
 }

-int DeleteRequest_init(zhandle_t *zh, struct DeleteRequest *req,
+int DeleteRequest_init(zhandle_t *zh, struct DeleteRequest *req,
         const char *path, int version)
 {
     int rc = Request_path_init(zh, 0, &req->path, path);
@@ -3526,7 +3607,7 @@ static void op_result_stat_completion(int err, const struct Stat *stat, const vo
     } else {
         result->stat = NULL ;
     }
-}
+}

 static int CheckVersionRequest_init(zhandle_t *zh, struct CheckVersionRequest *req,
         const char *path, int version)
@@ -3565,16 +3646,16 @@ int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops,
         case ZOO_CREATE_OP: {
             struct CreateRequest req;

-            rc = rc < 0 ? rc : CreateRequest_init(zh, &req,
-                            op->create_op.path, op->create_op.data,
-                            op->create_op.datalen, op->create_op.acl,
+            rc = rc < 0 ? rc : CreateRequest_init(zh, &req,
+                            op->create_op.path, op->create_op.data,
+                            op->create_op.datalen, op->create_op.acl,
                             op->create_op.flags);
             rc = rc < 0 ? rc : serialize_CreateRequest(oa, "req", &req);
             result->value = op->create_op.buf;
             result->valuelen = op->create_op.buflen;

             enter_critical(zh);
-            entry = create_completion_entry(h.xid, COMPLETION_STRING, op_result_string_completion, result, 0, 0);
+            entry = create_completion_entry(h.xid, COMPLETION_STRING, op_result_string_completion, result, 0, 0);
             leave_critical(zh);
             free_duplicate_path(req.path, op->create_op.path);
             break;
@@ -3586,7 +3667,7 @@ int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops,
             rc = rc < 0 ? rc : serialize_DeleteRequest(oa, "req", &req);

             enter_critical(zh);
-            entry = create_completion_entry(h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
+            entry = create_completion_entry(h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
             leave_critical(zh);
             free_duplicate_path(req.path, op->delete_op.path);
             break;
@@ -3595,13 +3676,13 @@ int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops,
         case ZOO_SETDATA_OP: {
             struct SetDataRequest req;
             rc = rc < 0 ? rc : SetDataRequest_init(zh, &req,
-                            op->set_op.path, op->set_op.data,
+                            op->set_op.path, op->set_op.data,
                             op->set_op.datalen, op->set_op.version);
             rc = rc < 0 ? rc : serialize_SetDataRequest(oa, "req", &req);
             result->stat = op->set_op.stat;

             enter_critical(zh);
-            entry = create_completion_entry(h.xid, COMPLETION_STAT, op_result_stat_completion, result, 0, 0);
+            entry = create_completion_entry(h.xid, COMPLETION_STAT, op_result_stat_completion, result, 0, 0);
             leave_critical(zh);
             free_duplicate_path(req.path, op->set_op.path);
             break;
@@ -3614,15 +3695,15 @@ int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops,
             rc = rc < 0 ? rc : serialize_CheckVersionRequest(oa, "req", &req);

             enter_critical(zh);
-            entry = create_completion_entry(h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
+            entry = create_completion_entry(h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
             leave_critical(zh);
             free_duplicate_path(req.path, op->check_op.path);
             break;
-        }
+        }

         default:
             LOG_ERROR(("Unimplemented sub-op type=%d in multi-op", op->type));
-            return ZUNIMPLEMENTED;
+            return ZUNIMPLEMENTED;
         }

         queue_completion(&clist, entry, 0);
@@ -3649,7 +3730,7 @@ int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops,
 }

 void zoo_create_op_init(zoo_op_t *op, const char *path, const char *value,
-        int valuelen, const struct ACL_vector *acl, int flags,
+        int valuelen, const struct ACL_vector *acl, int flags,
         char *path_buffer, int path_buffer_len)
 {
     assert(op);
@@ -3686,7 +3767,7 @@ void zoo_delete_op_init(zoo_op_t *op, const char *path, int version)
     op->delete_op.version = version;
 }

-void zoo_set_op_init(zoo_op_t *op, const char *path, const char *buffer,
+void zoo_set_op_init(zoo_op_t *op, const char *path, const char *buffer,
         int buflen, int version, struct Stat *stat)
 {
     assert(op);
@@ -3732,7 +3813,7 @@ int flush_send_queue(zhandle_t*zh, int timeout)
     int rc= ZOK;
     struct timeval started;
 #ifdef WIN32
-    fd_set pollSet;
+    fd_set pollSet;
     struct timeval wait;
 #endif
     gettimeofday(&started,0);
@@ -3757,7 +3838,7 @@ int flush_send_queue(zhandle_t*zh, int timeout)
         FD_ZERO(&pollSet);
         FD_SET(zh->fd, &pollSet);
         // Poll the socket
-        rc = select((int)(zh->fd)+1, NULL, &pollSet, NULL, &wait);
+        rc = select((int)(zh->fd)+1, NULL, &pollSet, NULL, &wait);
 #else
         struct pollfd fds;
         fds.fd = zh->fd;
@@ -3843,6 +3924,10 @@ const char* zerror(int c)
         return "(not error) no server responses to process";
     case ZSESSIONMOVED:
         return "session moved to another server, so operation is ignored";
+    case ZNEWCONFIGNOQUORUM:
+        return "no quorum of new config is connected and up-to-date with the leader of last committed config - try invoking reconfiguration after new servers are connected and synced";
+    case ZRECONFIGINPROGRESS:
+        return "Another reconfiguration is in progress -- concurrent reconfigs not supported (yet)";
     }
     if (c > 0) {
         return strerror(c);
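A small hedged sketch (not part of the patch, helper name is mine) of how a caller might treat the two new error codes introduced above as retryable rather than fatal:

/* Illustrative only: retryable reconfig errors vs. everything else. */
static int reconfig_should_retry(int rc)
{
    switch (rc) {
    case ZNEWCONFIGNOQUORUM:   /* new members not yet connected/synced */
    case ZRECONFIGINPROGRESS:  /* another reconfig still running */
        return 1;              /* back off and try again later */
    default:
        return 0;              /* ZOK or a genuine failure */
    }
}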
@@ -3862,7 +3947,7 @@ int zoo_add_auth(zhandle_t *zh,const char* scheme,const char* cert,
         return ZINVALIDSTATE;

     // [ZOOKEEPER-800] zoo_add_auth should return ZINVALIDSTATE if
-    // the connection is closed.
+    // the connection is closed.
     if (zoo_state(zh) == 0) {
         return ZINVALIDSTATE;
     }
@@ -3919,12 +4004,12 @@ static const char* format_endpoint_info(const struct sockaddr_storage* ep)
     }
 #endif
 #ifdef WIN32
-    addrstring = inet_ntoa (*(struct in_addr*)inaddr);
+    addrstring = inet_ntoa (*(struct in_addr*)inaddr);
     sprintf(buf,"%s:%d",addrstring,ntohs(port));
 #else
     inet_ntop(ep->ss_family,inaddr,addrstr,sizeof(addrstr)-1);
     sprintf(buf,"%s:%d",addrstr,ntohs(port));
-#endif
+#endif
     return buf;
 }

@@ -4056,6 +4141,48 @@ int zoo_wget(zhandle_t *zh, const char *path,
     return rc;
 }

+int zoo_getconfig(zhandle_t *zh, int watch, char *buffer,
+        int* buffer_len, struct Stat *stat)
+{
+    return zoo_wget(zh,ZOO_CONFIG_NODE,watch?zh->watcher:0,zh->context, buffer,buffer_len,stat);
+}
+
+int zoo_wgetconfig(zhandle_t *zh, watcher_fn watcher, void* watcherCtx,
+        char *buffer, int* buffer_len, struct Stat *stat)
+{
+    return zoo_wget(zh, ZOO_CONFIG_NODE, watcher, watcherCtx, buffer, buffer_len, stat);
+}
+
+
+int zoo_reconfig(zhandle_t *zh, const char *joining, const char *leaving,
+        const char *members, int64_t version, char *buffer, int* buffer_len,
+        struct Stat *stat)
+{
+    struct sync_completion *sc;
+    int rc=0;
+
+    if(buffer_len==NULL)
+        return ZBADARGUMENTS;
+    if((sc=alloc_sync_completion())==NULL)
+        return ZSYSTEMERROR;
+
+    sc->u.data.buffer = buffer;
+    sc->u.data.buff_len = *buffer_len;
+    rc=zoo_areconfig(zh, joining, leaving, members, version, SYNCHRONOUS_MARKER, sc);
+
+    if(rc==ZOK){
+        wait_sync_completion(sc);
+        rc = sc->rc;
+        if (rc == 0) {
+            if(stat)
+                *stat = sc->u.data.stat;
+            *buffer_len = sc->u.data.buff_len;
+        }
+    }
+    free_sync_completion(sc);
+    return rc;
+}
+
 int zoo_set(zhandle_t *zh, const char *path, const char *buffer, int buflen,
         int version)
 {
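As a rough usage sketch of the synchronous wrappers added above (not part of the patch): the server string, server ids, buffer size, and header path below are illustrative only, and passing -1 as the version is, as I understand it, how a caller asks the server to skip the config-version check.

#include <stdio.h>
#include <zookeeper/zookeeper.h>   /* header location assumed */

/* Illustrative only: read the current dynamic config, then submit an
 * incremental reconfig that adds server 5 and removes server 3. */
static int demo_reconfig(zhandle_t *zh)
{
    char buf[1024];
    int buf_len = sizeof(buf);
    struct Stat stat;

    int rc = zoo_getconfig(zh, 0 /* no watch */, buf, &buf_len, &stat);
    if (rc != ZOK)
        return rc;
    printf("current config: %.*s\n", buf_len, buf);

    buf_len = sizeof(buf);
    return zoo_reconfig(zh,
            "server.5=host5:2888:3888;2181" /* joining */,
            "3"                             /* leaving */,
            NULL                            /* members: NULL for incremental */,
            -1 /* version: don't check */, buf, &buf_len, &stat);
}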