|
@@ -41,7 +41,7 @@
|
|
|
#include <stdarg.h>
|
|
|
#include <limits.h>
|
|
|
|
|
|
-#ifndef WIN32
|
|
|
+#ifndef _WIN32
|
|
|
#include <sys/time.h>
|
|
|
#include <sys/socket.h>
|
|
|
#include <poll.h>
|
|
@@ -76,6 +76,7 @@ const int ZOO_AUTH_FAILED_STATE = AUTH_FAILED_STATE_DEF;
|
|
|
const int ZOO_CONNECTING_STATE = CONNECTING_STATE_DEF;
|
|
|
const int ZOO_ASSOCIATING_STATE = ASSOCIATING_STATE_DEF;
|
|
|
const int ZOO_CONNECTED_STATE = CONNECTED_STATE_DEF;
|
|
|
+const int ZOO_READONLY_STATE = READONLY_STATE_DEF;
|
|
|
const int ZOO_NOTCONNECTED_STATE = NOTCONNECTED_STATE_DEF;
|
|
|
|
|
|
static __attribute__ ((unused)) const char* state2String(int state){
|
|
@@ -88,6 +89,8 @@ static __attribute__ ((unused)) const char* state2String(int state){
|
|
|
return "ZOO_ASSOCIATING_STATE";
|
|
|
case CONNECTED_STATE_DEF:
|
|
|
return "ZOO_CONNECTED_STATE";
|
|
|
+ case READONLY_STATE_DEF:
|
|
|
+ return "ZOO_READONLY_STATE";
|
|
|
case EXPIRED_SESSION_STATE_DEF:
|
|
|
return "ZOO_EXPIRED_SESSION_STATE";
|
|
|
case AUTH_FAILED_STATE_DEF:
|
|
@@ -225,17 +228,28 @@ static __attribute__((unused)) void print_completion_queue(zhandle_t *zh);
|
|
|
static void *SYNCHRONOUS_MARKER = (void*)&SYNCHRONOUS_MARKER;
|
|
|
static int isValidPath(const char* path, const int flags);
|
|
|
|
|
|
-#ifdef _WINDOWS
|
|
|
-static int zookeeper_send(SOCKET s, const void* buf, int len)
|
|
|
+#ifdef _WIN32
|
|
|
+typedef SOCKET socket_t;
|
|
|
+typedef int sendsize_t;
|
|
|
+#define SEND_FLAGS 0
|
|
|
#else
|
|
|
-static ssize_t zookeeper_send(int s, const void* buf, size_t len)
|
|
|
+#ifdef __APPLE__
|
|
|
+#define MSG_NOSIGNAL SO_NOSIGPIPE
|
|
|
#endif
|
|
|
-{
|
|
|
-#ifdef __linux__
|
|
|
- return send(s, buf, len, MSG_NOSIGNAL);
|
|
|
-#else
|
|
|
- return send(s, buf, len, 0);
|
|
|
+typedef int socket_t;
|
|
|
+typedef ssize_t sendsize_t;
|
|
|
+#define SEND_FLAGS MSG_NOSIGNAL
|
|
|
#endif
|
|
|
+
|
|
|
+static void zookeeper_set_sock_nodelay(zhandle_t *, socket_t);
|
|
|
+static void zookeeper_set_sock_noblock(zhandle_t *, socket_t);
|
|
|
+static void zookeeper_set_sock_timeout(zhandle_t *, socket_t, int);
|
|
|
+static int zookeeper_connect(zhandle_t *, struct sockaddr_storage *, socket_t);
|
|
|
+
|
|
|
+
|
|
|
+static sendsize_t zookeeper_send(socket_t s, const void* buf, size_t len)
|
|
|
+{
|
|
|
+ return send(s, buf, len, SEND_FLAGS);
|
|
|
}
|
|
|
|
|
|
const void *zoo_get_context(zhandle_t *zh)
|
|
@@ -438,7 +452,7 @@ static void destroy(zhandle_t *zh)
|
|
|
|
|
|
static void setup_random()
|
|
|
{
|
|
|
-#ifndef WIN32 // TODO: better seed
|
|
|
+#ifndef _WIN32 // TODO: better seed
|
|
|
int seed;
|
|
|
int fd = open("/dev/urandom", O_RDONLY);
|
|
|
if (fd == -1) {
|
|
@@ -651,7 +665,7 @@ static int resolve_hosts(const zhandle_t *zh, const char *hosts_in, addrvec_t *a
|
|
|
#endif
|
|
|
if (rc != 0) {
|
|
|
errno = getaddrinfo_errno(rc);
|
|
|
-#ifdef WIN32
|
|
|
+#ifdef _WIN32
|
|
|
LOG_ERROR(LOGCALLBACK(zh), "Win32 message: %s\n", gai_strerror(rc));
|
|
|
#elif __linux__ && __GNUC__
|
|
|
LOG_ERROR(LOGCALLBACK(zh), "getaddrinfo: %s\n", gai_strerror(rc));
|
|
@@ -990,7 +1004,7 @@ static zhandle_t *zookeeper_init_internal(const char *host, watcher_fn watcher,
|
|
|
zh->log_callback = log_callback;
|
|
|
log_env(zh);
|
|
|
|
|
|
-#ifdef WIN32
|
|
|
+#ifdef _WIN32
|
|
|
if (Win32WSAStartup()){
|
|
|
LOG_ERROR(LOGCALLBACK(zh), "Error initializing ws2_32.dll");
|
|
|
return 0;
|
|
@@ -1012,6 +1026,9 @@ static zhandle_t *zookeeper_init_internal(const char *host, watcher_fn watcher,
|
|
|
zh->state = ZOO_NOTCONNECTED_STATE;
|
|
|
zh->context = context;
|
|
|
zh->recv_timeout = recv_timeout;
|
|
|
+ zh->allow_read_only = flags & ZOO_READONLY;
|
|
|
+ // non-zero clientid implies we've seen r/w server already
|
|
|
+ zh->seen_rw_server_before = (clientid != 0 && clientid->client_id != 0);
|
|
|
init_auth_info(&zh->auth_h);
|
|
|
if (watcher) {
|
|
|
zh->watcher = watcher;
|
|
@@ -1054,6 +1071,7 @@ static zhandle_t *zookeeper_init_internal(const char *host, watcher_fn watcher,
|
|
|
if(update_addrs(zh) != 0) {
|
|
|
goto abort;
|
|
|
}
|
|
|
+
|
|
|
if (clientid) {
|
|
|
memcpy(&zh->client_id, clientid, sizeof(zh->client_id));
|
|
|
} else {
|
|
@@ -1382,11 +1400,7 @@ static __attribute__ ((unused)) int get_queue_len(buffer_head_t *list)
|
|
|
* 0 if send would block while sending the buffer (or a send was incomplete),
|
|
|
* 1 if success
|
|
|
*/
|
|
|
-#ifdef WIN32
|
|
|
-static int send_buffer(SOCKET fd, buffer_list_t *buff)
|
|
|
-#else
|
|
|
-static int send_buffer(int fd, buffer_list_t *buff)
|
|
|
-#endif
|
|
|
+static int send_buffer(socket_t fd, buffer_list_t *buff)
|
|
|
{
|
|
|
int len = buff->len;
|
|
|
int off = buff->curr_offset;
|
|
@@ -1398,10 +1412,10 @@ static int send_buffer(int fd, buffer_list_t *buff)
|
|
|
char *b = (char*)&nlen;
|
|
|
rc = zookeeper_send(fd, b + off, sizeof(nlen) - off);
|
|
|
if (rc == -1) {
|
|
|
-#ifndef _WINDOWS
|
|
|
- if (errno != EAGAIN) {
|
|
|
-#else
|
|
|
+#ifdef _WIN32
|
|
|
if (WSAGetLastError() != WSAEWOULDBLOCK) {
|
|
|
+#else
|
|
|
+ if (errno != EAGAIN) {
|
|
|
#endif
|
|
|
return -1;
|
|
|
} else {
|
|
@@ -1417,10 +1431,10 @@ static int send_buffer(int fd, buffer_list_t *buff)
|
|
|
off -= sizeof(buff->len);
|
|
|
rc = zookeeper_send(fd, buff->buffer + off, len - off);
|
|
|
if (rc == -1) {
|
|
|
-#ifndef _WINDOWS
|
|
|
- if (errno != EAGAIN) {
|
|
|
-#else
|
|
|
+#ifdef _WIN32
|
|
|
if (WSAGetLastError() != WSAEWOULDBLOCK) {
|
|
|
+#else
|
|
|
+ if (errno != EAGAIN) {
|
|
|
#endif
|
|
|
return -1;
|
|
|
}
|
|
@@ -1436,29 +1450,23 @@ static int send_buffer(int fd, buffer_list_t *buff)
|
|
|
* 0 if recv would block,
|
|
|
* 1 if success
|
|
|
*/
|
|
|
-#ifdef WIN32
|
|
|
-static int recv_buffer(SOCKET fd, buffer_list_t *buff)
|
|
|
-#else
|
|
|
-static int recv_buffer(int fd, buffer_list_t *buff)
|
|
|
-#endif
|
|
|
+static int recv_buffer(zhandle_t *zh, buffer_list_t *buff)
|
|
|
{
|
|
|
int off = buff->curr_offset;
|
|
|
int rc = 0;
|
|
|
- //fprintf(LOGSTREAM, "rc = %d, off = %d, line %d\n", rc, off, __LINE__);
|
|
|
|
|
|
/* if buffer is less than 4, we are reading in the length */
|
|
|
if (off < 4) {
|
|
|
char *buffer = (char*)&(buff->len);
|
|
|
- rc = recv(fd, buffer+off, sizeof(int)-off, 0);
|
|
|
- //fprintf(LOGSTREAM, "rc = %d, off = %d, line %d\n", rc, off, __LINE__);
|
|
|
- switch(rc) {
|
|
|
+ rc = recv(zh->fd, buffer+off, sizeof(int)-off, 0);
|
|
|
+ switch (rc) {
|
|
|
case 0:
|
|
|
errno = EHOSTDOWN;
|
|
|
case -1:
|
|
|
-#ifndef _WINDOWS
|
|
|
- if (errno == EAGAIN) {
|
|
|
-#else
|
|
|
+#ifdef _WIN32
|
|
|
if (WSAGetLastError() == WSAEWOULDBLOCK) {
|
|
|
+#else
|
|
|
+ if (errno == EAGAIN) {
|
|
|
#endif
|
|
|
return 0;
|
|
|
}
|
|
@@ -1476,15 +1484,21 @@ static int recv_buffer(int fd, buffer_list_t *buff)
|
|
|
/* want off to now represent the offset into the buffer */
|
|
|
off -= sizeof(buff->len);
|
|
|
|
|
|
- rc = recv(fd, buff->buffer+off, buff->len-off, 0);
|
|
|
+ rc = recv(zh->fd, buff->buffer+off, buff->len-off, 0);
|
|
|
+
|
|
|
+ /* dirty hack to make new client work against old server
|
|
|
+ * old server sends 40 bytes to finish connection handshake,
|
|
|
+ * while we're expecting 41 (1 byte for read-only mode data) */
|
|
|
+ if (buff == &zh->primer_buffer && rc == buff->len - 1) ++rc;
|
|
|
+
|
|
|
switch(rc) {
|
|
|
case 0:
|
|
|
errno = EHOSTDOWN;
|
|
|
case -1:
|
|
|
-#ifndef _WINDOWS
|
|
|
- if (errno == EAGAIN) {
|
|
|
-#else
|
|
|
+#ifdef _WIN32
|
|
|
if (WSAGetLastError() == WSAEWOULDBLOCK) {
|
|
|
+#else
|
|
|
+ if (errno == EAGAIN) {
|
|
|
#endif
|
|
|
break;
|
|
|
}
|
|
@@ -1578,6 +1592,13 @@ static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+/* Report whether zh currently has a usable connection: yields 1 when
+ * zh->state is ZOO_READONLY_STATE or ZOO_CONNECTED_STATE, 0 otherwise. */
+static int is_connected(zhandle_t* zh)
+{
+    return zh->state == ZOO_READONLY_STATE || zh->state == ZOO_CONNECTED_STATE;
+}
|
|
|
+
|
|
|
static void handle_error(zhandle_t *zh,int rc)
|
|
|
{
|
|
|
close(zh->fd);
|
|
@@ -1585,7 +1606,7 @@ static void handle_error(zhandle_t *zh,int rc)
|
|
|
LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=%s",
|
|
|
state2String(zh->state));
|
|
|
PROCESS_SESSION_EVENT(zh, zh->state);
|
|
|
- } else if (zh->state == ZOO_CONNECTED_STATE) {
|
|
|
+ } else if (is_connected(zh)) {
|
|
|
LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=CONNECTING_STATE");
|
|
|
PROCESS_SESSION_EVENT(zh, ZOO_CONNECTING_STATE);
|
|
|
}
|
|
@@ -1793,36 +1814,46 @@ static int serialize_prime_connect(struct connect_req *req, char* buffer){
|
|
|
offset = offset + sizeof(req->passwd_len);
|
|
|
|
|
|
memcpy(buffer + offset, req->passwd, sizeof(req->passwd));
|
|
|
+ offset = offset + sizeof(req->passwd);
|
|
|
+
|
|
|
+ memcpy(buffer + offset, &req->readOnly, sizeof(req->readOnly));
|
|
|
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
- static int deserialize_prime_response(struct prime_struct *req, char* buffer){
|
|
|
+static int deserialize_prime_response(struct prime_struct *resp, char* buffer)
|
|
|
+{
|
|
|
//this should be the order of deserialization
|
|
|
int offset = 0;
|
|
|
- memcpy(&req->len, buffer + offset, sizeof(req->len));
|
|
|
- offset = offset + sizeof(req->len);
|
|
|
+ memcpy(&resp->len, buffer + offset, sizeof(resp->len));
|
|
|
+ offset = offset + sizeof(resp->len);
|
|
|
+
|
|
|
+ resp->len = ntohl(resp->len);
|
|
|
+ memcpy(&resp->protocolVersion,
|
|
|
+ buffer + offset,
|
|
|
+ sizeof(resp->protocolVersion));
|
|
|
+ offset = offset + sizeof(resp->protocolVersion);
|
|
|
+
|
|
|
+ resp->protocolVersion = ntohl(resp->protocolVersion);
|
|
|
+ memcpy(&resp->timeOut, buffer + offset, sizeof(resp->timeOut));
|
|
|
+ offset = offset + sizeof(resp->timeOut);
|
|
|
|
|
|
- req->len = ntohl(req->len);
|
|
|
- memcpy(&req->protocolVersion, buffer + offset, sizeof(req->protocolVersion));
|
|
|
- offset = offset + sizeof(req->protocolVersion);
|
|
|
+ resp->timeOut = ntohl(resp->timeOut);
|
|
|
+ memcpy(&resp->sessionId, buffer + offset, sizeof(resp->sessionId));
|
|
|
+ offset = offset + sizeof(resp->sessionId);
|
|
|
|
|
|
- req->protocolVersion = ntohl(req->protocolVersion);
|
|
|
- memcpy(&req->timeOut, buffer + offset, sizeof(req->timeOut));
|
|
|
- offset = offset + sizeof(req->timeOut);
|
|
|
+ resp->sessionId = htonll(resp->sessionId);
|
|
|
+ memcpy(&resp->passwd_len, buffer + offset, sizeof(resp->passwd_len));
|
|
|
+ offset = offset + sizeof(resp->passwd_len);
|
|
|
|
|
|
- req->timeOut = ntohl(req->timeOut);
|
|
|
- memcpy(&req->sessionId, buffer + offset, sizeof(req->sessionId));
|
|
|
- offset = offset + sizeof(req->sessionId);
|
|
|
+ resp->passwd_len = ntohl(resp->passwd_len);
|
|
|
+ memcpy(resp->passwd, buffer + offset, sizeof(resp->passwd));
|
|
|
+ offset = offset + sizeof(resp->passwd);
|
|
|
|
|
|
- req->sessionId = htonll(req->sessionId);
|
|
|
- memcpy(&req->passwd_len, buffer + offset, sizeof(req->passwd_len));
|
|
|
- offset = offset + sizeof(req->passwd_len);
|
|
|
+ memcpy(&resp->readOnly, buffer + offset, sizeof(resp->readOnly));
|
|
|
|
|
|
- req->passwd_len = ntohl(req->passwd_len);
|
|
|
- memcpy(req->passwd, buffer + offset, sizeof(req->passwd));
|
|
|
return 0;
|
|
|
- }
|
|
|
+}
|
|
|
|
|
|
static int prime_connection(zhandle_t *zh)
|
|
|
{
|
|
@@ -1833,11 +1864,12 @@ static int prime_connection(zhandle_t *zh)
|
|
|
int hlen = 0;
|
|
|
struct connect_req req;
|
|
|
req.protocolVersion = 0;
|
|
|
- req.sessionId = zh->client_id.client_id;
|
|
|
+ req.sessionId = zh->seen_rw_server_before ? zh->client_id.client_id : 0;
|
|
|
req.passwd_len = sizeof(req.passwd);
|
|
|
memcpy(req.passwd, zh->client_id.passwd, sizeof(zh->client_id.passwd));
|
|
|
req.timeOut = zh->recv_timeout;
|
|
|
req.lastZxidSeen = zh->last_zxid;
|
|
|
+ req.readOnly = zh->allow_read_only;
|
|
|
hlen = htonl(len);
|
|
|
/* We are running fast and loose here, but this string should fit in the initial buffer! */
|
|
|
rc=zookeeper_send(zh->fd, &hlen, sizeof(len));
|
|
@@ -1850,6 +1882,8 @@ static int prime_connection(zhandle_t *zh)
|
|
|
zh->state = ZOO_ASSOCIATING_STATE;
|
|
|
|
|
|
zh->input_buffer = &zh->primer_buffer;
|
|
|
+ memset(zh->input_buffer->buffer, 0, zh->input_buffer->len);
|
|
|
+
|
|
|
/* This seems a bit weird to to set the offset to 4, but we already have a
|
|
|
* length, so we skip reading the length (and allocating the buffer) by
|
|
|
* saying that we are already at offset 4 */
|
|
@@ -1905,16 +1939,128 @@ static struct timeval get_timeval(int interval)
|
|
|
return rc<0 ? rc : adaptor_send_queue(zh, 0);
|
|
|
}
|
|
|
|
|
|
-#ifdef WIN32
|
|
|
-int zookeeper_interest(zhandle_t *zh, SOCKET *fd, int *interest,
|
|
|
- struct timeval *tv)
|
|
|
+/* upper bound of a timeout for seeking for r/w server when in read-only mode */
|
|
|
+const int MAX_RW_TIMEOUT = 60000;
|
|
|
+const int MIN_RW_TIMEOUT = 200;
|
|
|
+
|
|
|
+/* Probe the address returned by addrvec_peek(&zh->addrs) with the "isro"
+ * command over a short-lived socket (1 second send/receive timeout).
+ * Returns 1 if the reply is exactly "rw", 0 otherwise -- including when the
+ * socket cannot be created, connected, written to or read from. */
+static int ping_rw_server(zhandle_t* zh)
+{
+    char buf[10];
+    socket_t sock;
+    int rc;
+    sendsize_t ssize;
+    struct sockaddr_storage addr;
+
+    addrvec_peek(&zh->addrs, &addr);
+
+    sock = socket(addr.ss_family, SOCK_STREAM, 0);
+    if (sock < 0) {
+        return 0;
+    }
+
+    zookeeper_set_sock_nodelay(zh, sock);
+    zookeeper_set_sock_timeout(zh, sock, 1);
+
+    rc = zookeeper_connect(zh, &addr, sock);
+    if (rc < 0) {
+        /* leave through "out" so the probe socket is closed, not leaked */
+        rc = 0;
+        goto out;
+    }
+
+    ssize = zookeeper_send(sock, "isro", 4);
+    if (ssize < 0) {
+        rc = 0;
+        goto out;
+    }
+
+    /* buf is zero-filled and at most sizeof(buf) - 1 bytes are read, so it
+     * is always NUL-terminated when handed to strcmp() */
+    memset(buf, 0, sizeof(buf));
+    rc = recv(sock, buf, sizeof(buf) - 1, 0);
+    if (rc < 0) {
+        rc = 0;
+        goto out;
+    }
+
+    rc = strcmp("rw", buf) == 0;
+
+out:
+    close(sock);
+    return rc;
+}
|
|
|
+
|
|
|
+/* Smaller of two ints; used in this file to clamp timeout values. */
+static inline int min(int a, int b)
+{
+    if (a < b) {
+        return a;
+    }
+    return b;
+}
|
|
|
+
|
|
|
+static void zookeeper_set_sock_noblock(zhandle_t *zh, socket_t sock)
|
|
|
{
|
|
|
+#ifdef _WIN32
|
|
|
ULONG nonblocking_flag = 1;
|
|
|
+
|
|
|
+ ioctlsocket(sock, FIONBIO, &nonblocking_flag);
|
|
|
#else
|
|
|
-int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
|
|
|
- struct timeval *tv)
|
|
|
+ fcntl(sock, F_SETFL, O_NONBLOCK|fcntl(sock, F_GETFL, 0));
|
|
|
+#endif
|
|
|
+}
|
|
|
+
|
|
|
+/* Apply a send and a receive timeout of `timeout` seconds to socket s.
+ * Winsock takes SO_SNDTIMEO/SO_RCVTIMEO as a DWORD in milliseconds, while
+ * POSIX takes a struct timeval, so the option value is built per platform.
+ * setsockopt() failures are ignored (best effort); zh is unused. */
+static void zookeeper_set_sock_timeout(zhandle_t *zh, socket_t s, int timeout)
{
+#ifdef _WIN32
+    DWORD tv = (DWORD)timeout * 1000;
+#else
+    struct timeval tv = { .tv_sec = timeout };
+#endif
+
+    setsockopt(s, SOL_SOCKET, SO_SNDTIMEO, (const char *)&tv, sizeof(tv));
+    setsockopt(s, SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv, sizeof(tv));
+}
|
|
|
+
|
|
|
+/* Turn on TCP_NODELAY for sock.  Failure is not fatal: it is only
+ * reported as a warning through zh's log callback. */
+static void zookeeper_set_sock_nodelay(zhandle_t *zh, socket_t sock)
+{
+#ifdef _WIN32
+    char on = 1;
+#else
+    int on = 1;
+#endif
+
+    if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)) != 0) {
+        LOG_WARN(LOGCALLBACK(zh),
+                 "Unable to set TCP_NODELAY, latency may be effected");
+    }
+}
|
|
|
+
|
|
|
+/* Call connect() on fd for addr, sizing the address by addr->ss_family
+ * (sockaddr_in6 for AF_INET6 when available, sockaddr_in otherwise).
+ * Returns connect()'s int result unchanged; the return type is int to match
+ * the forward declaration (socket_t is not int on every platform). */
+static int zookeeper_connect(zhandle_t *zh,
+                             struct sockaddr_storage *addr,
+                             socket_t fd)
+{
+    int rc;
+    int addr_len;
+
+#if defined(AF_INET6)
+    if (addr->ss_family == AF_INET6) {
+        addr_len = sizeof(struct sockaddr_in6);
+    } else {
+        addr_len = sizeof(struct sockaddr_in);
+    }
+#else
+    addr_len = sizeof(struct sockaddr_in);
#endif
+
+    LOG_DEBUG(LOGCALLBACK(zh), "[zk] connect()\n");
+    rc = connect(fd, (struct sockaddr *)addr, addr_len);
+
+#ifdef _WIN32
+    get_errno();
+#endif
+
+    return rc;
+}
|
|
|
+
|
|
|
+int zookeeper_interest(zhandle_t *zh, socket_t *fd, int *interest,
|
|
|
+ struct timeval *tv)
|
|
|
+{
|
|
|
int rc = 0;
|
|
|
struct timeval now;
|
|
|
if(zh==0 || fd==0 ||interest==0 || tv==0)
|
|
@@ -1942,7 +2088,6 @@ int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
|
|
|
tv->tv_usec = 0;
|
|
|
|
|
|
if (*fd == -1) {
|
|
|
-
|
|
|
/*
|
|
|
* If we previously failed to connect to server pool (zh->delay == 1)
|
|
|
* then we need delay our connection on this iteration 1/60 of the
|
|
@@ -1957,63 +2102,45 @@ int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
|
|
|
|
|
|
LOG_WARN(LOGCALLBACK(zh), "Delaying connection after exhaustively trying all servers [%s]",
|
|
|
zh->hostname);
|
|
|
- }
|
|
|
-
|
|
|
- // No need to delay -- grab the next server and attempt connection
|
|
|
- else {
|
|
|
- int ssoresult;
|
|
|
-
|
|
|
-#ifdef WIN32
|
|
|
- char enable_tcp_nodelay = 1;
|
|
|
-#else
|
|
|
- int enable_tcp_nodelay = 1;
|
|
|
-#endif
|
|
|
+ } else {
|
|
|
+ // No need to delay -- grab the next server and attempt connection
|
|
|
zoo_cycle_next_server(zh);
|
|
|
-
|
|
|
zh->fd = socket(zh->addr_cur.ss_family, SOCK_STREAM, 0);
|
|
|
if (zh->fd < 0) {
|
|
|
- return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
|
|
|
- ZSYSTEMERROR, "socket() call failed"));
|
|
|
- }
|
|
|
- ssoresult = setsockopt(zh->fd, IPPROTO_TCP, TCP_NODELAY, &enable_tcp_nodelay, sizeof(enable_tcp_nodelay));
|
|
|
- if (ssoresult != 0) {
|
|
|
- LOG_WARN(LOGCALLBACK(zh), "Unable to set TCP_NODELAY, operation latency may be effected");
|
|
|
+ rc = handle_socket_error_msg(zh,
|
|
|
+ __LINE__,
|
|
|
+ ZSYSTEMERROR,
|
|
|
+ "socket() call failed");
|
|
|
+ return api_epilog(zh, rc);
|
|
|
}
|
|
|
-#ifdef WIN32
|
|
|
- ioctlsocket(zh->fd, FIONBIO, &nonblocking_flag);
|
|
|
-#else
|
|
|
- fcntl(zh->fd, F_SETFL, O_NONBLOCK|fcntl(zh->fd, F_GETFL, 0));
|
|
|
-#endif
|
|
|
-#if defined(AF_INET6)
|
|
|
- if (zh->addr_cur.ss_family == AF_INET6) {
|
|
|
- rc = connect(zh->fd, (struct sockaddr*)&zh->addr_cur, sizeof(struct sockaddr_in6));
|
|
|
- } else {
|
|
|
-#else
|
|
|
- LOG_DEBUG(LOGCALLBACK(zh), "[zk] connect()\n");
|
|
|
- {
|
|
|
-#endif
|
|
|
- rc = connect(zh->fd, (struct sockaddr*)&zh->addr_cur, sizeof(struct sockaddr_in));
|
|
|
-#ifdef WIN32
|
|
|
- get_errno();
|
|
|
-#endif
|
|
|
- }
|
|
|
- if (rc == -1) {
|
|
|
|
|
|
+ zookeeper_set_sock_nodelay(zh, zh->fd);
|
|
|
+ zookeeper_set_sock_noblock(zh, zh->fd);
|
|
|
+
|
|
|
+ rc = zookeeper_connect(zh, &zh->addr_cur, zh->fd);
|
|
|
+
|
|
|
+ if (rc == -1) {
|
|
|
/* we are handling the non-blocking connect according to
|
|
|
* the description in section 16.3 "Non-blocking connect"
|
|
|
* in UNIX Network Programming vol 1, 3rd edition */
|
|
|
- if (errno == EWOULDBLOCK || errno == EINPROGRESS)
|
|
|
+ if (errno == EWOULDBLOCK || errno == EINPROGRESS) {
|
|
|
zh->state = ZOO_CONNECTING_STATE;
|
|
|
- else
|
|
|
- {
|
|
|
- return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
|
|
|
- ZCONNECTIONLOSS,"connect() call failed"));
|
|
|
+ } else {
|
|
|
+ rc = handle_socket_error_msg(zh,
|
|
|
+ __LINE__,
|
|
|
+ ZCONNECTIONLOSS,
|
|
|
+ "connect() call failed");
|
|
|
+ return api_epilog(zh, rc);
|
|
|
}
|
|
|
} else {
|
|
|
- if((rc=prime_connection(zh))!=0)
|
|
|
+ rc = prime_connection(zh);
|
|
|
+ if (rc != 0) {
|
|
|
return api_epilog(zh,rc);
|
|
|
+ }
|
|
|
|
|
|
- LOG_INFO(LOGCALLBACK(zh), "Initiated connection to server [%s]", format_endpoint_info(&zh->addr_cur));
|
|
|
+ LOG_INFO(LOGCALLBACK(zh),
|
|
|
+ "Initiated connection to server [%s]",
|
|
|
+ format_endpoint_info(&zh->addr_cur));
|
|
|
}
|
|
|
*tv = get_timeval(zh->recv_timeout/3);
|
|
|
}
|
|
@@ -2021,6 +2148,8 @@ int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
|
|
|
zh->last_recv = now;
|
|
|
zh->last_send = now;
|
|
|
zh->last_ping = now;
|
|
|
+ zh->last_ping_rw = now;
|
|
|
+ zh->ping_rw_timeout = MIN_RW_TIMEOUT;
|
|
|
}
|
|
|
|
|
|
if (zh->fd != -1) {
|
|
@@ -2031,7 +2160,7 @@ int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
|
|
|
// have we exceeded the receive timeout threshold?
|
|
|
if (recv_to <= 0) {
|
|
|
// We gotta cut our losses and connect to someone else
|
|
|
-#ifdef WIN32
|
|
|
+#ifdef _WIN32
|
|
|
errno = WSAETIMEDOUT;
|
|
|
#else
|
|
|
errno = ETIMEDOUT;
|
|
@@ -2045,14 +2174,13 @@ int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
|
|
|
-recv_to));
|
|
|
|
|
|
}
|
|
|
+
|
|
|
// We only allow 1/3 of our timeout time to expire before sending
|
|
|
// a PING
|
|
|
- if (zh->state==ZOO_CONNECTED_STATE) {
|
|
|
+ if (is_connected(zh)) {
|
|
|
send_to = zh->recv_timeout/3 - idle_send;
|
|
|
if (send_to <= 0) {
|
|
|
if (zh->sent_requests.head == 0) {
|
|
|
-// LOG_DEBUG(LOGCALLBACK(zh), "Sending PING to %s (exceeded idle by %dms)",
|
|
|
-// zoo_get_current_server(zh),-send_to);
|
|
|
rc = send_ping(zh);
|
|
|
if (rc < 0) {
|
|
|
LOG_ERROR(LOGCALLBACK(zh), "failed to send PING request (zk retcode=%d)",rc);
|
|
@@ -2062,8 +2190,33 @@ int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
|
|
|
send_to = zh->recv_timeout/3;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ // If we are in read-only mode, seek for read/write server
|
|
|
+ if (zh->state == ZOO_READONLY_STATE) {
|
|
|
+ int idle_ping_rw = calculate_interval(&zh->last_ping_rw, &now);
|
|
|
+ if (idle_ping_rw >= zh->ping_rw_timeout) {
|
|
|
+ zh->last_ping_rw = now;
|
|
|
+ idle_ping_rw = 0;
|
|
|
+ zh->ping_rw_timeout = min(zh->ping_rw_timeout * 2,
|
|
|
+ MAX_RW_TIMEOUT);
|
|
|
+ if (ping_rw_server(zh)) {
|
|
|
+ struct sockaddr_storage addr;
|
|
|
+ addrvec_peek(&zh->addrs, &addr);
|
|
|
+ zh->ping_rw_timeout = MIN_RW_TIMEOUT;
|
|
|
+ LOG_INFO(LOGCALLBACK(zh),
|
|
|
+ "r/w server found at %s",
|
|
|
+ format_endpoint_info(&addr));
|
|
|
+ handle_error(zh, ZRWSERVERFOUND);
|
|
|
+ } else {
|
|
|
+ addrvec_next(&zh->addrs, NULL);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ send_to = min(send_to, zh->ping_rw_timeout - idle_ping_rw);
|
|
|
+ }
|
|
|
+
|
|
|
// choose the lesser value as the timeout
|
|
|
- *tv = get_timeval(recv_to < send_to? recv_to:send_to);
|
|
|
+ *tv = get_timeval(min(recv_to, send_to));
|
|
|
+
|
|
|
zh->next_deadline.tv_sec = now.tv_sec + tv->tv_sec;
|
|
|
zh->next_deadline.tv_usec = now.tv_usec + tv->tv_usec;
|
|
|
if (zh->next_deadline.tv_usec > 1000000) {
|
|
@@ -2073,7 +2226,7 @@ int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
|
|
|
*interest = ZOOKEEPER_READ;
|
|
|
/* we are interested in a write if we are connected and have something
|
|
|
* to send, or we are waiting for a connect to finish. */
|
|
|
- if ((zh->to_send.head && (zh->state == ZOO_CONNECTED_STATE))
|
|
|
+ if ((zh->to_send.head && is_connected(zh))
|
|
|
|| zh->state == ZOO_CONNECTING_STATE) {
|
|
|
*interest |= ZOOKEEPER_WRITE;
|
|
|
}
|
|
@@ -2118,7 +2271,7 @@ static int check_events(zhandle_t *zh, int events)
|
|
|
zh->input_buffer = allocate_buffer(0,0);
|
|
|
}
|
|
|
|
|
|
- rc = recv_buffer(zh->fd, zh->input_buffer);
|
|
|
+ rc = recv_buffer(zh, zh->input_buffer);
|
|
|
if (rc < 0) {
|
|
|
return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
|
|
|
"failed while receiving a server response");
|
|
@@ -2128,12 +2281,13 @@ static int check_events(zhandle_t *zh, int events)
|
|
|
if (zh->input_buffer != &zh->primer_buffer) {
|
|
|
queue_buffer(&zh->to_process, zh->input_buffer, 0);
|
|
|
} else {
|
|
|
- int64_t oldid,newid;
|
|
|
+ int64_t oldid, newid;
|
|
|
//deserialize
|
|
|
deserialize_prime_response(&zh->primer_storage, zh->primer_buffer.buffer);
|
|
|
/* We are processing the primer_buffer, so we need to finish
|
|
|
* the connection handshake */
|
|
|
- oldid = zh->client_id.client_id;
|
|
|
+ oldid = zh->seen_rw_server_before ? zh->client_id.client_id : 0;
|
|
|
+ zh->seen_rw_server_before |= !zh->primer_storage.readOnly;
|
|
|
newid = zh->primer_storage.sessionId;
|
|
|
if (oldid != 0 && oldid != newid) {
|
|
|
zh->state = ZOO_EXPIRED_SESSION_STATE;
|
|
@@ -2146,11 +2300,14 @@ static int check_events(zhandle_t *zh, int events)
|
|
|
|
|
|
memcpy(zh->client_id.passwd, &zh->primer_storage.passwd,
|
|
|
sizeof(zh->client_id.passwd));
|
|
|
- zh->state = ZOO_CONNECTED_STATE;
|
|
|
+ zh->state = zh->primer_storage.readOnly ?
|
|
|
+ ZOO_READONLY_STATE : ZOO_CONNECTED_STATE;
|
|
|
zh->reconfig = 0;
|
|
|
- LOG_INFO(LOGCALLBACK(zh), "session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d",
|
|
|
- format_endpoint_info(&zh->addr_cur),
|
|
|
- newid, zh->recv_timeout);
|
|
|
+ LOG_INFO(LOGCALLBACK(zh),
|
|
|
+ "session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d %s",
|
|
|
+ format_endpoint_info(&zh->addr_cur),
|
|
|
+ newid, zh->recv_timeout,
|
|
|
+ zh->primer_storage.readOnly ? "(READ-ONLY mode)" : "");
|
|
|
/* we want the auth to be sent for, but since both call push to front
|
|
|
we need to call send_watch_set first */
|
|
|
send_set_watches(zh);
|
|
@@ -2158,7 +2315,7 @@ static int check_events(zhandle_t *zh, int events)
|
|
|
send_auth_info(zh);
|
|
|
LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE");
|
|
|
zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again
|
|
|
- PROCESS_SESSION_EVENT(zh, ZOO_CONNECTED_STATE);
|
|
|
+ PROCESS_SESSION_EVENT(zh, zh->state);
|
|
|
}
|
|
|
}
|
|
|
zh->input_buffer = 0;
|
|
@@ -2555,7 +2712,7 @@ void process_completions(zhandle_t *zh)
|
|
|
|
|
|
static void isSocketReadable(zhandle_t* zh)
|
|
|
{
|
|
|
-#ifndef WIN32
|
|
|
+#ifndef _WIN32
|
|
|
struct pollfd fds;
|
|
|
fds.fd = zh->fd;
|
|
|
fds.events = POLLIN;
|
|
@@ -2724,7 +2881,9 @@ int zookeeper_process(zhandle_t *zh, int events)
|
|
|
if (process_async(zh->outstanding_sync)) {
|
|
|
process_completions(zh);
|
|
|
}
|
|
|
- return api_epilog(zh,ZOK);}
|
|
|
+
|
|
|
+ return api_epilog(zh, ZOK);
|
|
|
+}
|
|
|
|
|
|
int zoo_state(zhandle_t *zh)
|
|
|
{
|
|
@@ -3000,7 +3159,7 @@ int zookeeper_close(zhandle_t *zh)
|
|
|
}
|
|
|
/* No need to decrement the counter since we're just going to
|
|
|
* destroy the handle later. */
|
|
|
- if(zh->state==ZOO_CONNECTED_STATE){
|
|
|
+ if (is_connected(zh)){
|
|
|
struct oarchive *oa;
|
|
|
struct RequestHeader h = {get_xid(), ZOO_CLOSE_OP};
|
|
|
LOG_INFO(LOGCALLBACK(zh), "Closing zookeeper sessionId=%#llx to [%s]\n",
|
|
@@ -3029,7 +3188,7 @@ finish:
|
|
|
destroy(zh);
|
|
|
adaptor_destroy(zh);
|
|
|
free(zh);
|
|
|
-#ifdef WIN32
|
|
|
+#ifdef _WIN32
|
|
|
Win32WSACleanup();
|
|
|
#endif
|
|
|
return rc;
|
|
@@ -3905,7 +4064,7 @@ int flush_send_queue(zhandle_t*zh, int timeout)
|
|
|
{
|
|
|
int rc= ZOK;
|
|
|
struct timeval started;
|
|
|
-#ifdef WIN32
|
|
|
+#ifdef _WIN32
|
|
|
fd_set pollSet;
|
|
|
struct timeval wait;
|
|
|
#endif
|
|
@@ -3915,9 +4074,9 @@ int flush_send_queue(zhandle_t*zh, int timeout)
|
|
|
// we use a recursive lock instead and only dequeue the buffer if a send was
|
|
|
// successful
|
|
|
lock_buffer_list(&zh->to_send);
|
|
|
- while (zh->to_send.head != 0&& zh->state == ZOO_CONNECTED_STATE) {
|
|
|
+ while (zh->to_send.head != 0 && is_connected(zh)) {
|
|
|
if(timeout!=0){
|
|
|
-#ifndef WIN32
|
|
|
+#ifndef _WIN32
|
|
|
struct pollfd fds;
|
|
|
#endif
|
|
|
int elapsed;
|
|
@@ -3929,7 +4088,7 @@ int flush_send_queue(zhandle_t*zh, int timeout)
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
-#ifdef WIN32
|
|
|
+#ifdef _WIN32
|
|
|
wait = get_timeval(timeout-elapsed);
|
|
|
FD_ZERO(&pollSet);
|
|
|
FD_SET(zh->fd, &pollSet);
|
|
@@ -4019,6 +4178,8 @@ const char* zerror(int c)
|
|
|
return "(not error) no server responses to process";
|
|
|
case ZSESSIONMOVED:
|
|
|
return "session moved to another server, so operation is ignored";
|
|
|
+ case ZNOTREADONLY:
|
|
|
+ return "state-changing request is passed to read-only server";
|
|
|
case ZNEWCONFIGNOQUORUM:
|
|
|
return "no quorum of new config is connected and up-to-date with the leader of last commmitted config - try invoking reconfiguration after new servers are connected and synced";
|
|
|
case ZRECONFIGINPROGRESS:
|
|
@@ -4069,7 +4230,7 @@ int zoo_add_auth(zhandle_t *zh,const char* scheme,const char* cert,
|
|
|
add_last_auth(&zh->auth_h, authinfo);
|
|
|
zoo_unlock_auth(zh);
|
|
|
|
|
|
- if(zh->state == ZOO_CONNECTED_STATE || zh->state == ZOO_ASSOCIATING_STATE)
|
|
|
+ if (is_connected(zh) || zh->state == ZOO_ASSOCIATING_STATE)
|
|
|
return send_last_auth_info(zh);
|
|
|
|
|
|
return ZOK;
|
|
@@ -4080,7 +4241,7 @@ static const char* format_endpoint_info(const struct sockaddr_storage* ep)
|
|
|
static char buf[128] = { 0 };
|
|
|
char addrstr[128] = { 0 };
|
|
|
void *inaddr;
|
|
|
-#ifdef WIN32
|
|
|
+#ifdef _WIN32
|
|
|
char * addrstring;
|
|
|
#endif
|
|
|
int port;
|
|
@@ -4098,7 +4259,7 @@ static const char* format_endpoint_info(const struct sockaddr_storage* ep)
|
|
|
#if defined(AF_INET6)
|
|
|
}
|
|
|
#endif
|
|
|
-#ifdef WIN32
|
|
|
+#ifdef _WIN32
|
|
|
addrstring = inet_ntoa (*(struct in_addr*)inaddr);
|
|
|
sprintf(buf,"%s:%d",addrstring,ntohs(port));
|
|
|
#else
|