Browse Source

ZOOKEEPER-4210: Preserve return code from nonblocking send

Async API calls attempt to flush the send buffer, which
calls flush_send_queue(); and can report
  ZOPERATIONTIMEOUT
  ZSYSTEMERROR
  ZCONNECTIONLOSS

Specifically: send_buffer() calls send(2) with MSG_NOSIGNAL,
which can return EPIPE; then send_buffer return -1, causing
ZCONNECTIONLOSS from flush_send_queue().

Current async API calls drop the return value from flush_send_queue(),
as below:

    adaptor_send_queue(zh, 0);
    return (rc < 0)?ZMARSHALLINGERROR:ZOK;

The async API then returns ZOK instead of ZCONNECTIONLOSS.

Author: Sam Mikes <smikes@apple.com>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Damien Diederen <ddiederen@apache.org>

Closes #1602 from smikes/asyncsend-returncode-3.6
Sam Mikes 4 years ago
parent
commit
1944f77aab
1 changed files with 40 additions and 44 deletions
  1. 40 44
      zookeeper-client/zookeeper-client-c/src/zookeeper.c

+ 40 - 44
zookeeper-client/zookeeper-client-c/src/zookeeper.c

@@ -3963,6 +3963,19 @@ static int Request_path_watch_init(zhandle_t *zh, int mode,
 /*---------------------------------------------------------------------------*
  * ASYNC API
  *---------------------------------------------------------------------------*/
+
+/* make an attempt to send queued requests immediately without blocking */
+static int nonblocking_send(zhandle_t *zh, int rc)
+{
+    if (adaptor_send_queue(zh, 0) < 0) {
+        if (zh->fd->sock != -1) {
+            close_zsock(zh->fd);
+            zh->state = ZOO_NOTCONNECTED_STATE;
+        }
+    }
+    return (rc < 0) ? ZMARSHALLINGERROR : ZOK;
+}
+
 int zoo_aget(zhandle_t *zh, const char *path, int watch, data_completion_t dc,
         const void *data)
 {
@@ -4002,9 +4015,8 @@ int zoo_awget(zhandle_t *zh, const char *path,
 
     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
             zoo_get_current_server(zh));
-    /* make a best (non-blocking) effort to send the requests asap */
-    adaptor_send_queue(zh, 0);
-    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
+
+    return nonblocking_send(zh, rc);
 }
 
 int zoo_agetconfig(zhandle_t *zh, int watch, data_completion_t dc,
@@ -4046,9 +4058,8 @@ int zoo_awgetconfig(zhandle_t *zh, watcher_fn watcher, void* watcherCtx,
 
     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
                zoo_get_current_server(zh));
-    /* make a best (non-blocking) effort to send the requests asap */
-    adaptor_send_queue(zh, 0);
-    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
+
+    return nonblocking_send(zh, rc);
 }
 
 int zoo_areconfig(zhandle_t *zh, const char *joining, const char *leaving,
@@ -4082,10 +4093,8 @@ int zoo_areconfig(zhandle_t *zh, const char *joining, const char *leaving,
     close_buffer_oarchive(&oa, 0);
 
     LOG_DEBUG(LOGCALLBACK(zh), "Sending Reconfig request xid=%#x to %s",h.xid, zoo_get_current_server(zh));
-    /* make a best (non-blocking) effort to send the requests asap */
-    adaptor_send_queue(zh, 0);
 
-    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
+    return nonblocking_send(zh, rc);
 }
 
 static int SetDataRequest_init(zhandle_t *zh, struct SetDataRequest *req,
@@ -4128,9 +4137,8 @@ int zoo_aset(zhandle_t *zh, const char *path, const char *buffer, int buflen,
 
     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
             zoo_get_current_server(zh));
-    /* make a best (non-blocking) effort to send the requests asap */
-    adaptor_send_queue(zh, 0);
-    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
+
+    return nonblocking_send(zh, rc);
 }
 
 static int CreateRequest_init(zhandle_t *zh, struct CreateRequest *req,
@@ -4255,9 +4263,8 @@ int zoo_acreate_ttl(zhandle_t *zh, const char *path, const char *value,
 
     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
             zoo_get_current_server(zh));
-    /* make a best (non-blocking) effort to send the requests asap */
-    adaptor_send_queue(zh, 0);
-    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
+
+    return nonblocking_send(zh, rc);
 }
 
 int zoo_acreate2(zhandle_t *zh, const char *path, const char *value,
@@ -4322,9 +4329,8 @@ int zoo_acreate2_ttl(zhandle_t *zh, const char *path, const char *value,
 
     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
             zoo_get_current_server(zh));
-    /* make a best (non-blocking) effort to send the requests asap */
-    adaptor_send_queue(zh, 0);
-    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
+
+    return nonblocking_send(zh, rc);
 }
 
 int DeleteRequest_init(zhandle_t *zh, struct DeleteRequest *req,
@@ -4362,9 +4368,8 @@ int zoo_adelete(zhandle_t *zh, const char *path, int version,
 
     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
             zoo_get_current_server(zh));
-    /* make a best (non-blocking) effort to send the requests asap */
-    adaptor_send_queue(zh, 0);
-    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
+
+    return nonblocking_send(zh, rc);
 }
 
 int zoo_aexists(zhandle_t *zh, const char *path, int watch,
@@ -4401,9 +4406,8 @@ int zoo_awexists(zhandle_t *zh, const char *path,
 
     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
             zoo_get_current_server(zh));
-    /* make a best (non-blocking) effort to send the requests asap */
-    adaptor_send_queue(zh, 0);
-    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
+
+    return nonblocking_send(zh, rc);
 }
 
 static int zoo_awget_children_(zhandle_t *zh, const char *path,
@@ -4434,9 +4438,8 @@ static int zoo_awget_children_(zhandle_t *zh, const char *path,
 
     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
             zoo_get_current_server(zh));
-    /* make a best (non-blocking) effort to send the requests asap */
-    adaptor_send_queue(zh, 0);
-    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
+
+    return nonblocking_send(zh, rc);
 }
 
 int zoo_aget_children(zhandle_t *zh, const char *path, int watch,
@@ -4482,9 +4485,8 @@ static int zoo_awget_children2_(zhandle_t *zh, const char *path,
 
     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
             zoo_get_current_server(zh));
-    /* make a best (non-blocking) effort to send the requests asap */
-    adaptor_send_queue(zh, 0);
-    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
+
+    return nonblocking_send(zh, rc);
 }
 
 int zoo_aget_children2(zhandle_t *zh, const char *path, int watch,
@@ -4525,9 +4527,8 @@ int zoo_async(zhandle_t *zh, const char *path,
 
     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
             zoo_get_current_server(zh));
-    /* make a best (non-blocking) effort to send the requests asap */
-    adaptor_send_queue(zh, 0);
-    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
+
+    return nonblocking_send(zh, rc);
 }
 
 
@@ -4555,9 +4556,8 @@ int zoo_aget_acl(zhandle_t *zh, const char *path, acl_completion_t completion,
 
     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
             zoo_get_current_server(zh));
-    /* make a best (non-blocking) effort to send the requests asap */
-    adaptor_send_queue(zh, 0);
-    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
+
+    return nonblocking_send(zh, rc);
 }
 
 int zoo_aset_acl(zhandle_t *zh, const char *path, int version,
@@ -4586,9 +4586,8 @@ int zoo_aset_acl(zhandle_t *zh, const char *path, int version,
 
     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",h.xid,path,
             zoo_get_current_server(zh));
-    /* make a best (non-blocking) effort to send the requests asap */
-    adaptor_send_queue(zh, 0);
-    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
+
+    return nonblocking_send(zh, rc);
 }
 
 /* Completions for multi-op results */
@@ -4747,10 +4746,8 @@ int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops,
 
     LOG_DEBUG(LOGCALLBACK(zh), "Sending multi request xid=%#x with %d subrequests to %s",
             h.xid, index, zoo_get_current_server(zh));
-    /* make a best (non-blocking) effort to send the requests asap */
-    adaptor_send_queue(zh, 0);
 
-    return (rc < 0) ? ZMARSHALLINGERROR : ZOK;
+    return nonblocking_send(zh, rc);
 }
 
 typedef union WatchesRequest WatchesRequest;
@@ -4833,7 +4830,6 @@ static int aremove_watches(
         zh, h.xid, COMPLETION_VOID, completion, data, 0, wdo, 0);
     rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
             get_buffer_len(oa));
-    rc = rc < 0 ? ZMARSHALLINGERROR : ZOK;
     leave_critical(zh);
 
     /* We queued the buffer, so don't free it */
@@ -4842,7 +4838,7 @@ static int aremove_watches(
     LOG_DEBUG(LOGCALLBACK(zh), "Sending request xid=%#x for path [%s] to %s",
               h.xid, path, zoo_get_current_server(zh));
 
-    adaptor_send_queue(zh, 0);
+    rc = nonblocking_send(zh, rc);
 
 done:
     free_duplicate_path(server_path, path);