|
@@ -369,6 +369,7 @@ void *do_io(void *v)
|
|
fds[0].fd=adaptor_threads->self_pipe[0];
|
|
fds[0].fd=adaptor_threads->self_pipe[0];
|
|
fds[0].events=POLLIN;
|
|
fds[0].events=POLLIN;
|
|
while(!zh->close_requested) {
|
|
while(!zh->close_requested) {
|
|
|
|
+ zh->io_count++;
|
|
struct timeval tv;
|
|
struct timeval tv;
|
|
int fd;
|
|
int fd;
|
|
int interest;
|
|
int interest;
|
|
@@ -396,45 +397,56 @@ void *do_io(void *v)
|
|
while(read(adaptor_threads->self_pipe[0],b,sizeof(b))==sizeof(b)){}
|
|
while(read(adaptor_threads->self_pipe[0],b,sizeof(b))==sizeof(b)){}
|
|
}
|
|
}
|
|
#else
|
|
#else
|
|
- fd_set rfds, wfds, efds;
|
|
|
|
|
|
+ fd_set rfds, wfds;
|
|
struct adaptor_threads *adaptor_threads = zh->adaptor_priv;
|
|
struct adaptor_threads *adaptor_threads = zh->adaptor_priv;
|
|
api_prolog(zh);
|
|
api_prolog(zh);
|
|
notify_thread_ready(zh);
|
|
notify_thread_ready(zh);
|
|
LOG_DEBUG(LOGCALLBACK(zh), "started IO thread");
|
|
LOG_DEBUG(LOGCALLBACK(zh), "started IO thread");
|
|
- FD_ZERO(&rfds); FD_ZERO(&wfds); FD_ZERO(&efds);
|
|
|
|
|
|
+
|
|
while(!zh->close_requested) {
|
|
while(!zh->close_requested) {
|
|
struct timeval tv;
|
|
struct timeval tv;
|
|
SOCKET fd;
|
|
SOCKET fd;
|
|
- SOCKET maxfd=adaptor_threads->self_pipe[0];
|
|
|
|
- int interest;
|
|
|
|
|
|
+ int interest;
|
|
int rc;
|
|
int rc;
|
|
-
|
|
|
|
- zookeeper_interest(zh, &fd, &interest, &tv);
|
|
|
|
- if (fd != -1) {
|
|
|
|
- if (interest&ZOOKEEPER_READ) {
|
|
|
|
|
|
+
|
|
|
|
+ zookeeper_interest(zh, &fd, &interest, &tv);
|
|
|
|
+
|
|
|
|
+ // FD_ZERO is cheap on Win32, it just sets count of elements to zero.
|
|
|
|
+ // It needs to be done to ensure no stale entries.
|
|
|
|
+ FD_ZERO(&rfds);
|
|
|
|
+ FD_ZERO(&wfds);
|
|
|
|
+
|
|
|
|
+ if (fd != -1) {
|
|
|
|
+ if (interest&ZOOKEEPER_READ) {
|
|
FD_SET(fd, &rfds);
|
|
FD_SET(fd, &rfds);
|
|
- } else {
|
|
|
|
- FD_CLR(fd, &rfds);
|
|
|
|
}
|
|
}
|
|
- if (interest&ZOOKEEPER_WRITE) {
|
|
|
|
|
|
+
|
|
|
|
+ if (interest&ZOOKEEPER_WRITE) {
|
|
FD_SET(fd, &wfds);
|
|
FD_SET(fd, &wfds);
|
|
- } else {
|
|
|
|
- FD_CLR(fd, &wfds);
|
|
|
|
- }
|
|
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- FD_SET( adaptor_threads->self_pipe[0] ,&rfds );
|
|
|
|
- rc = select((int)maxfd, &rfds, &wfds, &efds, &tv);
|
|
|
|
- if (fd != -1)
|
|
|
|
- {
|
|
|
|
- interest = (FD_ISSET(fd, &rfds))? ZOOKEEPER_READ:0;
|
|
|
|
- interest|= (FD_ISSET(fd, &wfds))? ZOOKEEPER_WRITE:0;
|
|
|
|
|
|
+
|
|
|
|
+ // Always interested in self_pipe.
|
|
|
|
+ FD_SET(adaptor_threads->self_pipe[0], &rfds);
|
|
|
|
+
|
|
|
|
+ rc = select(/* unused */0, &rfds, &wfds, NULL, &tv);
|
|
|
|
+ if (rc > 0) {
|
|
|
|
+ interest=(FD_ISSET(fd, &rfds))? ZOOKEEPER_READ: 0;
|
|
|
|
+ interest|=(FD_ISSET(fd, &wfds))? ZOOKEEPER_WRITE: 0;
|
|
|
|
+
|
|
|
|
+ if (FD_ISSET(adaptor_threads->self_pipe[0], &rfds)){
|
|
|
|
+ // flush the pipe/socket
|
|
|
|
+ char b[128];
|
|
|
|
+ while(recv(adaptor_threads->self_pipe[0],b,sizeof(b), 0)==sizeof(b)){}
|
|
|
|
+ }
|
|
}
|
|
}
|
|
-
|
|
|
|
- if (FD_ISSET(adaptor_threads->self_pipe[0], &rfds)){
|
|
|
|
- // flush the pipe/socket
|
|
|
|
- char b[128];
|
|
|
|
- while(recv(adaptor_threads->self_pipe[0],b,sizeof(b), 0)==sizeof(b)){}
|
|
|
|
- }
|
|
|
|
|
|
+ else if (rc < 0) {
|
|
|
|
+ LOG_ERROR(LOGCALLBACK(zh), ("select() failed %d [%d].", rc, WSAGetLastError()));
|
|
|
|
+
|
|
|
|
+ // Clear interest events for zookeeper_process if select() fails.
|
|
|
|
+ interest = 0;
|
|
|
|
+ }
|
|
|
|
+
|
|
#endif
|
|
#endif
|
|
// dispatch zookeeper events
|
|
// dispatch zookeeper events
|
|
rc = zookeeper_process(zh, interest);
|
|
rc = zookeeper_process(zh, interest);
|