|
@@ -35,6 +35,13 @@
|
|
|
#include <stdio.h>
|
|
|
#include <string.h>
|
|
|
#include <time.h>
|
|
|
+#include <errno.h>
|
|
|
+#include <fcntl.h>
|
|
|
+#include <assert.h>
|
|
|
+#include <stdarg.h>
|
|
|
+#include <limits.h>
|
|
|
+
|
|
|
+#ifndef WIN32
|
|
|
#include <sys/time.h>
|
|
|
#include <sys/socket.h>
|
|
|
#include <poll.h>
|
|
@@ -42,14 +49,9 @@
|
|
|
#include <netinet/tcp.h>
|
|
|
#include <arpa/inet.h>
|
|
|
#include <netdb.h>
|
|
|
-#include <errno.h>
|
|
|
#include <unistd.h>
|
|
|
-#include <fcntl.h>
|
|
|
-#include <assert.h>
|
|
|
-#include <stdarg.h>
|
|
|
-#include <limits.h>
|
|
|
-
|
|
|
#include "config.h"
|
|
|
+#endif
|
|
|
|
|
|
#ifdef HAVE_SYS_UTSNAME_H
|
|
|
#include <sys/utsname.h>
|
|
@@ -403,6 +405,7 @@ static void destroy(zhandle_t *zh)
|
|
|
|
|
|
static void setup_random()
|
|
|
{
|
|
|
+#ifndef WIN32 // TODO: better seed
|
|
|
int seed;
|
|
|
int fd = open("/dev/urandom", O_RDONLY);
|
|
|
if (fd == -1) {
|
|
@@ -413,6 +416,7 @@ static void setup_random()
|
|
|
close(fd);
|
|
|
}
|
|
|
srandom(seed);
|
|
|
+#endif
|
|
|
}
|
|
|
|
|
|
#ifndef __CYGWIN__
|
|
@@ -425,7 +429,9 @@ static void setup_random()
|
|
|
static int getaddrinfo_errno(int rc) {
|
|
|
switch(rc) {
|
|
|
case EAI_NONAME:
|
|
|
+#if EAI_NODATA != EAI_NONAME
|
|
|
case EAI_NODATA:
|
|
|
+#endif
|
|
|
return ENOENT;
|
|
|
case EAI_MEMORY:
|
|
|
return ENOMEM;
|
|
@@ -568,7 +574,11 @@ int getaddrs(zhandle_t *zh)
|
|
|
#endif
|
|
|
if (rc != 0) {
|
|
|
errno = getaddrinfo_errno(rc);
|
|
|
+#ifdef WIN32
|
|
|
+ LOG_ERROR(("Win32 message: %s\n", gai_strerror(rc)));
|
|
|
+#else
|
|
|
LOG_ERROR(("getaddrinfo: %s\n", strerror(errno)));
|
|
|
+#endif
|
|
|
rc=ZSYSTEMERROR;
|
|
|
goto fail;
|
|
|
}
|
|
@@ -739,7 +749,12 @@ zhandle_t *zookeeper_init(const char *host, watcher_fn watcher,
|
|
|
char *index_chroot = NULL;
|
|
|
|
|
|
log_env();
|
|
|
-
|
|
|
+#ifdef WIN32
|
|
|
+ if (Win32WSAStartup()){
|
|
|
+ LOG_ERROR(("Error initializing ws2_32.dll"));
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+#endif
|
|
|
LOG_INFO(("Initiating client connection, host=%s sessionTimeout=%d watcher=%p"
|
|
|
" sessionId=%#llx sessionPasswd=%s context=%p flags=%d",
|
|
|
host,
|
|
@@ -983,7 +998,11 @@ static __attribute__ ((unused)) int get_queue_len(buffer_head_t *list)
|
|
|
* 0 if send would block while sending the buffer (or a send was incomplete),
|
|
|
* 1 if success
|
|
|
*/
|
|
|
+#ifdef WIN32
|
|
|
+static int send_buffer(SOCKET fd, buffer_list_t *buff)
|
|
|
+#else
|
|
|
static int send_buffer(int fd, buffer_list_t *buff)
|
|
|
+#endif
|
|
|
{
|
|
|
int len = buff->len;
|
|
|
int off = buff->curr_offset;
|
|
@@ -995,7 +1014,11 @@ static int send_buffer(int fd, buffer_list_t *buff)
|
|
|
char *b = (char*)&nlen;
|
|
|
rc = send(fd, b + off, sizeof(nlen) - off, 0);
|
|
|
if (rc == -1) {
|
|
|
+#ifndef _WINDOWS
|
|
|
if (errno != EAGAIN) {
|
|
|
+#else
|
|
|
+ if (WSAGetLastError() != WSAEWOULDBLOCK) {
|
|
|
+#endif
|
|
|
return -1;
|
|
|
} else {
|
|
|
return 0;
|
|
@@ -1010,7 +1033,11 @@ static int send_buffer(int fd, buffer_list_t *buff)
|
|
|
off -= sizeof(buff->len);
|
|
|
rc = send(fd, buff->buffer + off, len - off, 0);
|
|
|
if (rc == -1) {
|
|
|
+#ifndef _WINDOWS
|
|
|
if (errno != EAGAIN) {
|
|
|
+#else
|
|
|
+ if (WSAGetLastError() != WSAEWOULDBLOCK) {
|
|
|
+#endif
|
|
|
return -1;
|
|
|
}
|
|
|
} else {
|
|
@@ -1025,7 +1052,11 @@ static int send_buffer(int fd, buffer_list_t *buff)
|
|
|
* 0 if recv would block,
|
|
|
* 1 if success
|
|
|
*/
|
|
|
+#ifdef WIN32
|
|
|
+static int recv_buffer(SOCKET fd, buffer_list_t *buff)
|
|
|
+#else
|
|
|
static int recv_buffer(int fd, buffer_list_t *buff)
|
|
|
+#endif
|
|
|
{
|
|
|
int off = buff->curr_offset;
|
|
|
int rc = 0;
|
|
@@ -1040,7 +1071,11 @@ static int recv_buffer(int fd, buffer_list_t *buff)
|
|
|
case 0:
|
|
|
errno = EHOSTDOWN;
|
|
|
case -1:
|
|
|
+#ifndef _WINDOWS
|
|
|
if (errno == EAGAIN) {
|
|
|
+#else
|
|
|
+ if (WSAGetLastError() == WSAEWOULDBLOCK) {
|
|
|
+#endif
|
|
|
return 0;
|
|
|
}
|
|
|
return -1;
|
|
@@ -1062,7 +1097,11 @@ static int recv_buffer(int fd, buffer_list_t *buff)
|
|
|
case 0:
|
|
|
errno = EHOSTDOWN;
|
|
|
case -1:
|
|
|
+#ifndef _WINDOWS
|
|
|
if (errno == EAGAIN) {
|
|
|
+#else
|
|
|
+ if (WSAGetLastError() == WSAEWOULDBLOCK) {
|
|
|
+#endif
|
|
|
break;
|
|
|
}
|
|
|
return -1;
|
|
@@ -1236,7 +1275,7 @@ static void auth_completion_func(int rc, zhandle_t* zh)
|
|
|
|
|
|
static int send_info_packet(zhandle_t *zh, auth_info* auth) {
|
|
|
struct oarchive *oa;
|
|
|
- struct RequestHeader h = { .xid = AUTH_XID, .type = ZOO_SETAUTH_OP};
|
|
|
+ struct RequestHeader h = { STRUCT_INITIALIZER(xid , AUTH_XID), STRUCT_INITIALIZER(type , ZOO_SETAUTH_OP)};
|
|
|
struct AuthPacket req;
|
|
|
int rc;
|
|
|
oa = create_buffer_oarchive();
|
|
@@ -1304,7 +1343,7 @@ static void free_key_list(char **list, int count)
|
|
|
static int send_set_watches(zhandle_t *zh)
|
|
|
{
|
|
|
struct oarchive *oa;
|
|
|
- struct RequestHeader h = { .xid = SET_WATCHES_XID, .type = ZOO_SETWATCHES_OP};
|
|
|
+ struct RequestHeader h = { STRUCT_INITIALIZER(xid , SET_WATCHES_XID), STRUCT_INITIALIZER(type , ZOO_SETWATCHES_OP)};
|
|
|
struct SetWatches req;
|
|
|
int rc;
|
|
|
|
|
@@ -1458,7 +1497,7 @@ static struct timeval get_timeval(int interval)
|
|
|
{
|
|
|
int rc;
|
|
|
struct oarchive *oa = create_buffer_oarchive();
|
|
|
- struct RequestHeader h = { .xid = PING_XID, .type = ZOO_PING_OP };
|
|
|
+ struct RequestHeader h = { STRUCT_INITIALIZER(xid ,PING_XID), STRUCT_INITIALIZER (type , ZOO_PING_OP) };
|
|
|
|
|
|
rc = serialize_RequestHeader(oa, "header", &h);
|
|
|
enter_critical(zh);
|
|
@@ -1471,9 +1510,17 @@ static struct timeval get_timeval(int interval)
|
|
|
return rc<0 ? rc : adaptor_send_queue(zh, 0);
|
|
|
}
|
|
|
|
|
|
- int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
|
|
|
+#ifdef WIN32
|
|
|
+int zookeeper_interest(zhandle_t *zh, SOCKET *fd, int *interest,
|
|
|
+ struct timeval *tv)
|
|
|
+{
|
|
|
+
|
|
|
+ ULONG nonblocking_flag = 1;
|
|
|
+#else
|
|
|
+int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
|
|
|
struct timeval *tv)
|
|
|
{
|
|
|
+#endif
|
|
|
struct timeval now;
|
|
|
if(zh==0 || fd==0 ||interest==0 || tv==0)
|
|
|
return ZBADARGUMENTS;
|
|
@@ -1496,23 +1543,31 @@ static struct timeval get_timeval(int interval)
|
|
|
zh->connect_index = 0;
|
|
|
}else {
|
|
|
int rc;
|
|
|
- int on = 1;
|
|
|
+ char on = 1;
|
|
|
|
|
|
zh->fd = socket(zh->addrs[zh->connect_index].ss_family, SOCK_STREAM, 0);
|
|
|
if (zh->fd < 0) {
|
|
|
return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
|
|
|
ZSYSTEMERROR, "socket() call failed"));
|
|
|
}
|
|
|
- setsockopt(zh->fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(int));
|
|
|
+ setsockopt(zh->fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(char));
|
|
|
+#ifdef WIN32
|
|
|
+ ioctlsocket(zh->fd, FIONBIO, &nonblocking_flag);
|
|
|
+#else
|
|
|
fcntl(zh->fd, F_SETFL, O_NONBLOCK|fcntl(zh->fd, F_GETFL, 0));
|
|
|
+#endif
|
|
|
#if defined(AF_INET6)
|
|
|
if (zh->addrs[zh->connect_index].ss_family == AF_INET6) {
|
|
|
rc = connect(zh->fd, (struct sockaddr*) &zh->addrs[zh->connect_index], sizeof(struct sockaddr_in6));
|
|
|
} else {
|
|
|
#else
|
|
|
+ LOG_DEBUG(("[zk] connect()\n"));
|
|
|
{
|
|
|
#endif
|
|
|
rc = connect(zh->fd, (struct sockaddr*) &zh->addrs[zh->connect_index], sizeof(struct sockaddr_in));
|
|
|
+#ifdef WIN32
|
|
|
+ get_errno();
|
|
|
+#endif
|
|
|
}
|
|
|
if (rc == -1) {
|
|
|
/* we are handling the non-blocking connect according to
|
|
@@ -1545,7 +1600,11 @@ static struct timeval get_timeval(int interval)
|
|
|
// have we exceeded the receive timeout threshold?
|
|
|
if (recv_to <= 0) {
|
|
|
// We gotta cut our losses and connect to someone else
|
|
|
+#ifdef WIN32
|
|
|
+ errno = WSAETIMEDOUT;
|
|
|
+#else
|
|
|
errno = ETIMEDOUT;
|
|
|
+#endif
|
|
|
*fd=-1;
|
|
|
*interest=0;
|
|
|
*tv = get_timeval(0);
|
|
@@ -1864,8 +1923,8 @@ static int deserialize_multi(int xid, completion_list_t *cptr, struct iarchive *
|
|
|
{
|
|
|
int rc = 0;
|
|
|
completion_head_t *clist = &cptr->c.clist;
|
|
|
+ struct MultiHeader mhdr = { STRUCT_INITIALIZER(type , 0), STRUCT_INITIALIZER(done , 0), STRUCT_INITIALIZER(err , 0) };
|
|
|
assert(clist);
|
|
|
- struct MultiHeader mhdr = { 0 };
|
|
|
deserialize_MultiHeader(ia, "multiheader", &mhdr);
|
|
|
while (!mhdr.done) {
|
|
|
completion_list_t *entry = dequeue_completion(clist);
|
|
@@ -2020,13 +2079,25 @@ void process_completions(zhandle_t *zh)
|
|
|
|
|
|
static void isSocketReadable(zhandle_t* zh)
|
|
|
{
|
|
|
+#ifndef WIN32
|
|
|
struct pollfd fds;
|
|
|
fds.fd = zh->fd;
|
|
|
fds.events = POLLIN;
|
|
|
if (poll(&fds,1,0)<=0) {
|
|
|
// socket not readable -- no more responses to process
|
|
|
zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
|
|
|
- }else{
|
|
|
+ }
|
|
|
+#else
|
|
|
+ fd_set rfds;
|
|
|
+ struct timeval waittime = {0, 0};
|
|
|
+ FD_ZERO(&rfds);
|
|
|
+ FD_SET( zh->fd , &rfds);
|
|
|
+ if (select(0, &rfds, NULL, NULL, &waittime) <= 0){
|
|
|
+ // socket not readable -- no more responses to process
|
|
|
+ zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
|
|
|
+ }
|
|
|
+#endif
|
|
|
+ else{
|
|
|
gettimeofday(&zh->socket_readable,0);
|
|
|
}
|
|
|
}
|
|
@@ -2385,7 +2456,7 @@ int zookeeper_close(zhandle_t *zh)
|
|
|
}
|
|
|
if(zh->state==ZOO_CONNECTED_STATE){
|
|
|
struct oarchive *oa;
|
|
|
- struct RequestHeader h = { .xid = get_xid(), .type = ZOO_CLOSE_OP};
|
|
|
+ struct RequestHeader h = { STRUCT_INITIALIZER (xid , get_xid()), STRUCT_INITIALIZER (type , ZOO_CLOSE_OP)};
|
|
|
LOG_INFO(("Closing zookeeper sessionId=%#llx to [%s]\n",
|
|
|
zh->client_id.client_id,format_current_endpoint_info(zh)));
|
|
|
oa = create_buffer_oarchive();
|
|
@@ -2412,6 +2483,9 @@ finish:
|
|
|
destroy(zh);
|
|
|
adaptor_destroy(zh);
|
|
|
free(zh);
|
|
|
+#ifdef WIN32
|
|
|
+ Win32WSACleanup();
|
|
|
+#endif
|
|
|
return rc;
|
|
|
}
|
|
|
|
|
@@ -2508,7 +2582,7 @@ int zoo_awget(zhandle_t *zh, const char *path,
|
|
|
{
|
|
|
struct oarchive *oa;
|
|
|
char *server_path = prepend_string(zh, path);
|
|
|
- struct RequestHeader h = { .xid = get_xid(), .type = ZOO_GETDATA_OP};
|
|
|
+ struct RequestHeader h = { STRUCT_INITIALIZER (xid , get_xid()), STRUCT_INITIALIZER (type ,ZOO_GETDATA_OP)};
|
|
|
struct GetDataRequest req = { (char*)server_path, watcher!=0 };
|
|
|
int rc;
|
|
|
|
|
@@ -2543,8 +2617,9 @@ int zoo_awget(zhandle_t *zh, const char *path,
|
|
|
static int SetDataRequest_init(zhandle_t *zh, struct SetDataRequest *req,
|
|
|
const char *path, const char *buffer, int buflen, int version)
|
|
|
{
|
|
|
+ int rc;
|
|
|
assert(req);
|
|
|
- int rc = Request_path_init(zh, 0, &req->path, path);
|
|
|
+ rc = Request_path_init(zh, 0, &req->path, path);
|
|
|
if (rc != ZOK) {
|
|
|
return rc;
|
|
|
}
|
|
@@ -2559,7 +2634,7 @@ int zoo_aset(zhandle_t *zh, const char *path, const char *buffer, int buflen,
|
|
|
int version, stat_completion_t dc, const void *data)
|
|
|
{
|
|
|
struct oarchive *oa;
|
|
|
- struct RequestHeader h = { .xid = get_xid(), .type = ZOO_SETDATA_OP};
|
|
|
+ struct RequestHeader h = { STRUCT_INITIALIZER(xid , get_xid()), STRUCT_INITIALIZER (type , ZOO_SETDATA_OP)};
|
|
|
struct SetDataRequest req;
|
|
|
int rc = SetDataRequest_init(zh, &req, path, buffer, buflen, version);
|
|
|
if (rc != ZOK) {
|
|
@@ -2588,8 +2663,10 @@ static int CreateRequest_init(zhandle_t *zh, struct CreateRequest *req,
|
|
|
const char *path, const char *value,
|
|
|
int valuelen, const struct ACL_vector *acl_entries, int flags)
|
|
|
{
|
|
|
+ int rc;
|
|
|
+ assert(req);
|
|
|
+ rc = Request_path_init(zh, flags, &req->path, path);
|
|
|
assert(req);
|
|
|
- int rc = Request_path_init(zh, flags, &req->path, path);
|
|
|
if (rc != ZOK) {
|
|
|
return rc;
|
|
|
}
|
|
@@ -2611,7 +2688,7 @@ int zoo_acreate(zhandle_t *zh, const char *path, const char *value,
|
|
|
string_completion_t completion, const void *data)
|
|
|
{
|
|
|
struct oarchive *oa;
|
|
|
- struct RequestHeader h = { .xid = get_xid(), .type = ZOO_CREATE_OP };
|
|
|
+ struct RequestHeader h = { STRUCT_INITIALIZER (xid , get_xid()), STRUCT_INITIALIZER (type ,ZOO_CREATE_OP) };
|
|
|
struct CreateRequest req;
|
|
|
|
|
|
int rc = CreateRequest_init(zh, &req,
|
|
@@ -2653,7 +2730,7 @@ int zoo_adelete(zhandle_t *zh, const char *path, int version,
|
|
|
void_completion_t completion, const void *data)
|
|
|
{
|
|
|
struct oarchive *oa;
|
|
|
- struct RequestHeader h = { .xid = get_xid(), .type = ZOO_DELETE_OP};
|
|
|
+ struct RequestHeader h = { STRUCT_INITIALIZER (xid , get_xid()), STRUCT_INITIALIZER (type , ZOO_DELETE_OP)};
|
|
|
struct DeleteRequest req;
|
|
|
int rc = DeleteRequest_init(zh, &req, path, version);
|
|
|
if (rc != ZOK) {
|
|
@@ -2689,7 +2766,7 @@ int zoo_awexists(zhandle_t *zh, const char *path,
|
|
|
stat_completion_t completion, const void *data)
|
|
|
{
|
|
|
struct oarchive *oa;
|
|
|
- struct RequestHeader h = { .xid = get_xid(), .type = ZOO_EXISTS_OP };
|
|
|
+ struct RequestHeader h = { STRUCT_INITIALIZER (xid ,get_xid()), STRUCT_INITIALIZER (type , ZOO_EXISTS_OP) };
|
|
|
struct ExistsRequest req;
|
|
|
int rc = Request_path_watch_init(zh, 0, &req.path, path,
|
|
|
&req.watch, watcher != NULL);
|
|
@@ -2723,7 +2800,7 @@ static int zoo_awget_children_(zhandle_t *zh, const char *path,
|
|
|
const void *data)
|
|
|
{
|
|
|
struct oarchive *oa;
|
|
|
- struct RequestHeader h = { .xid = get_xid(), .type = ZOO_GETCHILDREN_OP};
|
|
|
+ struct RequestHeader h = { STRUCT_INITIALIZER (xid , get_xid()), STRUCT_INITIALIZER (type , ZOO_GETCHILDREN_OP)};
|
|
|
struct GetChildrenRequest req ;
|
|
|
int rc = Request_path_watch_init(zh, 0, &req.path, path,
|
|
|
&req.watch, watcher != NULL);
|
|
@@ -2771,7 +2848,7 @@ static int zoo_awget_children2_(zhandle_t *zh, const char *path,
|
|
|
{
|
|
|
/* invariant: (sc == NULL) != (sc == NULL) */
|
|
|
struct oarchive *oa;
|
|
|
- struct RequestHeader h = { .xid = get_xid(), .type = ZOO_GETCHILDREN2_OP};
|
|
|
+ struct RequestHeader h = { STRUCT_INITIALIZER( xid, get_xid()), STRUCT_INITIALIZER (type ,ZOO_GETCHILDREN2_OP)};
|
|
|
struct GetChildren2Request req ;
|
|
|
int rc = Request_path_watch_init(zh, 0, &req.path, path,
|
|
|
&req.watch, watcher != NULL);
|
|
@@ -2816,7 +2893,7 @@ int zoo_async(zhandle_t *zh, const char *path,
|
|
|
string_completion_t completion, const void *data)
|
|
|
{
|
|
|
struct oarchive *oa;
|
|
|
- struct RequestHeader h = { .xid = get_xid(), .type = ZOO_SYNC_OP};
|
|
|
+ struct RequestHeader h = { STRUCT_INITIALIZER (xid , get_xid()), STRUCT_INITIALIZER (type , ZOO_SYNC_OP)};
|
|
|
struct SyncRequest req;
|
|
|
int rc = Request_path_init(zh, 0, &req.path, path);
|
|
|
if (rc != ZOK) {
|
|
@@ -2846,7 +2923,7 @@ int zoo_aget_acl(zhandle_t *zh, const char *path, acl_completion_t completion,
|
|
|
const void *data)
|
|
|
{
|
|
|
struct oarchive *oa;
|
|
|
- struct RequestHeader h = { .xid = get_xid(), .type = ZOO_GETACL_OP};
|
|
|
+ struct RequestHeader h = { STRUCT_INITIALIZER (xid , get_xid()), STRUCT_INITIALIZER(type ,ZOO_GETACL_OP)};
|
|
|
struct GetACLRequest req;
|
|
|
int rc = Request_path_init(zh, 0, &req.path, path) ;
|
|
|
if (rc != ZOK) {
|
|
@@ -2875,7 +2952,7 @@ int zoo_aset_acl(zhandle_t *zh, const char *path, int version,
|
|
|
struct ACL_vector *acl, void_completion_t completion, const void *data)
|
|
|
{
|
|
|
struct oarchive *oa;
|
|
|
- struct RequestHeader h = { .xid = get_xid(), .type = ZOO_SETACL_OP};
|
|
|
+ struct RequestHeader h = { STRUCT_INITIALIZER(xid ,get_xid()), STRUCT_INITIALIZER (type , ZOO_SETACL_OP)};
|
|
|
struct SetACLRequest req;
|
|
|
int rc = Request_path_init(zh, 0, &req.path, path);
|
|
|
if (rc != ZOK) {
|
|
@@ -2946,8 +3023,9 @@ static void op_result_stat_completion(int err, const struct Stat *stat, const vo
|
|
|
static int CheckVersionRequest_init(zhandle_t *zh, struct CheckVersionRequest *req,
|
|
|
const char *path, int version)
|
|
|
{
|
|
|
+ int rc ;
|
|
|
assert(req);
|
|
|
- int rc = Request_path_init(zh, 0, &req->path, path);
|
|
|
+ rc = Request_path_init(zh, 0, &req->path, path);
|
|
|
if (rc != ZOK) {
|
|
|
return rc;
|
|
|
}
|
|
@@ -2959,7 +3037,8 @@ static int CheckVersionRequest_init(zhandle_t *zh, struct CheckVersionRequest *r
|
|
|
int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops,
|
|
|
zoo_op_result_t *results, void_completion_t completion, const void *data)
|
|
|
{
|
|
|
- struct RequestHeader h = { .xid = get_xid(), .type = ZOO_MULTI_OP };
|
|
|
+ struct RequestHeader h = { STRUCT_INITIALIZER(xid, get_xid()), STRUCT_INITIALIZER(type, ZOO_MULTI_OP) };
|
|
|
+ struct MultiHeader mh = { STRUCT_INITIALIZER(type, -1), STRUCT_INITIALIZER(done, 1), STRUCT_INITIALIZER(err, -1) };
|
|
|
struct oarchive *oa = create_buffer_oarchive();
|
|
|
completion_head_t clist = { 0 };
|
|
|
|
|
@@ -2971,7 +3050,7 @@ int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops,
|
|
|
zoo_op_result_t *result = results+index;
|
|
|
completion_list_t *entry = NULL;
|
|
|
|
|
|
- struct MultiHeader mh = { .type=op->type, .done=0, .err=-1 };
|
|
|
+ struct MultiHeader mh = { STRUCT_INITIALIZER(type, op->type), STRUCT_INITIALIZER(done, 0), STRUCT_INITIALIZER(err, -1) };
|
|
|
rc = rc < 0 ? rc : serialize_MultiHeader(oa, "multiheader", &mh);
|
|
|
|
|
|
switch(op->type) {
|
|
@@ -3041,7 +3120,6 @@ int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops,
|
|
|
queue_completion(&clist, entry, 0);
|
|
|
}
|
|
|
|
|
|
- struct MultiHeader mh = { .type=-1, .done=1, .err=-1 };
|
|
|
rc = rc < 0 ? rc : serialize_MultiHeader(oa, "multiheader", &mh);
|
|
|
|
|
|
/* BEGIN: CRTICIAL SECTION */
|
|
@@ -3130,6 +3208,10 @@ int flush_send_queue(zhandle_t*zh, int timeout)
|
|
|
{
|
|
|
int rc= ZOK;
|
|
|
struct timeval started;
|
|
|
+#ifdef WIN32
|
|
|
+ fd_set pollSet;
|
|
|
+ struct timeval wait;
|
|
|
+#endif
|
|
|
gettimeofday(&started,0);
|
|
|
// we can't use dequeue_buffer() here because if (non-blocking) send_buffer()
|
|
|
// returns EWOULDBLOCK we'd have to put the buffer back on the queue.
|
|
@@ -3139,7 +3221,6 @@ int flush_send_queue(zhandle_t*zh, int timeout)
|
|
|
while (zh->to_send.head != 0&& zh->state == ZOO_CONNECTED_STATE) {
|
|
|
if(timeout!=0){
|
|
|
int elapsed;
|
|
|
- struct pollfd fds;
|
|
|
struct timeval now;
|
|
|
gettimeofday(&now,0);
|
|
|
elapsed=calculate_interval(&started,&now);
|
|
@@ -3147,10 +3228,20 @@ int flush_send_queue(zhandle_t*zh, int timeout)
|
|
|
rc = ZOPERATIONTIMEOUT;
|
|
|
break;
|
|
|
}
|
|
|
+
|
|
|
+#ifdef WIN32
|
|
|
+ wait = get_timeval(timeout-elapsed);
|
|
|
+ FD_ZERO(&pollSet);
|
|
|
+ FD_SET(zh->fd, &pollSet);
|
|
|
+ // Poll the socket
|
|
|
+ rc = select((int)(zh->fd)+1, NULL, &pollSet, NULL, &wait);
|
|
|
+#else
|
|
|
+ struct pollfd fds;
|
|
|
fds.fd = zh->fd;
|
|
|
fds.events = POLLOUT;
|
|
|
fds.revents = 0;
|
|
|
rc = poll(&fds, 1, timeout-elapsed);
|
|
|
+#endif
|
|
|
if (rc<=0) {
|
|
|
/* timed out or an error or POLLERR */
|
|
|
rc = rc==0 ? ZOPERATIONTIMEOUT : ZSYSTEMERROR;
|
|
@@ -3286,6 +3377,9 @@ static const char* format_endpoint_info(const struct sockaddr_storage* ep)
|
|
|
static char buf[128];
|
|
|
char addrstr[128];
|
|
|
void *inaddr;
|
|
|
+#ifdef WIN32
|
|
|
+ char * addrstring;
|
|
|
+#endif
|
|
|
int port;
|
|
|
if(ep==0)
|
|
|
return "null";
|
|
@@ -3301,8 +3395,13 @@ static const char* format_endpoint_info(const struct sockaddr_storage* ep)
|
|
|
#if defined(AF_INET6)
|
|
|
}
|
|
|
#endif
|
|
|
+#ifdef WIN32
|
|
|
+ addrstring = inet_ntoa (*(struct in_addr*)inaddr);
|
|
|
+ sprintf(buf,"%s:%d",addrstring,ntohs(port));
|
|
|
+#else
|
|
|
inet_ntop(ep->ss_family,inaddr,addrstr,sizeof(addrstr)-1);
|
|
|
sprintf(buf,"%s:%d",addrstr,ntohs(port));
|
|
|
+#endif
|
|
|
return buf;
|
|
|
}
|
|
|
|