|
@@ -1617,25 +1617,20 @@ void free_completions(zhandle_t *zh,int callCompletion,int reason)
|
|
|
zh->outstanding_sync--;
|
|
|
destroy_completion_entry(cptr);
|
|
|
} else if (callCompletion) {
|
|
|
- if(cptr->xid == PING_XID){
|
|
|
- // Nothing to do with a ping response
|
|
|
- destroy_completion_entry(cptr);
|
|
|
- } else {
|
|
|
- // Fake the response
|
|
|
- buffer_list_t *bptr;
|
|
|
- h.xid = cptr->xid;
|
|
|
- h.zxid = -1;
|
|
|
- h.err = reason;
|
|
|
- oa = create_buffer_oarchive();
|
|
|
- serialize_ReplyHeader(oa, "header", &h);
|
|
|
- bptr = calloc(sizeof(*bptr), 1);
|
|
|
- assert(bptr);
|
|
|
- bptr->len = get_buffer_len(oa);
|
|
|
- bptr->buffer = get_buffer(oa);
|
|
|
- close_buffer_oarchive(&oa, 0);
|
|
|
- cptr->buffer = bptr;
|
|
|
- queue_completion(&zh->completions_to_process, cptr, 0);
|
|
|
- }
|
|
|
+ // Fake the response
|
|
|
+ buffer_list_t *bptr;
|
|
|
+ h.xid = cptr->xid;
|
|
|
+ h.zxid = -1;
|
|
|
+ h.err = reason;
|
|
|
+ oa = create_buffer_oarchive();
|
|
|
+ serialize_ReplyHeader(oa, "header", &h);
|
|
|
+ bptr = calloc(sizeof(*bptr), 1);
|
|
|
+ assert(bptr);
|
|
|
+ bptr->len = get_buffer_len(oa);
|
|
|
+ bptr->buffer = get_buffer(oa);
|
|
|
+ close_buffer_oarchive(&oa, 0);
|
|
|
+ cptr->buffer = bptr;
|
|
|
+ queue_completion(&zh->completions_to_process, cptr, 0);
|
|
|
}
|
|
|
}
|
|
|
a_list.completion = NULL;
|
|
@@ -2007,7 +2002,6 @@ static struct timeval get_timeval(int interval)
|
|
|
rc = serialize_RequestHeader(oa, "header", &h);
|
|
|
enter_critical(zh);
|
|
|
get_system_time(&zh->last_ping);
|
|
|
- rc = rc < 0 ? rc : add_void_completion(zh, h.xid, 0, 0);
|
|
|
rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
|
|
|
get_buffer_len(oa));
|
|
|
leave_critical(zh);
|
|
@@ -2745,12 +2739,8 @@ static void deserialize_response(zhandle_t *zh, int type, int xid, int failed, i
|
|
|
case COMPLETION_VOID:
|
|
|
LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_VOID for xid=%#x failed=%d rc=%d",
|
|
|
cptr->xid, failed, rc);
|
|
|
- if (xid == PING_XID) {
|
|
|
- // We want to skip the ping
|
|
|
- } else {
|
|
|
- assert(cptr->c.void_result);
|
|
|
- cptr->c.void_result(rc, cptr->data);
|
|
|
- }
|
|
|
+ assert(cptr->c.void_result);
|
|
|
+ cptr->c.void_result(rc, cptr->data);
|
|
|
break;
|
|
|
case COMPLETION_MULTI:
|
|
|
LOG_DEBUG(LOGCALLBACK(zh), "Calling COMPLETION_MULTI for xid=%#x failed=%d rc=%d",
|
|
@@ -2861,7 +2851,15 @@ int zookeeper_process(zhandle_t *zh, int events)
|
|
|
bptr->buffer, bptr->curr_offset);
|
|
|
deserialize_ReplyHeader(ia, "hdr", &hdr);
|
|
|
|
|
|
- if (hdr.xid == WATCHER_EVENT_XID) {
|
|
|
+ if (hdr.xid == PING_XID) {
|
|
|
+ // Ping replies can arrive out-of-order
|
|
|
+ int elapsed = 0;
|
|
|
+ struct timeval now;
|
|
|
+ gettimeofday(&now, 0);
|
|
|
+ elapsed = calculate_interval(&zh->last_ping, &now);
|
|
|
+ LOG_DEBUG(LOGCALLBACK(zh), "Got ping response in %d ms", elapsed);
|
|
|
+ free_buffer(bptr);
|
|
|
+ } else if (hdr.xid == WATCHER_EVENT_XID) {
|
|
|
struct WatcherEvent evt;
|
|
|
int type = 0;
|
|
|
char *path = NULL;
|
|
@@ -2925,7 +2923,7 @@ int zookeeper_process(zhandle_t *zh, int events)
|
|
|
hdr.xid,cptr->xid);
|
|
|
}
|
|
|
|
|
|
- if (hdr.xid != PING_XID && hdr.zxid > 0) {
|
|
|
+ if (hdr.zxid > 0) {
|
|
|
// Update last_zxid only when it is a request response
|
|
|
zh->last_zxid = hdr.zxid;
|
|
|
}
|
|
@@ -2933,22 +2931,9 @@ int zookeeper_process(zhandle_t *zh, int events)
|
|
|
deactivateWatcher(zh, cptr->watcher_deregistration, rc);
|
|
|
|
|
|
if (cptr->c.void_result != SYNCHRONOUS_MARKER) {
|
|
|
- if(hdr.xid == PING_XID){
|
|
|
- int elapsed = 0;
|
|
|
- struct timeval now;
|
|
|
- get_system_time(&now);
|
|
|
- elapsed = calculate_interval(&zh->last_ping, &now);
|
|
|
- LOG_DEBUG(LOGCALLBACK(zh), "Got ping response in %d ms", elapsed);
|
|
|
-
|
|
|
- // Nothing to do with a ping response
|
|
|
- free_buffer(bptr);
|
|
|
- destroy_completion_entry(cptr);
|
|
|
- } else {
|
|
|
- LOG_DEBUG(LOGCALLBACK(zh), "Queueing asynchronous response");
|
|
|
-
|
|
|
- cptr->buffer = bptr;
|
|
|
- queue_completion(&zh->completions_to_process, cptr, 0);
|
|
|
- }
|
|
|
+ LOG_DEBUG(LOGCALLBACK(zh), "Queueing asynchronous response");
|
|
|
+ cptr->buffer = bptr;
|
|
|
+ queue_completion(&zh->completions_to_process, cptr, 0);
|
|
|
} else {
|
|
|
struct sync_completion
|
|
|
*sc = (struct sync_completion*)cptr->data;
|