|
@@ -32,6 +32,10 @@
|
|
#include "zookeeper_log.h"
|
|
#include "zookeeper_log.h"
|
|
#include "zk_hashtable.h"
|
|
#include "zk_hashtable.h"
|
|
|
|
|
|
|
|
+#ifdef HAVE_CYRUS_SASL_H
|
|
|
|
+#include "zk_sasl.h"
|
|
|
|
+#endif /* HAVE_CYRUS_SASL_H */
|
|
|
|
+
|
|
#include <stdlib.h>
|
|
#include <stdlib.h>
|
|
#include <stdio.h>
|
|
#include <stdio.h>
|
|
#include <string.h>
|
|
#include <string.h>
|
|
@@ -326,6 +330,17 @@ static void zookeeper_set_sock_noblock(zhandle_t *, socket_t);
|
|
static void zookeeper_set_sock_timeout(zhandle_t *, socket_t, int);
|
|
static void zookeeper_set_sock_timeout(zhandle_t *, socket_t, int);
|
|
static socket_t zookeeper_connect(zhandle_t *, struct sockaddr_storage *, socket_t);
|
|
static socket_t zookeeper_connect(zhandle_t *, struct sockaddr_storage *, socket_t);
|
|
|
|
|
|
|
|
+/*
|
|
|
|
+ * return 1 if zh has a SASL client performing authentication, 0 otherwise.
|
|
|
|
+ */
|
|
|
|
+static int is_sasl_auth_in_progress(zhandle_t* zh)
|
|
|
|
+{
|
|
|
|
+#ifdef HAVE_CYRUS_SASL_H
|
|
|
|
+ return zh->sasl_client && zh->sasl_client->state == ZOO_SASL_INTERMEDIATE;
|
|
|
|
+#else /* !HAVE_CYRUS_SASL_H */
|
|
|
|
+ return 0;
|
|
|
|
+#endif /* HAVE_CYRUS_SASL_H */
|
|
|
|
+}
|
|
|
|
|
|
/*
|
|
/*
|
|
* abort due to the use of a sync api in a singlethreaded environment
|
|
* abort due to the use of a sync api in a singlethreaded environment
|
|
@@ -639,6 +654,13 @@ static void destroy(zhandle_t *zh)
|
|
destroy_zk_hashtable(zh->active_child_watchers);
|
|
destroy_zk_hashtable(zh->active_child_watchers);
|
|
addrvec_free(&zh->addrs_old);
|
|
addrvec_free(&zh->addrs_old);
|
|
addrvec_free(&zh->addrs_new);
|
|
addrvec_free(&zh->addrs_new);
|
|
|
|
+
|
|
|
|
+#ifdef HAVE_CYRUS_SASL_H
|
|
|
|
+ if (zh->sasl_client) {
|
|
|
|
+ zoo_sasl_client_destroy(zh->sasl_client);
|
|
|
|
+ zh->sasl_client = NULL;
|
|
|
|
+ }
|
|
|
|
+#endif /* HAVE_CYRUS_SASL_H */
|
|
}
|
|
}
|
|
|
|
|
|
static void setup_random()
|
|
static void setup_random()
|
|
@@ -1197,7 +1219,7 @@ static void log_env(zhandle_t *zh) {
|
|
*/
|
|
*/
|
|
static zhandle_t *zookeeper_init_internal(const char *host, watcher_fn watcher,
|
|
static zhandle_t *zookeeper_init_internal(const char *host, watcher_fn watcher,
|
|
int recv_timeout, const clientid_t *clientid, void *context, int flags,
|
|
int recv_timeout, const clientid_t *clientid, void *context, int flags,
|
|
- log_callback_fn log_callback, zcert_t *cert)
|
|
|
|
|
|
+ log_callback_fn log_callback, zcert_t *cert, void *sasl_params)
|
|
{
|
|
{
|
|
int errnosave = 0;
|
|
int errnosave = 0;
|
|
zhandle_t *zh = NULL;
|
|
zhandle_t *zh = NULL;
|
|
@@ -1308,6 +1330,16 @@ static zhandle_t *zookeeper_init_internal(const char *host, watcher_fn watcher,
|
|
zh->active_child_watchers=create_zk_hashtable();
|
|
zh->active_child_watchers=create_zk_hashtable();
|
|
zh->disable_reconnection_attempt = 0;
|
|
zh->disable_reconnection_attempt = 0;
|
|
|
|
|
|
|
|
+#ifdef HAVE_CYRUS_SASL_H
|
|
|
|
+ if (sasl_params) {
|
|
|
|
+ zh->sasl_client = zoo_sasl_client_create(
|
|
|
|
+ (zoo_sasl_params_t*)sasl_params);
|
|
|
|
+ if (!zh->sasl_client) {
|
|
|
|
+ goto abort;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+#endif /* HAVE_CYRUS_SASL_H */
|
|
|
|
+
|
|
if (adaptor_init(zh) == -1) {
|
|
if (adaptor_init(zh) == -1) {
|
|
goto abort;
|
|
goto abort;
|
|
}
|
|
}
|
|
@@ -1325,14 +1357,14 @@ abort:
|
|
zhandle_t *zookeeper_init(const char *host, watcher_fn watcher,
|
|
zhandle_t *zookeeper_init(const char *host, watcher_fn watcher,
|
|
int recv_timeout, const clientid_t *clientid, void *context, int flags)
|
|
int recv_timeout, const clientid_t *clientid, void *context, int flags)
|
|
{
|
|
{
|
|
- return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, NULL, NULL);
|
|
|
|
|
|
+ return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, NULL, NULL, NULL);
|
|
}
|
|
}
|
|
|
|
|
|
zhandle_t *zookeeper_init2(const char *host, watcher_fn watcher,
|
|
zhandle_t *zookeeper_init2(const char *host, watcher_fn watcher,
|
|
int recv_timeout, const clientid_t *clientid, void *context, int flags,
|
|
int recv_timeout, const clientid_t *clientid, void *context, int flags,
|
|
log_callback_fn log_callback)
|
|
log_callback_fn log_callback)
|
|
{
|
|
{
|
|
- return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, log_callback, NULL);
|
|
|
|
|
|
+ return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, log_callback, NULL, NULL);
|
|
}
|
|
}
|
|
|
|
|
|
#ifdef HAVE_OPENSSL_H
|
|
#ifdef HAVE_OPENSSL_H
|
|
@@ -1345,10 +1377,19 @@ zhandle_t *zookeeper_init_ssl(const char *host, const char *cert, watcher_fn wat
|
|
zcert.cert = strtok(NULL, ",");
|
|
zcert.cert = strtok(NULL, ",");
|
|
zcert.key = strtok(NULL, ",");
|
|
zcert.key = strtok(NULL, ",");
|
|
zcert.passwd = strtok(NULL, ",");
|
|
zcert.passwd = strtok(NULL, ",");
|
|
- return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, NULL, &zcert);
|
|
|
|
|
|
+ return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, NULL, &zcert, NULL);
|
|
}
|
|
}
|
|
#endif
|
|
#endif
|
|
|
|
|
|
|
|
+#ifdef HAVE_CYRUS_SASL_H
|
|
|
|
+zhandle_t *zookeeper_init_sasl(const char *host, watcher_fn watcher,
|
|
|
|
+ int recv_timeout, const clientid_t *clientid, void *context, int flags,
|
|
|
|
+ log_callback_fn log_callback, zoo_sasl_params_t *sasl_params)
|
|
|
|
+{
|
|
|
|
+ return zookeeper_init_internal(host, watcher, recv_timeout, clientid, context, flags, log_callback, NULL, sasl_params);
|
|
|
|
+}
|
|
|
|
+#endif /* HAVE_CYRUS_SASL_H */
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Set a new list of zk servers to connect to. Disconnect will occur if
|
|
* Set a new list of zk servers to connect to. Disconnect will occur if
|
|
* current connection endpoint is not in the list.
|
|
* current connection endpoint is not in the list.
|
|
@@ -2540,7 +2581,7 @@ int zookeeper_interest(zhandle_t *zh, socket_t *fd, int *interest,
|
|
*interest = ZOOKEEPER_READ;
|
|
*interest = ZOOKEEPER_READ;
|
|
/* we are interested in a write if we are connected and have something
|
|
/* we are interested in a write if we are connected and have something
|
|
* to send, or we are waiting for a connect to finish. */
|
|
* to send, or we are waiting for a connect to finish. */
|
|
- if ((zh->to_send.head && is_connected(zh))
|
|
|
|
|
|
+ if ((zh->to_send.head && (is_connected(zh) || is_sasl_auth_in_progress(zh)))
|
|
|| zh->state == ZOO_CONNECTING_STATE
|
|
|| zh->state == ZOO_CONNECTING_STATE
|
|
|| zh->state == ZOO_SSL_CONNECTING_STATE) {
|
|
|| zh->state == ZOO_SSL_CONNECTING_STATE) {
|
|
*interest |= ZOOKEEPER_WRITE;
|
|
*interest |= ZOOKEEPER_WRITE;
|
|
@@ -2691,6 +2732,91 @@ static int init_ssl_for_socket(zsock_t *fd, zhandle_t *zh, int fail_on_error) {
|
|
|
|
|
|
#endif
|
|
#endif
|
|
|
|
|
|
|
|
+/*
|
|
|
|
+ * the "bottom half" of the session establishment procedure, executed
|
|
|
|
+ * either after receiving the "prime response," or after SASL
|
|
|
|
+ * authentication is complete
|
|
|
|
+ */
|
|
|
|
+static void finalize_session_establishment(zhandle_t *zh) {
|
|
|
|
+ zh->state = zh->primer_storage.readOnly ?
|
|
|
|
+ ZOO_READONLY_STATE : ZOO_CONNECTED_STATE;
|
|
|
|
+ zh->reconfig = 0;
|
|
|
|
+ LOG_INFO(LOGCALLBACK(zh),
|
|
|
|
+ "session establishment complete on server %s, sessionId=%#llx, negotiated timeout=%d %s",
|
|
|
|
+ format_endpoint_info(&zh->addr_cur),
|
|
|
|
+ zh->client_id.client_id, zh->recv_timeout,
|
|
|
|
+ zh->primer_storage.readOnly ? "(READ-ONLY mode)" : "");
|
|
|
|
+ /* we want the auth to be sent for, but since both call push to front
|
|
|
|
+ we need to call send_watch_set first */
|
|
|
|
+ send_set_watches(zh);
|
|
|
|
+ /* send the authentication packet now */
|
|
|
|
+ send_auth_info(zh);
|
|
|
|
+ LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE");
|
|
|
|
+ zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again
|
|
|
|
+ PROCESS_SESSION_EVENT(zh, zh->state);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+#ifdef HAVE_CYRUS_SASL_H
|
|
|
|
+
|
|
|
|
+/*
|
|
|
|
+ * queue an encoded SASL request to ZooKeeper. The packet is added to
|
|
|
|
+ * the front of the queue.
|
|
|
|
+ *
|
|
|
|
+ * \param zh the ZooKeeper handle
|
|
|
|
+ * \param client_data the encoded SASL data, ready to send
|
|
|
|
+ * \param client_data_len the length of \c client_data
|
|
|
|
+ * \return ZOK on success, or ZMARSHALLINGERROR if something went wrong
|
|
|
|
+ */
|
|
|
|
+int queue_sasl_request(zhandle_t *zh, const char *client_data, int client_data_len)
|
|
|
|
+{
|
|
|
|
+ struct oarchive *oa;
|
|
|
|
+ int rc;
|
|
|
|
+
|
|
|
|
+ /* Java client use normal xid, too. */
|
|
|
|
+ struct RequestHeader h = { get_xid(), ZOO_SASL_OP };
|
|
|
|
+ struct GetSASLRequest req = { { client_data_len, client_data_len>0 ? (char *) client_data : "" } };
|
|
|
|
+
|
|
|
|
+ oa = create_buffer_oarchive();
|
|
|
|
+ rc = serialize_RequestHeader(oa, "header", &h);
|
|
|
|
+ rc = rc < 0 ? rc : serialize_GetSASLRequest(oa, "req", &req);
|
|
|
|
+ rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa),
|
|
|
|
+ get_buffer_len(oa));
|
|
|
|
+ close_buffer_oarchive(&oa, 0);
|
|
|
|
+
|
|
|
|
+ LOG_DEBUG(LOGCALLBACK(zh),
|
|
|
|
+ "SASL: Queued request len=%d rc=%d", client_data_len, rc);
|
|
|
|
+
|
|
|
|
+ return (rc < 0) ? ZMARSHALLINGERROR : ZOK;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+/*
|
|
|
|
+ * decode an expected SASL response and perform the corresponding
|
|
|
|
+ * authentication step
|
|
|
|
+ */
|
|
|
|
+static int process_sasl_response(zhandle_t *zh, char *buffer, int len)
|
|
|
|
+{
|
|
|
|
+ struct iarchive *ia = create_buffer_iarchive(buffer, len);
|
|
|
|
+ struct ReplyHeader hdr;
|
|
|
|
+ struct SetSASLResponse res;
|
|
|
|
+ int rc;
|
|
|
|
+
|
|
|
|
+ rc = ia ? ZOK : ZSYSTEMERROR;
|
|
|
|
+ rc = rc < 0 ? rc : deserialize_ReplyHeader(ia, "hdr", &hdr);
|
|
|
|
+ rc = rc < 0 ? rc : deserialize_SetSASLResponse(ia, "reply", &res);
|
|
|
|
+ rc = rc < 0 ? rc : zoo_sasl_client_step(zh, res.token.buff, res.token.len);
|
|
|
|
+ deallocate_SetSASLResponse(&res);
|
|
|
|
+ if (ia) {
|
|
|
|
+ close_buffer_iarchive(&ia);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ LOG_DEBUG(LOGCALLBACK(zh),
|
|
|
|
+ "SASL: Processed response len=%d rc=%d", len, rc);
|
|
|
|
+
|
|
|
|
+ return rc;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+#endif /* HAVE_CYRUS_SASL_H */
|
|
|
|
+
|
|
static int check_events(zhandle_t *zh, int events)
|
|
static int check_events(zhandle_t *zh, int events)
|
|
{
|
|
{
|
|
if (zh->fd->sock == -1)
|
|
if (zh->fd->sock == -1)
|
|
@@ -2759,7 +2885,24 @@ static int check_events(zhandle_t *zh, int events)
|
|
if (rc > 0) {
|
|
if (rc > 0) {
|
|
get_system_time(&zh->last_recv);
|
|
get_system_time(&zh->last_recv);
|
|
if (zh->input_buffer != &zh->primer_buffer) {
|
|
if (zh->input_buffer != &zh->primer_buffer) {
|
|
- queue_buffer(&zh->to_process, zh->input_buffer, 0);
|
|
|
|
|
|
+ if (is_connected(zh) || !is_sasl_auth_in_progress(zh)) {
|
|
|
|
+ queue_buffer(&zh->to_process, zh->input_buffer, 0);
|
|
|
|
+#ifdef HAVE_CYRUS_SASL_H
|
|
|
|
+ } else {
|
|
|
|
+ rc = process_sasl_response(zh, zh->input_buffer->buffer, zh->input_buffer->curr_offset);
|
|
|
|
+ free_buffer(zh->input_buffer);
|
|
|
|
+ if (rc < 0) {
|
|
|
|
+ zoo_sasl_mark_failed(zh);
|
|
|
|
+ return rc;
|
|
|
|
+ } else if (zh->sasl_client->state == ZOO_SASL_COMPLETE) {
|
|
|
|
+ /*
|
|
|
|
+ * SASL authentication just completed; send
|
|
|
|
+ * watches, auth. info, etc. now.
|
|
|
|
+ */
|
|
|
|
+ finalize_session_establishment(zh);
|
|
|
|
+ }
|
|
|
|
+#endif /* HAVE_CYRUS_SASL_H */
|
|
|
|
+ }
|
|
} else {
|
|
} else {
|
|
int64_t oldid, newid;
|
|
int64_t oldid, newid;
|
|
//deserialize
|
|
//deserialize
|
|
@@ -2780,22 +2923,28 @@ static int check_events(zhandle_t *zh, int events)
|
|
|
|
|
|
memcpy(zh->client_id.passwd, &zh->primer_storage.passwd,
|
|
memcpy(zh->client_id.passwd, &zh->primer_storage.passwd,
|
|
sizeof(zh->client_id.passwd));
|
|
sizeof(zh->client_id.passwd));
|
|
- zh->state = zh->primer_storage.readOnly ?
|
|
|
|
- ZOO_READONLY_STATE : ZOO_CONNECTED_STATE;
|
|
|
|
- zh->reconfig = 0;
|
|
|
|
- LOG_INFO(LOGCALLBACK(zh),
|
|
|
|
- "session establishment complete on server %s, sessionId=%#llx, negotiated timeout=%d %s",
|
|
|
|
- format_endpoint_info(&zh->addr_cur),
|
|
|
|
- newid, zh->recv_timeout,
|
|
|
|
- zh->primer_storage.readOnly ? "(READ-ONLY mode)" : "");
|
|
|
|
- /* we want the auth to be sent for, but since both call push to front
|
|
|
|
- we need to call send_watch_set first */
|
|
|
|
- send_set_watches(zh);
|
|
|
|
- /* send the authentication packet now */
|
|
|
|
- send_auth_info(zh);
|
|
|
|
- LOG_DEBUG(LOGCALLBACK(zh), "Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE");
|
|
|
|
- zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again
|
|
|
|
- PROCESS_SESSION_EVENT(zh, zh->state);
|
|
|
|
|
|
+
|
|
|
|
+#ifdef HAVE_CYRUS_SASL_H
|
|
|
|
+ if (zh->sasl_client) {
|
|
|
|
+ /*
|
|
|
|
+ * Start a SASL authentication session.
|
|
|
|
+ * Watches, auth. info, etc. will be sent
|
|
|
|
+ * after it completes.
|
|
|
|
+ */
|
|
|
|
+ rc = zoo_sasl_connect(zh);
|
|
|
|
+ rc = rc < 0 ? rc : zoo_sasl_client_start(zh);
|
|
|
|
+ if (rc < 0) {
|
|
|
|
+ zoo_sasl_mark_failed(zh);
|
|
|
|
+ return rc;
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ /* Can send watches, auth. info, etc. immediately. */
|
|
|
|
+ finalize_session_establishment(zh);
|
|
|
|
+ }
|
|
|
|
+#else /* HAVE_CYRUS_SASL_H */
|
|
|
|
+ /* Can send watches, auth. info, etc. immediately. */
|
|
|
|
+ finalize_session_establishment(zh);
|
|
|
|
+#endif /* HAVE_CYRUS_SASL_H */
|
|
}
|
|
}
|
|
}
|
|
}
|
|
zh->input_buffer = 0;
|
|
zh->input_buffer = 0;
|
|
@@ -4593,7 +4742,7 @@ int flush_send_queue(zhandle_t*zh, int timeout)
|
|
// we use a recursive lock instead and only dequeue the buffer if a send was
|
|
// we use a recursive lock instead and only dequeue the buffer if a send was
|
|
// successful
|
|
// successful
|
|
lock_buffer_list(&zh->to_send);
|
|
lock_buffer_list(&zh->to_send);
|
|
- while (zh->to_send.head != 0 && is_connected(zh)) {
|
|
|
|
|
|
+ while (zh->to_send.head != 0 && (is_connected(zh) || is_sasl_auth_in_progress(zh))) {
|
|
if(timeout!=0){
|
|
if(timeout!=0){
|
|
#ifndef _WIN32
|
|
#ifndef _WIN32
|
|
struct pollfd fds;
|
|
struct pollfd fds;
|