|
@@ -79,9 +79,15 @@
|
|
|
#include <pwd.h>
|
|
|
#endif
|
|
|
|
|
|
+#ifdef HAVE_OPENSSL_H
|
|
|
+#include <openssl/ssl.h>
|
|
|
+#include <openssl/err.h>
|
|
|
+#endif
|
|
|
+
|
|
|
#ifdef __MACH__ // OS X
|
|
|
#include <mach/clock.h>
|
|
|
#include <mach/mach.h>
|
|
|
+#include <netinet/tcp.h>
|
|
|
#endif
|
|
|
|
|
|
#ifdef WIN32
|
|
@@ -124,6 +130,7 @@ const int ZOO_CONNECTING_STATE = CONNECTING_STATE_DEF;
|
|
|
const int ZOO_ASSOCIATING_STATE = ASSOCIATING_STATE_DEF;
|
|
|
const int ZOO_CONNECTED_STATE = CONNECTED_STATE_DEF;
|
|
|
const int ZOO_READONLY_STATE = READONLY_STATE_DEF;
|
|
|
+const int ZOO_SSL_CONNECTING_STATE = SSL_CONNECTING_STATE_DEF;
|
|
|
const int ZOO_NOTCONNECTED_STATE = NOTCONNECTED_STATE_DEF;
|
|
|
|
|
|
static __attribute__ ((unused)) const char* state2String(int state){
|
|
@@ -132,6 +139,8 @@ static __attribute__ ((unused)) const char* state2String(int state){
|
|
|
return "ZOO_CLOSED_STATE";
|
|
|
case CONNECTING_STATE_DEF:
|
|
|
return "ZOO_CONNECTING_STATE";
|
|
|
+ case SSL_CONNECTING_STATE_DEF:
|
|
|
+ return "ZOO_SSL_CONNECTING_STATE";
|
|
|
case ASSOCIATING_STATE_DEF:
|
|
|
return "ZOO_ASSOCIATING_STATE";
|
|
|
case CONNECTED_STATE_DEF:
|
|
@@ -273,6 +282,10 @@ static struct sockaddr_storage *addr_rw_server = 0;
|
|
|
|
|
|
static void *SYNCHRONOUS_MARKER = (void*)&SYNCHRONOUS_MARKER;
|
|
|
static int isValidPath(const char* path, const int mode);
|
|
|
+#ifdef HAVE_OPENSSL_H
|
|
|
+static int init_ssl_for_handler(zhandle_t *zh);
|
|
|
+static int init_ssl_for_socket(zsock_t *fd, zhandle_t *zh, int fail_on_error);
|
|
|
+#endif
|
|
|
|
|
|
static int aremove_watches(
|
|
|
zhandle_t *zh, const char *path, ZooWatcherType wtype,
|
|
@@ -323,9 +336,22 @@ static void abort_singlethreaded(zhandle_t *zh)
|
|
|
abort();
|
|
|
}
|
|
|
|
|
|
-static sendsize_t zookeeper_send(socket_t s, const void* buf, size_t len)
|
|
|
+static ssize_t zookeeper_send(zsock_t *fd, const void* buf, size_t len)
|
|
|
+{
|
|
|
+#ifdef HAVE_OPENSSL_H
|
|
|
+ if (fd->ssl_sock)
|
|
|
+ return (ssize_t)SSL_write(fd->ssl_sock, buf, (int)len);
|
|
|
+#endif
|
|
|
+ return send(fd->sock, buf, len, SEND_FLAGS);
|
|
|
+}
|
|
|
+
|
|
|
+static ssize_t zookeeper_recv(zsock_t *fd, void *buf, size_t len, int flags)
|
|
|
{
|
|
|
- return send(s, buf, len, SEND_FLAGS);
|
|
|
+#ifdef HAVE_OPENSSL_H
|
|
|
+ if (fd->ssl_sock)
|
|
|
+ return (ssize_t)SSL_read(fd->ssl_sock, buf, (int)len);
|
|
|
+#endif
|
|
|
+ return recv(fd->sock, buf, len, flags);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -360,7 +386,7 @@ void get_system_time(struct timeval *tv)
|
|
|
// Default to gettimeofday in case of failure.
|
|
|
ret = gettimeofday(tv, NULL);
|
|
|
}
|
|
|
-#elif CLOCK_MONOTONIC_RAW
|
|
|
+#elif defined CLOCK_MONOTONIC_RAW
|
|
|
// On Linux, CLOCK_MONOTONIC is affected by ntp slew but CLOCK_MONOTONIC_RAW
|
|
|
// is not. We want the non-slewed (constant rate) CLOCK_MONOTONIC_RAW if it
|
|
|
// is available.
|
|
@@ -555,6 +581,22 @@ zk_hashtable *child_result_checker(zhandle_t *zh, int rc)
|
|
|
return rc==ZOK ? zh->active_child_watchers : 0;
|
|
|
}
|
|
|
|
|
|
+void close_zsock(zsock_t *fd)
|
|
|
+{
|
|
|
+ if (fd->sock != -1) {
|
|
|
+#ifdef HAVE_OPENSSL_H
|
|
|
+ if (fd->ssl_sock) {
|
|
|
+ SSL_free(fd->ssl_sock);
|
|
|
+ fd->ssl_sock = NULL;
|
|
|
+ SSL_CTX_free(fd->ssl_ctx);
|
|
|
+ fd->ssl_ctx = NULL;
|
|
|
+ }
|
|
|
+#endif
|
|
|
+ close(fd->sock);
|
|
|
+ fd->sock = -1;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
/**
|
|
|
* Frees and closes everything associated with a handle,
|
|
|
* including the handle itself.
|
|
@@ -573,9 +615,8 @@ static void destroy(zhandle_t *zh)
|
|
|
free(zh->hostname);
|
|
|
zh->hostname = NULL;
|
|
|
}
|
|
|
- if (zh->fd != -1) {
|
|
|
- close(zh->fd);
|
|
|
- zh->fd = -1;
|
|
|
+ if (zh->fd->sock != -1) {
|
|
|
+ close_zsock(zh->fd);
|
|
|
memset(&zh->addr_cur, 0, sizeof(zh->addr_cur));
|
|
|
zh->state = 0;
|
|
|
}
|
|
@@ -585,7 +626,13 @@ static void destroy(zhandle_t *zh)
|
|
|
free(zh->chroot);
|
|
|
zh->chroot = NULL;
|
|
|
}
|
|
|
-
|
|
|
+#ifdef HAVE_OPENSSL_H
|
|
|
+ if (zh->fd->cert) {
|
|
|
+ free(zh->fd->cert->certstr);
|
|
|
+ free(zh->fd->cert);
|
|
|
+ zh->fd->cert = NULL;
|
|
|
+ }
|
|
|
+#endif
|
|
|
free_auth_info(&zh->auth_h);
|
|
|
destroy_zk_hashtable(zh->active_node_watchers);
|
|
|
destroy_zk_hashtable(zh->active_exist_watchers);
|
|
@@ -1030,10 +1077,9 @@ int update_addrs(zhandle_t *zh)
|
|
|
// If we need to do a reconfig and we're currently connected to a server,
|
|
|
// then force close that connection so on next interest() call we'll make a
|
|
|
// new connection
|
|
|
- if (zh->reconfig == 1 && zh->fd != -1)
|
|
|
+ if (zh->reconfig == 1 && zh->fd->sock != -1)
|
|
|
{
|
|
|
- close(zh->fd);
|
|
|
- zh->fd = -1;
|
|
|
+ close_zsock(zh->fd);
|
|
|
zh->state = ZOO_NOTCONNECTED_STATE;
|
|
|
}
|
|
|
|
|
@@ -1080,7 +1126,7 @@ struct sockaddr* zookeeper_get_connected_host(zhandle_t *zh,
|
|
|
if (zh->state!=ZOO_CONNECTED_STATE) {
|
|
|
return NULL;
|
|
|
}
|
|
|
- if (getpeername(zh->fd, addr, addr_len)==-1) {
|
|
|
+ if (getpeername(zh->fd->sock, addr, addr_len)==-1) {
|
|
|
return NULL;
|
|
|
}
|
|
|
return addr;
|
|
@@ -1151,7 +1197,7 @@ static void log_env(zhandle_t *zh) {
|
|
|
*/
|
|
|
static zhandle_t *zookeeper_init_internal(const char *host, watcher_fn watcher,
|
|
|
int recv_timeout, const clientid_t *clientid, void *context, int flags,
|
|
|
- log_callback_fn log_callback)
|
|
|
+ log_callback_fn log_callback, zcert_t *cert)
|
|
|
{
|
|
|
int errnosave = 0;
|
|
|
zhandle_t *zh = NULL;
|
|
@@ -1170,6 +1216,13 @@ static zhandle_t *zookeeper_init_internal(const char *host, watcher_fn watcher,
|
|
|
log_env(zh);
|
|
|
}
|
|
|
|
|
|
+ zh->fd = calloc(1, sizeof(zsock_t));
|
|
|
+ zh->fd->sock = -1;
|
|
|
+ if (cert) {
|
|
|
+ zh->fd->cert = calloc(1, sizeof(zcert_t));
|
|
|
+ memcpy(zh->fd->cert, cert, sizeof(zcert_t));
|
|
|
+ }
|
|
|
+
|
|
|
#ifdef _WIN32
|
|
|
if (Win32WSAStartup()){
|
|
|
LOG_ERROR(LOGCALLBACK(zh), "Error initializing ws2_32.dll");
|
|
@@ -1188,7 +1241,6 @@ static zhandle_t *zookeeper_init_internal(const char *host, watcher_fn watcher,
|
|
|
flags);
|
|
|
|
|
|
zh->hostname = NULL;
|
|
|
- zh->fd = -1;
|
|
|
zh->state = ZOO_NOTCONNECTED_STATE;
|
|
|
zh->context = context;
|
|
|
zh->recv_timeout = recv_timeout;
|
|
@@ -1264,6 +1316,7 @@ static zhandle_t *zookeeper_init_internal(const char *host, watcher_fn watcher,
|
|
|
abort:
|
|
|
errnosave=errno;
|
|
|
destroy(zh);
|
|
|
+ free(zh->fd);
|
|
|
free(zh);
|
|
|
errno=errnosave;
|
|
|
return 0;
|
|
@@ -1272,16 +1325,30 @@ abort:
|
|
|
zhandle_t *zookeeper_init(const char *host, watcher_fn watcher,
|
|
|
int recv_timeout, const clientid_t *clientid, void *context, int flags)
|
|
|
{
|
|
|
- return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, NULL);
|
|
|
+ return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, NULL, NULL);
|
|
|
}
|
|
|
|
|
|
zhandle_t *zookeeper_init2(const char *host, watcher_fn watcher,
|
|
|
int recv_timeout, const clientid_t *clientid, void *context, int flags,
|
|
|
log_callback_fn log_callback)
|
|
|
{
|
|
|
- return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, log_callback);
|
|
|
+ return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, log_callback, NULL);
|
|
|
}
|
|
|
|
|
|
+#ifdef HAVE_OPENSSL_H
|
|
|
+zhandle_t *zookeeper_init_ssl(const char *host, const char *cert, watcher_fn watcher,
|
|
|
+ int recv_timeout, const clientid_t *clientid, void *context, int flags)
|
|
|
+{
|
|
|
+ zcert_t zcert;
|
|
|
+ zcert.certstr = strdup(cert);
|
|
|
+ zcert.ca = strtok(strdup(cert), ",");
|
|
|
+ zcert.cert = strtok(NULL, ",");
|
|
|
+ zcert.key = strtok(NULL, ",");
|
|
|
+ zcert.passwd = strtok(NULL, ",");
|
|
|
+ return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, NULL, &zcert);
|
|
|
+}
|
|
|
+#endif
|
|
|
+
|
|
|
/**
|
|
|
* Set a new list of zk servers to connect to. Disconnect will occur if
|
|
|
* current connection endpoint is not in the list.
|
|
@@ -1568,7 +1635,7 @@ static __attribute__ ((unused)) int get_queue_len(buffer_head_t *list)
|
|
|
* 0 if send would block while sending the buffer (or a send was incomplete),
|
|
|
* 1 if success
|
|
|
*/
|
|
|
-static int send_buffer(socket_t fd, buffer_list_t *buff)
|
|
|
+static int send_buffer(zhandle_t *zh, buffer_list_t *buff)
|
|
|
{
|
|
|
int len = buff->len;
|
|
|
int off = buff->curr_offset;
|
|
@@ -1578,7 +1645,7 @@ static int send_buffer(socket_t fd, buffer_list_t *buff)
|
|
|
/* we need to send the length at the beginning */
|
|
|
int nlen = htonl(len);
|
|
|
char *b = (char*)&nlen;
|
|
|
- rc = zookeeper_send(fd, b + off, sizeof(nlen) - off);
|
|
|
+ rc = zookeeper_send(zh->fd, b + off, sizeof(nlen) - off);
|
|
|
if (rc == -1) {
|
|
|
#ifdef _WIN32
|
|
|
if (WSAGetLastError() != WSAEWOULDBLOCK) {
|
|
@@ -1597,7 +1664,7 @@ static int send_buffer(socket_t fd, buffer_list_t *buff)
|
|
|
if (off >= 4) {
|
|
|
/* want off to now represent the offset into the buffer */
|
|
|
off -= sizeof(buff->len);
|
|
|
- rc = zookeeper_send(fd, buff->buffer + off, len - off);
|
|
|
+ rc = zookeeper_send(zh->fd, buff->buffer + off, len - off);
|
|
|
if (rc == -1) {
|
|
|
#ifdef _WIN32
|
|
|
if (WSAGetLastError() != WSAEWOULDBLOCK) {
|
|
@@ -1626,7 +1693,7 @@ static int recv_buffer(zhandle_t *zh, buffer_list_t *buff)
|
|
|
/* if buffer is less than 4, we are reading in the length */
|
|
|
if (off < 4) {
|
|
|
char *buffer = (char*)&(buff->len);
|
|
|
- rc = recv(zh->fd, buffer+off, sizeof(int)-off, 0);
|
|
|
+ rc = zookeeper_recv(zh->fd, buffer+off, sizeof(int)-off, 0);
|
|
|
switch (rc) {
|
|
|
case 0:
|
|
|
errno = EHOSTDOWN;
|
|
@@ -1652,7 +1719,7 @@ static int recv_buffer(zhandle_t *zh, buffer_list_t *buff)
|
|
|
/* want off to now represent the offset into the buffer */
|
|
|
off -= sizeof(buff->len);
|
|
|
|
|
|
- rc = recv(zh->fd, buff->buffer+off, buff->len-off, 0);
|
|
|
+ rc = zookeeper_recv(zh->fd, buff->buffer+off, buff->len-off, 0);
|
|
|
|
|
|
/* dirty hack to make new client work against old server
|
|
|
* old server sends 40 bytes to finish connection handshake,
|
|
@@ -1772,7 +1839,7 @@ static int is_connected(zhandle_t* zh)
|
|
|
|
|
|
static void cleanup(zhandle_t *zh,int rc)
|
|
|
{
|
|
|
- close(zh->fd);
|
|
|
+ close_zsock(zh->fd);
|
|
|
if (is_unrecoverable(zh)) {
|
|
|
LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=%s",
|
|
|
state2String(zh->state));
|
|
@@ -1782,7 +1849,6 @@ static void cleanup(zhandle_t *zh,int rc)
|
|
|
PROCESS_SESSION_EVENT(zh, ZOO_CONNECTING_STATE);
|
|
|
}
|
|
|
cleanup_bufs(zh,1,rc);
|
|
|
- zh->fd = -1;
|
|
|
|
|
|
LOG_DEBUG(LOGCALLBACK(zh), "Previous connection=%s delay=%d", zoo_get_current_server(zh), zh->delay);
|
|
|
|
|
@@ -2038,6 +2104,11 @@ static int prime_connection(zhandle_t *zh)
|
|
|
int len = sizeof(buffer_req);
|
|
|
int hlen = 0;
|
|
|
struct connect_req req;
|
|
|
+
|
|
|
+ if (zh->state == ZOO_SSL_CONNECTING_STATE) {
|
|
|
+ // The SSL connection is yet to happen.
|
|
|
+ return ZOK;
|
|
|
+ }
|
|
|
req.protocolVersion = 0;
|
|
|
req.sessionId = zh->seen_rw_server_before ? zh->client_id.client_id : 0;
|
|
|
req.passwd_len = sizeof(req.passwd);
|
|
@@ -2120,7 +2191,7 @@ const int MIN_RW_TIMEOUT = 200;
|
|
|
static int ping_rw_server(zhandle_t* zh)
|
|
|
{
|
|
|
char buf[10];
|
|
|
- socket_t sock;
|
|
|
+ zsock_t fd;
|
|
|
int rc;
|
|
|
sendsize_t ssize;
|
|
|
int sock_flags;
|
|
@@ -2132,27 +2203,41 @@ static int ping_rw_server(zhandle_t* zh)
|
|
|
#else
|
|
|
sock_flags = SOCK_STREAM;
|
|
|
#endif
|
|
|
- sock = socket(zh->addr_rw_server.ss_family, sock_flags, 0);
|
|
|
- if (sock < 0) {
|
|
|
+ fd.sock = socket(zh->addr_rw_server.ss_family, sock_flags, 0);
|
|
|
+ if (fd.sock < 0) {
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
- zookeeper_set_sock_nodelay(zh, sock);
|
|
|
- zookeeper_set_sock_timeout(zh, sock, 1);
|
|
|
+ zookeeper_set_sock_nodelay(zh, fd.sock);
|
|
|
+ zookeeper_set_sock_timeout(zh, fd.sock, 1);
|
|
|
|
|
|
- rc = zookeeper_connect(zh, &zh->addr_rw_server, sock);
|
|
|
+ rc = zookeeper_connect(zh, &zh->addr_rw_server, fd.sock);
|
|
|
if (rc < 0) {
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
- ssize = zookeeper_send(sock, "isro", 4);
|
|
|
+#ifdef HAVE_OPENSSL_H
|
|
|
+ fd.ssl_sock = NULL;
|
|
|
+ fd.ssl_ctx = NULL;
|
|
|
+
|
|
|
+ if (zh->fd->cert != NULL) {
|
|
|
+ fd.cert = zh->fd->cert;
|
|
|
+ rc = init_ssl_for_socket(&fd, zh, 0);
|
|
|
+ if (rc != ZOK) {
|
|
|
+ rc = 0;
|
|
|
+ goto out;
|
|
|
+ }
|
|
|
+ }
|
|
|
+#endif
|
|
|
+
|
|
|
+ ssize = zookeeper_send(&fd, "isro", 4);
|
|
|
if (ssize < 0) {
|
|
|
rc = 0;
|
|
|
goto out;
|
|
|
}
|
|
|
|
|
|
memset(buf, 0, sizeof(buf));
|
|
|
- rc = recv(sock, buf, sizeof(buf), 0);
|
|
|
+ rc = zookeeper_recv(&fd, buf, sizeof(buf), 0);
|
|
|
if (rc < 0) {
|
|
|
rc = 0;
|
|
|
goto out;
|
|
@@ -2161,7 +2246,7 @@ static int ping_rw_server(zhandle_t* zh)
|
|
|
rc = strcmp("rw", buf) == 0;
|
|
|
|
|
|
out:
|
|
|
- close(sock);
|
|
|
+ close_zsock(&fd);
|
|
|
addr_rw_server = rc ? &zh->addr_rw_server : 0;
|
|
|
return rc;
|
|
|
}
|
|
@@ -2292,7 +2377,7 @@ int zookeeper_interest(zhandle_t *zh, socket_t *fd, int *interest,
|
|
|
return api_epilog(zh, rc);
|
|
|
}
|
|
|
|
|
|
- *fd = zh->fd;
|
|
|
+ *fd = zh->fd->sock;
|
|
|
*interest = 0;
|
|
|
tv->tv_sec = 0;
|
|
|
tv->tv_usec = 0;
|
|
@@ -2322,8 +2407,8 @@ int zookeeper_interest(zhandle_t *zh, socket_t *fd, int *interest,
|
|
|
// No need to delay -- grab the next server and attempt connection
|
|
|
zoo_cycle_next_server(zh);
|
|
|
}
|
|
|
- zh->fd = socket(zh->addr_cur.ss_family, sock_flags, 0);
|
|
|
- if (zh->fd < 0) {
|
|
|
+ zh->fd->sock = socket(zh->addr_cur.ss_family, sock_flags, 0);
|
|
|
+ if (zh->fd->sock < 0) {
|
|
|
rc = handle_socket_error_msg(zh,
|
|
|
__LINE__,
|
|
|
ZSYSTEMERROR,
|
|
@@ -2331,17 +2416,21 @@ int zookeeper_interest(zhandle_t *zh, socket_t *fd, int *interest,
|
|
|
return api_epilog(zh, rc);
|
|
|
}
|
|
|
|
|
|
- zookeeper_set_sock_nodelay(zh, zh->fd);
|
|
|
- zookeeper_set_sock_noblock(zh, zh->fd);
|
|
|
+ zookeeper_set_sock_nodelay(zh, zh->fd->sock);
|
|
|
+ zookeeper_set_sock_noblock(zh, zh->fd->sock);
|
|
|
|
|
|
- rc = zookeeper_connect(zh, &zh->addr_cur, zh->fd);
|
|
|
+ rc = zookeeper_connect(zh, &zh->addr_cur, zh->fd->sock);
|
|
|
|
|
|
if (rc == -1) {
|
|
|
/* we are handling the non-blocking connect according to
|
|
|
* the description in section 16.3 "Non-blocking connect"
|
|
|
* in UNIX Network Programming vol 1, 3rd edition */
|
|
|
if (errno == EWOULDBLOCK || errno == EINPROGRESS) {
|
|
|
- zh->state = ZOO_CONNECTING_STATE;
|
|
|
+ // For SSL, we first go to ZOO_SSL_CONNECTING_STATE
|
|
|
+ if (zh->fd->cert != NULL)
|
|
|
+ zh->state = ZOO_SSL_CONNECTING_STATE;
|
|
|
+ else
|
|
|
+ zh->state = ZOO_CONNECTING_STATE;
|
|
|
} else {
|
|
|
rc = handle_socket_error_msg(zh,
|
|
|
__LINE__,
|
|
@@ -2350,6 +2439,14 @@ int zookeeper_interest(zhandle_t *zh, socket_t *fd, int *interest,
|
|
|
return api_epilog(zh, rc);
|
|
|
}
|
|
|
} else {
|
|
|
+#ifdef HAVE_OPENSSL_H
|
|
|
+ if (zh->fd->cert != NULL) {
|
|
|
+ // We do SSL_connect() here
|
|
|
+ if (init_ssl_for_handler(zh) != ZOK) {
|
|
|
+ return ZSSLCONNECTIONERROR;
|
|
|
+ }
|
|
|
+ }
|
|
|
+#endif
|
|
|
rc = prime_connection(zh);
|
|
|
if (rc != 0) {
|
|
|
return api_epilog(zh,rc);
|
|
@@ -2361,7 +2458,7 @@ int zookeeper_interest(zhandle_t *zh, socket_t *fd, int *interest,
|
|
|
}
|
|
|
*tv = get_timeval(zh->recv_timeout/3);
|
|
|
}
|
|
|
- *fd = zh->fd;
|
|
|
+ *fd = zh->fd->sock;
|
|
|
zh->last_recv = now;
|
|
|
zh->last_send = now;
|
|
|
zh->last_ping = now;
|
|
@@ -2369,13 +2466,13 @@ int zookeeper_interest(zhandle_t *zh, socket_t *fd, int *interest,
|
|
|
zh->ping_rw_timeout = MIN_RW_TIMEOUT;
|
|
|
}
|
|
|
|
|
|
- if (zh->fd != -1) {
|
|
|
+ if (zh->fd->sock != -1) {
|
|
|
int idle_recv = calculate_interval(&zh->last_recv, &now);
|
|
|
int idle_send = calculate_interval(&zh->last_send, &now);
|
|
|
int recv_to = zh->recv_timeout*2/3 - idle_recv;
|
|
|
int send_to = zh->recv_timeout/3;
|
|
|
// have we exceeded the receive timeout threshold?
|
|
|
- if (recv_to <= 0) {
|
|
|
+ if (recv_to <= 0 && zh->state != ZOO_SSL_CONNECTING_STATE) {
|
|
|
// We gotta cut our losses and connect to someone else
|
|
|
#ifdef _WIN32
|
|
|
errno = WSAETIMEDOUT;
|
|
@@ -2444,21 +2541,186 @@ int zookeeper_interest(zhandle_t *zh, socket_t *fd, int *interest,
|
|
|
/* we are interested in a write if we are connected and have something
|
|
|
* to send, or we are waiting for a connect to finish. */
|
|
|
if ((zh->to_send.head && is_connected(zh))
|
|
|
- || zh->state == ZOO_CONNECTING_STATE) {
|
|
|
+ || zh->state == ZOO_CONNECTING_STATE
|
|
|
+ || zh->state == ZOO_SSL_CONNECTING_STATE) {
|
|
|
*interest |= ZOOKEEPER_WRITE;
|
|
|
}
|
|
|
}
|
|
|
return api_epilog(zh,ZOK);
|
|
|
}
|
|
|
|
|
|
+#ifdef HAVE_OPENSSL_H
|
|
|
+
|
|
|
+/*
|
|
|
+ * use this function, if you want to init SSL for the socket currently registered in the zookeeper handler
|
|
|
+ */
|
|
|
+static int init_ssl_for_handler(zhandle_t *zh)
|
|
|
+{
|
|
|
+ int rc = init_ssl_for_socket(zh->fd, zh, 1);
|
|
|
+ if (rc == ZOK) {
|
|
|
+ // (SUCCESS) Now mark the ZOO_CONNECTING_STATE so that
|
|
|
+ // prime_connection() happen.
|
|
|
+ // prime_connection() only happens in ZOO_CONNECTING_STATE
|
|
|
+ zh->state = ZOO_CONNECTING_STATE;
|
|
|
+ }
|
|
|
+ return rc;
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+ * use this function, if you want to init SSL for a socket, pointing to a different server address than the one
|
|
|
+ * currently registered in the zookeeper handler (e.g. ping other servers when you are connected to a read-only one)
|
|
|
+ */
|
|
|
+static int init_ssl_for_socket(zsock_t *fd, zhandle_t *zh, int fail_on_error) {
|
|
|
+
|
|
|
+ SSL_CTX **ctx;
|
|
|
+
|
|
|
+ if (!fd->ssl_sock) {
|
|
|
+ const SSL_METHOD *method;
|
|
|
+
|
|
|
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
|
|
|
+ OpenSSL_add_all_algorithms();
|
|
|
+ ERR_load_BIO_strings();
|
|
|
+ ERR_load_crypto_strings();
|
|
|
+ SSL_load_error_strings();
|
|
|
+ SSL_library_init();
|
|
|
+ method = SSLv23_client_method();
|
|
|
+#else
|
|
|
+ OPENSSL_init_ssl(OPENSSL_INIT_LOAD_SSL_STRINGS | OPENSSL_INIT_LOAD_CRYPTO_STRINGS, NULL);
|
|
|
+ method = TLS_client_method();
|
|
|
+#endif
|
|
|
+ if (FIPS_mode() == 0) {
|
|
|
+ LOG_INFO(LOGCALLBACK(zh), "FIPS mode is OFF ");
|
|
|
+ } else {
|
|
|
+ LOG_INFO(LOGCALLBACK(zh), "FIPS mode is ON ");
|
|
|
+ }
|
|
|
+ fd->ssl_ctx = SSL_CTX_new(method);
|
|
|
+ ctx = &fd->ssl_ctx;
|
|
|
+
|
|
|
+ SSL_CTX_set_verify(*ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, 0);
|
|
|
+ /*SERVER CA FILE*/
|
|
|
+ if (SSL_CTX_load_verify_locations(*ctx, fd->cert->ca, 0) != 1) {
|
|
|
+ SSL_CTX_free(*ctx);
|
|
|
+ LOG_ERROR(LOGCALLBACK(zh), "Failed to load CA file %s", fd->cert->ca);
|
|
|
+ errno = EINVAL;
|
|
|
+ return ZBADARGUMENTS;
|
|
|
+ }
|
|
|
+ if (SSL_CTX_set_default_verify_paths(*ctx) != 1) {
|
|
|
+ SSL_CTX_free(*ctx);
|
|
|
+ LOG_ERROR(LOGCALLBACK(zh), "Call to SSL_CTX_set_default_verify_paths failed");
|
|
|
+ errno = EINVAL;
|
|
|
+ return ZBADARGUMENTS;
|
|
|
+ }
|
|
|
+ /*CLIENT CA FILE (With Certificate Chain)*/
|
|
|
+ if (SSL_CTX_use_certificate_chain_file(*ctx, fd->cert->cert) != 1) {
|
|
|
+ SSL_CTX_free(*ctx);
|
|
|
+ LOG_ERROR(LOGCALLBACK(zh), "Failed to load client certificate chain from %s", fd->cert->cert);
|
|
|
+ errno = EINVAL;
|
|
|
+ return ZBADARGUMENTS;
|
|
|
+ }
|
|
|
+ /*CLIENT PRIVATE KEY*/
|
|
|
+ SSL_CTX_set_default_passwd_cb_userdata(*ctx, fd->cert->passwd);
|
|
|
+ if (SSL_CTX_use_PrivateKey_file(*ctx, fd->cert->key, SSL_FILETYPE_PEM) != 1) {
|
|
|
+ SSL_CTX_free(*ctx);
|
|
|
+ LOG_ERROR(LOGCALLBACK(zh), "Failed to load client private key from %s", fd->cert->key);
|
|
|
+ errno = EINVAL;
|
|
|
+ return ZBADARGUMENTS;
|
|
|
+ }
|
|
|
+ /*CHECK*/
|
|
|
+ if (SSL_CTX_check_private_key(*ctx) != 1) {
|
|
|
+ SSL_CTX_free(*ctx);
|
|
|
+ LOG_ERROR(LOGCALLBACK(zh), "SSL_CTX_check_private_key failed");
|
|
|
+ errno = EINVAL;
|
|
|
+ return ZBADARGUMENTS;
|
|
|
+ }
|
|
|
+ /*MULTIPLE HANDSHAKE*/
|
|
|
+ SSL_CTX_set_mode(*ctx, SSL_MODE_AUTO_RETRY);
|
|
|
+
|
|
|
+ fd->ssl_sock = SSL_new(*ctx);
|
|
|
+ if (fd->ssl_sock == NULL) {
|
|
|
+ if (fail_on_error) {
|
|
|
+ return handle_socket_error_msg(zh,__LINE__,ZSSLCONNECTIONERROR, "error creating ssl context");
|
|
|
+ } else {
|
|
|
+ LOG_ERROR(LOGCALLBACK(zh), "error creating ssl context");
|
|
|
+ return ZSSLCONNECTIONERROR;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ SSL_set_fd(fd->ssl_sock, fd->sock);
|
|
|
+ }
|
|
|
+ while(1) {
|
|
|
+ int rc;
|
|
|
+ int sock = fd->sock;
|
|
|
+ struct timeval tv;
|
|
|
+ fd_set s_rfds, s_wfds;
|
|
|
+ tv.tv_sec = 1;
|
|
|
+ tv.tv_usec = 0;
|
|
|
+ FD_ZERO(&s_rfds);
|
|
|
+ FD_ZERO(&s_wfds);
|
|
|
+ rc = SSL_connect(fd->ssl_sock);
|
|
|
+ if (rc == 1) {
|
|
|
+ return ZOK;
|
|
|
+ } else {
|
|
|
+ rc = SSL_get_error(fd->ssl_sock, rc);
|
|
|
+ if (rc == SSL_ERROR_WANT_READ) {
|
|
|
+ FD_SET(sock, &s_rfds);
|
|
|
+ FD_CLR(sock, &s_wfds);
|
|
|
+ } else if (rc == SSL_ERROR_WANT_WRITE) {
|
|
|
+ FD_SET(sock, &s_wfds);
|
|
|
+ FD_CLR(sock, &s_rfds);
|
|
|
+ } else {
|
|
|
+ if (fail_on_error) {
|
|
|
+ return handle_socket_error_msg(zh,__LINE__,ZSSLCONNECTIONERROR, "error in ssl connect");
|
|
|
+ } else {
|
|
|
+ LOG_ERROR(LOGCALLBACK(zh), "error in ssl connect");
|
|
|
+ return ZSSLCONNECTIONERROR;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ rc = select(sock + 1, &s_rfds, &s_wfds, NULL, &tv);
|
|
|
+ if (rc == -1) {
|
|
|
+ if (fail_on_error) {
|
|
|
+ return handle_socket_error_msg(zh,__LINE__,ZSSLCONNECTIONERROR, "error in ssl connect (after select)");
|
|
|
+ } else {
|
|
|
+ LOG_ERROR(LOGCALLBACK(zh), "error in ssl connect (after select)");
|
|
|
+ return ZSSLCONNECTIONERROR;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+#endif
|
|
|
+
|
|
|
static int check_events(zhandle_t *zh, int events)
|
|
|
{
|
|
|
- if (zh->fd == -1)
|
|
|
+ if (zh->fd->sock == -1)
|
|
|
return ZINVALIDSTATE;
|
|
|
+
|
|
|
+#ifdef HAVE_OPENSSL_H
|
|
|
+ if ((events&ZOOKEEPER_WRITE) && (zh->state == ZOO_SSL_CONNECTING_STATE) && zh->fd->cert != NULL) {
|
|
|
+ int rc, error;
|
|
|
+ socklen_t len = sizeof(error);
|
|
|
+ rc = getsockopt(zh->fd->sock, SOL_SOCKET, SO_ERROR, &error, &len);
|
|
|
+ /* the description in section 16.4 "Non-blocking connect"
|
|
|
+ * in UNIX Network Programming vol 1, 3rd edition, points out
|
|
|
+ * that sometimes the error is in errno and sometimes in error */
|
|
|
+ if (rc < 0 || error) {
|
|
|
+ if (rc == 0)
|
|
|
+ errno = error;
|
|
|
+ return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
|
|
|
+ "server refused to accept the client");
|
|
|
+ }
|
|
|
+ // We do SSL_connect() here
|
|
|
+ if (init_ssl_for_handler(zh) != ZOK) {
|
|
|
+ return ZSSLCONNECTIONERROR;
|
|
|
+ }
|
|
|
+ }
|
|
|
+#endif
|
|
|
+
|
|
|
if ((events&ZOOKEEPER_WRITE)&&(zh->state == ZOO_CONNECTING_STATE)) {
|
|
|
int rc, error;
|
|
|
socklen_t len = sizeof(error);
|
|
|
- rc = getsockopt(zh->fd, SOL_SOCKET, SO_ERROR, &error, &len);
|
|
|
+ rc = getsockopt(zh->fd->sock, SOL_SOCKET, SO_ERROR, &error, &len);
|
|
|
/* the description in section 16.4 "Non-blocking connect"
|
|
|
* in UNIX Network Programming vol 1, 3rd edition, points out
|
|
|
* that sometimes the error is in errno and sometimes in error */
|
|
@@ -2475,6 +2737,7 @@ static int check_events(zhandle_t *zh, int events)
|
|
|
LOG_INFO(LOGCALLBACK(zh), "initiated connection to server %s", format_endpoint_info(&zh->addr_cur));
|
|
|
return ZOK;
|
|
|
}
|
|
|
+
|
|
|
if (zh->to_send.head && (events&ZOOKEEPER_WRITE)) {
|
|
|
/* make the flush call non-blocking by specifying a 0 timeout */
|
|
|
int rc=flush_send_queue(zh,0);
|
|
@@ -2811,7 +3074,7 @@ static void isSocketReadable(zhandle_t* zh)
|
|
|
{
|
|
|
#ifndef _WIN32
|
|
|
struct pollfd fds;
|
|
|
- fds.fd = zh->fd;
|
|
|
+ fds.fd = zh->fd->sock;
|
|
|
fds.events = POLLIN;
|
|
|
if (poll(&fds,1,0)<=0) {
|
|
|
// socket not readable -- no more responses to process
|
|
@@ -3285,6 +3548,7 @@ int zookeeper_close(zhandle_t *zh)
|
|
|
finish:
|
|
|
destroy(zh);
|
|
|
adaptor_destroy(zh);
|
|
|
+ free(zh->fd);
|
|
|
free(zh);
|
|
|
#ifdef _WIN32
|
|
|
Win32WSACleanup();
|
|
@@ -4346,11 +4610,11 @@ int flush_send_queue(zhandle_t*zh, int timeout)
|
|
|
#ifdef _WIN32
|
|
|
wait = get_timeval(timeout-elapsed);
|
|
|
FD_ZERO(&pollSet);
|
|
|
- FD_SET(zh->fd, &pollSet);
|
|
|
+ FD_SET(zh->fd->sock, &pollSet);
|
|
|
// Poll the socket
|
|
|
- rc = select((int)(zh->fd)+1, NULL, &pollSet, NULL, &wait);
|
|
|
+ rc = select((int)(zh->fd->sock)+1, NULL, &pollSet, NULL, &wait);
|
|
|
#else
|
|
|
- fds.fd = zh->fd;
|
|
|
+ fds.fd = zh->fd->sock;
|
|
|
fds.events = POLLOUT;
|
|
|
fds.revents = 0;
|
|
|
rc = poll(&fds, 1, timeout-elapsed);
|
|
@@ -4362,7 +4626,7 @@ int flush_send_queue(zhandle_t*zh, int timeout)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- rc = send_buffer(zh->fd, zh->to_send.head);
|
|
|
+ rc = send_buffer(zh, zh->to_send.head);
|
|
|
if(rc==0 && timeout==0){
|
|
|
/* send_buffer would block while sending this buffer */
|
|
|
rc = ZOK;
|