|
@@ -3644,6 +3644,49 @@ static int add_multi_completion(zhandle_t *zh, int xid, void_completion_t dc,
|
|
|
return add_completion(zh, xid, COMPLETION_MULTI, dc, data, 0,0, clist);
|
|
|
}
|
|
|
|
|
|
+/**
|
|
|
+ * After sending the close request, we are waiting for a given millisecs for
|
|
|
+ * getting the answer and/or for the socket to be closed by the server.
|
|
|
+ *
|
|
|
+ * This function should not be called while we still want to process
|
|
|
+ * any response from the server. It must be called after adaptor_finish called,
|
|
|
+ * in order not to mess with the I/O receiver thread in multi-threaded mode.
|
|
|
+ */
|
|
|
+int wait_for_session_to_be_closed(zhandle_t *zh, int timeout_ms)
|
|
|
+{
|
|
|
+ int ret = 0;
|
|
|
+#ifndef WIN32
|
|
|
+ struct pollfd fd_s[1];
|
|
|
+#else
|
|
|
+ fd_set rfds;
|
|
|
+ struct timeval waittime = {timeout_ms / 1000, (timeout_ms % 1000) * 1000};
|
|
|
+#endif
|
|
|
+
|
|
|
+ if (zh == NULL) {
|
|
|
+ return ZBADARGUMENTS;
|
|
|
+ }
|
|
|
+
|
|
|
+#ifndef WIN32
|
|
|
+ fd_s[0].fd = zh->fd->sock;
|
|
|
+ fd_s[0].events = POLLIN;
|
|
|
+ ret = poll(fd_s, 1, timeout_ms);
|
|
|
+#else
|
|
|
+ FD_ZERO(&rfds);
|
|
|
+ FD_SET(zh->fd->sock , &rfds);
|
|
|
+ ret = select(zh->fd->sock + 1, &rfds, NULL, NULL, &waittime);
|
|
|
+#endif
|
|
|
+
|
|
|
+ if (ret == 0){
|
|
|
+ LOG_WARN(LOGCALLBACK(zh), "Timed out (%dms) during waiting for server's reply after sending a close request, sessionId=%#llx\n",
|
|
|
+ timeout_ms, zh->client_id.client_id);
|
|
|
+ } else if (ret < 0) {
|
|
|
+ LOG_WARN(LOGCALLBACK(zh), "System error (%d) happened while waiting for server's reply, sessionId=%#llx\n",
|
|
|
+ ret, zh->client_id.client_id);
|
|
|
+ }
|
|
|
+
|
|
|
+ return ZOK;
|
|
|
+}
|
|
|
+
|
|
|
int zookeeper_close(zhandle_t *zh)
|
|
|
{
|
|
|
int rc=ZOK;
|
|
@@ -3669,32 +3712,33 @@ int zookeeper_close(zhandle_t *zh)
|
|
|
}
|
|
|
/* No need to decrement the counter since we're just going to
|
|
|
* destroy the handle later. */
|
|
|
- if (is_connected(zh)){
|
|
|
+ if (is_connected(zh)) {
|
|
|
struct oarchive *oa;
|
|
|
struct RequestHeader h = {get_xid(), ZOO_CLOSE_OP};
|
|
|
LOG_INFO(LOGCALLBACK(zh), "Closing zookeeper sessionId=%#llx to %s\n",
|
|
|
- zh->client_id.client_id,zoo_get_current_server(zh));
|
|
|
+ zh->client_id.client_id, zoo_get_current_server(zh));
|
|
|
oa = create_buffer_oarchive();
|
|
|
rc = serialize_RequestHeader(oa, "header", &h);
|
|
|
- rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
|
|
|
- get_buffer_len(oa));
|
|
|
+ rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa), get_buffer_len(oa));
|
|
|
/* We queued the buffer, so don't free it */
|
|
|
close_buffer_oarchive(&oa, 0);
|
|
|
if (rc < 0) {
|
|
|
+ LOG_DEBUG(LOGCALLBACK(zh), "Error during closing zookeeper session, sessionId=%#llx to %s (error: %d)\n",
|
|
|
+ zh->client_id.client_id, zoo_get_current_server(zh), rc);
|
|
|
rc = ZMARSHALLINGERROR;
|
|
|
- goto finish;
|
|
|
- }
|
|
|
+ } else {
|
|
|
+ /* make sure the close request is sent; we set timeout to an arbitrary
|
|
|
+ * (but reasonable) number of milliseconds since we want the call to block*/
|
|
|
+ rc = adaptor_send_queue(zh, 3000);
|
|
|
|
|
|
- /* make sure the close request is sent; we set timeout to an arbitrary
|
|
|
- * (but reasonable) number of milliseconds since we want the call to block*/
|
|
|
- rc=adaptor_send_queue(zh, 3000);
|
|
|
- }else{
|
|
|
- LOG_INFO(LOGCALLBACK(zh), "Freeing zookeeper resources for sessionId=%#llx\n",
|
|
|
- zh->client_id.client_id);
|
|
|
+ /* give some time to the server to process the session close request properly */
|
|
|
+ rc = rc < 0 ? rc : wait_for_session_to_be_closed(zh, 1500);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
rc = ZOK;
|
|
|
}
|
|
|
|
|
|
-finish:
|
|
|
+ LOG_INFO(LOGCALLBACK(zh), "Freeing zookeeper resources for sessionId=%#llx\n", zh->client_id.client_id);
|
|
|
destroy(zh);
|
|
|
adaptor_destroy(zh);
|
|
|
free(zh->fd);
|