zookeeper.c 71 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260
  1. /*
  2. * Copyright 2008, Yahoo! Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #ifndef DLL_EXPORT
  17. # define USE_STATIC_LIB
  18. #endif
  19. #if defined(__CYGWIN__)
  20. #define USE_IPV6
  21. #endif
  22. #include <zookeeper.h>
  23. #include <zookeeper.jute.h>
  24. #include <proto.h>
  25. #include "zk_adaptor.h"
  26. #include "zk_log.h"
  27. #include <stdlib.h>
  28. #include <stdio.h>
  29. #include <string.h>
  30. #include <time.h>
  31. #include <sys/time.h>
  32. #include <sys/socket.h>
  33. #include <poll.h>
  34. #include <netinet/in.h>
  35. #include <netinet/tcp.h>
  36. #include <arpa/inet.h>
  37. #include <netdb.h>
  38. #include <errno.h>
  39. #include <unistd.h>
  40. #include <fcntl.h>
  41. #include <assert.h>
  42. #include <stdarg.h>
  43. #include <limits.h>
/* Execute x only when debug-level logging is enabled. */
#define IF_DEBUG(x) if(logLevel==LOG_LEVEL_DEBUG) {x;}
/* Public bit flags exposed through zookeeper.h (see header for usage). */
const int ZOOKEEPER_WRITE = 1 << 0;
const int ZOOKEEPER_READ = 1 << 1;
/* Node creation flags. */
const int EPHEMERAL = 1 << 0;
const int SEQUENCE = 1 << 1;
/* Client session states; negative values are unrecoverable
 * (see is_unrecoverable()). */
const int EXPIRED_SESSION_STATE = -112;
const int AUTH_FAILED_STATE = -113;
const int CONNECTING_STATE = 1;
const int ASSOCIATING_STATE = 2;
const int CONNECTED_STATE = 3;
  54. static __attribute__ ((unused)) const char* state2String(int state){
  55. switch(state){
  56. case 0:
  57. return "CLOSED_STATE";
  58. case 1 /*CONNECTING_STATE*/:
  59. return "CONNECTING_STATE";
  60. case 2 /*ASSOCIATING_STATE*/:
  61. return "ASSOCIATING_STATE";
  62. case 3 /*CONNECTED_STATE*/:
  63. return "CONNECTED_STATE";
  64. case -112 /*EXPIRED_SESSION_STATE*/:
  65. return "EXPIRED_SESSION_STATE";
  66. case -113 /*AUTH_FAILED_STATE*/:
  67. return "AUTH_FAILED_STATE";
  68. }
  69. return "INVALID_STATE";
  70. }
/* Watcher event types delivered to the application's watcher_fn. */
const int CREATED_EVENT = 1;
const int DELETED_EVENT = 2;
const int CHANGED_EVENT = 3;
const int CHILD_EVENT = 4;
const int SESSION_EVENT = -1;
const int NOTWATCHING_EVENT = -2;
  77. static __attribute__ ((unused)) const char* watcherEvent2String(int ev){
  78. switch(ev){
  79. case 0:
  80. return "ERROR_EVENT";
  81. case 1 /*CREATED_EVENT*/:
  82. return "CREATED_EVENT";
  83. case 2 /*DELETED_EVENT*/:
  84. return "DELETED_EVENT";
  85. case 3 /*CHANGED_EVENT*/:
  86. return "CHANGED_EVENT";
  87. case 4 /*CHILD_EVENT*/:
  88. return "CHILD_EVENT";
  89. case -1 /*SESSION_EVENT*/:
  90. return "SESSION_EVENT";
  91. case -2 /*NOTWATCHING_EVENT*/:
  92. return "NOTWATCHING_EVENT";
  93. }
  94. return "INVALID_EVENT";
  95. }
/* ACL permission bits; PERM_ALL is the union of all five. */
const int PERM_READ = 1 << 0;
const int PERM_WRITE = 1 << 1;
const int PERM_CREATE = 1 << 2;
const int PERM_DELETE = 1 << 3;
const int PERM_ADMIN = 1 << 4;
const int PERM_ALL = 0x1f;
/* Predefined identities: world/anyone and the authenticated-user marker. */
struct Id ANYONE_ID_UNSAFE = {"world", "anyone"};
struct Id AUTH_IDS = {"auth", ""};
/* Backing storage for the canned ACL vectors exported below. */
static struct ACL _OPEN_ACL_UNSAFE_ACL[] = {{0x1f, {"world", "anyone"}}};
static struct ACL _READ_ACL_UNSAFE_ACL[] = {{0x01, {"world", "anyone"}}};
static struct ACL _CREATOR_ALL_ACL_ACL[] = {{0x1f, {"auth", ""}}};
struct ACL_vector OPEN_ACL_UNSAFE = { 1, _OPEN_ACL_UNSAFE_ACL};
struct ACL_vector READ_ACL_UNSAFE = { 1, _READ_ACL_UNSAFE_ACL};
struct ACL_vector CREATOR_ALL_ACL = { 1, _CREATOR_ALL_ACL_ACL};
/* Completion categories stored in completion_list_t.completion_type;
 * they select which member of the callback union to invoke. */
#define COMPLETION_VOID 0
#define COMPLETION_STAT 1
#define COMPLETION_DATA 2
#define COMPLETION_STRINGLIST 3
#define COMPLETION_ACLLIST 4
#define COMPLETION_STRING 5
/* Forward declarations for helpers defined later in this file. */
const char*err2string(int err);
static const char* format_endpoint_info(const struct sockaddr* ep);
static const char* format_current_endpoint_info(zhandle_t* zh);
static int add_completion(zhandle_t *zh, int xid, int completion_type,
    const void *dc, const void *data, int add_to_front);
static int handle_socket_error_msg(zhandle_t *zh, int line, int rc,
    const char* format,...);
static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc);
static int disable_conn_permute=0; // permute enabled by default
/* Sentinel stored in the completion union to mark a blocking (synchronous)
 * API call; its address is unique, so pointer comparison suffices. */
static void *SYNCHRONOUS_MARKER = (void*)&SYNCHRONOUS_MARKER;
/* A queued request completion, matched to its server reply by xid. */
typedef struct _completion_list {
    int xid;             /* transaction id of the outstanding request */
    int completion_type; /* one of the COMPLETION_* values */
    union {              /* callback; member selected by completion_type */
        void_completion_t void_result;
        stat_completion_t stat_result;
        data_completion_t data_result;
        strings_completion_t strings_result;
        acl_completion_t acl_result;
        string_completion_t string_result;
    } c;
    const void *data;    /* opaque context handed back to the callback */
    buffer_list_t *buffer;
    struct _completion_list *next; /* singly-linked FIFO of completions */
} completion_list_t;
  141. const void *zoo_get_context(zhandle_t *zh)
  142. {
  143. return zh->context;
  144. }
  145. void zoo_set_context(zhandle_t *zh, void *context)
  146. {
  147. if (zh != NULL) {
  148. zh->context = context;
  149. }
  150. }
  151. int zoo_recv_timeout(zhandle_t *zh)
  152. {
  153. return zh->recv_timeout;
  154. }
  155. static void init_auth_info(auth_info *auth)
  156. {
  157. auth->scheme=NULL;
  158. auth->auth.buff=NULL;
  159. auth->auth.len=0;
  160. auth->state=0;
  161. auth->completion=0;
  162. auth->data=0;
  163. }
  164. static void free_auth_info(auth_info *auth)
  165. {
  166. if(auth->scheme!=NULL)
  167. free(auth->scheme);
  168. deallocate_Buffer(&auth->auth);
  169. init_auth_info(auth);
  170. }
  171. int is_unrecoverable(zhandle_t *zh)
  172. {
  173. return (zh->state<0)? ZINVALIDSTATE: ZOK;
  174. }
  175. /**
  176. * Frees and closes everything associated with a handle,
  177. * including the handle itself.
  178. */
/**
 * Frees and closes everything associated with a handle.
 * NOTE: the zhandle_t memory itself is NOT freed here; the caller
 * (e.g. zookeeper_init's abort path) calls free(zh) afterwards.
 */
static void destroy(zhandle_t *zh)
{
    if (zh == NULL) {
        return;
    }
    /* call any outstanding completions with a special error code */
    cleanup_bufs(zh,1,ZCLOSING);
    if (zh->hostname != 0) {
        free(zh->hostname);
        zh->hostname = NULL;
    }
    if (zh->fd != -1) {
        close(zh->fd);
        zh->fd = -1;
        zh->state = 0; /* mark the handle closed */
    }
    if (zh->addrs != 0) {
        free(zh->addrs);
        zh->addrs = NULL;
    }
    free_auth_info(&zh->auth);
}
  201. static void setup_random()
  202. {
  203. int seed;
  204. int fd = open("/dev/urandom", O_RDONLY);
  205. if (fd == -1) {
  206. seed = getpid();
  207. } else {
  208. read(fd, &seed, sizeof(seed));
  209. close(fd);
  210. }
  211. srandom(seed);
  212. }
  213. /**
  214. * fill in the addrs array of the zookeeper servers in the zhandle. after filling
  215. * them in, we will permute them for load balancing.
  216. */
/**
 * Fill in the addrs array of the zookeeper servers in the zhandle from the
 * comma-separated "host:port" list in zh->hostname; after filling them in,
 * permute them for client-side load balancing (unless disable_conn_permute).
 *
 * Returns ZOK on success, or ZSYSTEMERROR / ZBADARGUMENTS with errno set on
 * failure; on failure zh->addrs is freed and zeroed.
 *
 * NOTE(review): strtok() and gethostbyname() are not reentrant/thread-safe;
 * confirm callers serialize calls into this function.
 */
int getaddrs(zhandle_t *zh)
{
    struct hostent *he;
    struct sockaddr *addr;
    struct sockaddr_in *addr4;
    struct sockaddr_in6 *addr6;
    char **ptr;
    char *hosts = strdup(zh->hostname);
    char *host;
    int i;
    int rc;
    int alen = 0; /* the allocated length of the addrs array */
    zh->addrs_count = 0;
    if (zh->addrs) {
        free(zh->addrs);
        zh->addrs = 0;
    }
    if (!hosts) {
        LOG_ERROR(("out of memory"));
        errno=ENOMEM;
        return ZSYSTEMERROR;
    }
    zh->addrs = 0;
    host=strtok(hosts, ",");
    while(host) {
        char *port_spec = strchr(host, ':');
        char *end_port_spec;
        int port;
        if (!port_spec) {
            LOG_ERROR(("no port in %s", host));
            errno=EINVAL;
            rc=ZBADARGUMENTS;
            goto fail;
        }
        /* split "host:port" in place */
        *port_spec = '\0';
        port_spec++;
        port = strtol(port_spec, &end_port_spec, 0);
        /* reject an empty or partially-numeric port string */
        if (!*port_spec || *end_port_spec) {
            LOG_ERROR(("invalid port in %s", host));
            errno=EINVAL;
            rc=ZBADARGUMENTS;
            goto fail;
        }
        he = gethostbyname(host);
        if (!he) {
            LOG_ERROR(("could not resolve %s", host));
            errno=EINVAL;
            rc=ZBADARGUMENTS;
            goto fail;
        }
        /* Setup the address array */
        for(ptr = he->h_addr_list;*ptr != 0; ptr++) {
            /* grow the array in chunks of 16 entries */
            if (zh->addrs_count == alen) {
                void *tmpaddr;
                alen += 16;
                tmpaddr = realloc(zh->addrs, sizeof(*zh->addrs)*alen);
                if (tmpaddr == 0) {
                    LOG_ERROR(("out of memory"));
                    errno=ENOMEM;
                    rc=ZSYSTEMERROR;
                    goto fail;
                }
                zh->addrs=tmpaddr;
            }
            addr = &zh->addrs[zh->addrs_count];
            addr4 = (struct sockaddr_in*)addr;
            addr6 = (struct sockaddr_in6*)addr;
            addr->sa_family = he->h_addrtype;
            if (addr->sa_family == AF_INET) {
                addr4->sin_port = htons(port);
                memset(&addr4->sin_zero, 0, sizeof(addr4->sin_zero));
                memcpy(&addr4->sin_addr, *ptr, he->h_length);
                zh->addrs_count++;
#if defined(AF_INET6)
            } else if (addr->sa_family == AF_INET6) {
                /* NOTE(review): sockaddr_in6 is larger than a plain
                 * struct sockaddr on most platforms; confirm the element
                 * type of zh->addrs (in zk_adaptor.h) is large enough,
                 * otherwise this write overruns the slot. */
                addr6->sin6_port = htons(port);
                addr6->sin6_scope_id = 0;
                addr6->sin6_flowinfo = 0;
                memcpy(&addr6->sin6_addr, *ptr, he->h_length);
                zh->addrs_count++;
#endif
            } else {
                LOG_WARN(("skipping unknown address family %x for %s",
                    addr->sa_family, zh->hostname));
            }
        }
        host = strtok(0, ",");
    }
    free(hosts);
    if(!disable_conn_permute){
        setup_random();
        /* Permute: one random pairwise swap per entry (not a uniform
         * shuffle, but sufficient to spread client connections) */
        for(i = 0; i < zh->addrs_count; i++) {
            struct sockaddr *s1 = zh->addrs + random()%zh->addrs_count;
            struct sockaddr *s2 = zh->addrs + random()%zh->addrs_count;
            if (s1 != s2) {
                struct sockaddr t = *s1;
                *s1 = *s2;
                *s2 = t;
            }
        }
    }
    return ZOK;
fail:
    if (zh->addrs) {
        free(zh->addrs);
        zh->addrs=0;
    }
    if (hosts) {
        free(hosts);
    }
    return rc;
}
  330. const clientid_t *zoo_client_id(zhandle_t *zh)
  331. {
  332. return &zh->client_id;
  333. }
/* Default no-op watcher installed when the caller supplies no watcher. */
static void null_watcher_fn(zhandle_t* p1, int p2, int p3,const char* p4){}
  335. watcher_fn zoo_set_watcher(zhandle_t *zh,watcher_fn newFn)
  336. {
  337. watcher_fn oldWatcher=zh->watcher;
  338. if (newFn) {
  339. zh->watcher = newFn;
  340. } else {
  341. zh->watcher = null_watcher_fn;
  342. }
  343. return oldWatcher;
  344. }
  345. /**
  346. * Create a zookeeper handle associated with the given host and port.
  347. */
/**
 * Create a zookeeper handle associated with the given host and port.
 *
 * host:         comma-separated "host:port" server list (copied).
 * watcher:      global watcher callback; a no-op watcher is installed
 *               when NULL.
 * recv_timeout: requested session timeout (used for the handshake).
 * clientid:     optional session id/password to resume; zeroed when NULL.
 * context:      opaque application pointer (see zoo_get_context).
 * flags:        currently unused here.
 *
 * Returns the new handle, or 0 on failure (errno preserved from the
 * failing step; all partially-built state is torn down via destroy()).
 */
zhandle_t *zookeeper_init(const char *host, watcher_fn watcher,
    int recv_timeout, const clientid_t *clientid, void *context, int flags)
{
    int errnosave;
    zhandle_t *zh = calloc(1, sizeof(*zh));
    if (!zh) {
        return 0;
    }
    zh->fd = -1;
    zh->state = 0;
    zh->context = context;
    zh->recv_timeout = recv_timeout;
    if (watcher) {
        zh->watcher = watcher;
    } else {
        zh->watcher = null_watcher_fn;
    }
    zh->hostname = strdup(host);
    if (zh->hostname == 0) {
        goto abort;
    }
    /* resolve and permute the server address list */
    if(getaddrs(zh)!=0) {
        goto abort;
    }
    zh->connect_index = 0;
    if (clientid) {
        memcpy(&zh->client_id, clientid, sizeof(zh->client_id));
    } else {
        memset(&zh->client_id, 0, sizeof(zh->client_id));
    }
    /* the primer buffer receives the fixed-size connect handshake reply
     * into storage embedded in the handle (no allocation needed) */
    zh->primer_buffer.buffer = zh->primer_storage_buffer;
    zh->primer_buffer.curr_offset = 0;
    zh->primer_buffer.len = sizeof(zh->primer_storage_buffer);
    zh->primer_buffer.next = 0;
    zh->last_zxid = 0;
    zh->next_deadline.tv_sec=zh->next_deadline.tv_usec=0;
    zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
    if (adaptor_init(zh) == -1) {
        goto abort;
    }
    return zh;
abort:
    /* preserve errno across the teardown calls */
    errnosave=errno;
    destroy(zh);
    free(zh);
    errno=errnosave;
    return 0;
}
  396. static buffer_list_t *allocate_buffer(char *buff, int len)
  397. {
  398. buffer_list_t *buffer = calloc(1, sizeof(*buffer));
  399. if (buffer == 0)
  400. return 0;
  401. buffer->len = len==0?sizeof(*buffer):len;
  402. buffer->curr_offset = 0;
  403. buffer->buffer = buff;
  404. buffer->next = 0;
  405. return buffer;
  406. }
  407. static void free_buffer(buffer_list_t *b)
  408. {
  409. if (!b) {
  410. return;
  411. }
  412. if (b->buffer) {
  413. free(b->buffer);
  414. }
  415. free(b);
  416. }
/* Pop and return the head of the list, or NULL when empty; thread-safe
 * via the list lock.  Keeps head/last consistent when the list drains. */
static buffer_list_t *dequeue_buffer(buffer_head_t *list)
{
    buffer_list_t *b;
    lock_buffer_list(list);
    b = list->head;
    if (b) {
        list->head = b->next;
        if (!list->head) {
            /* the list is now empty, so b must have been the tail */
            assert(b == list->last);
            list->last = 0;
        }
    }
    unlock_buffer_list(list);
    return b;
}
  432. static int remove_buffer(buffer_head_t *list)
  433. {
  434. buffer_list_t *b = dequeue_buffer(list);
  435. if (!b) {
  436. return 0;
  437. }
  438. free_buffer(b);
  439. return 1;
  440. }
  441. static void queue_buffer(buffer_head_t *list, buffer_list_t *b, int add_to_front)
  442. {
  443. b->next = 0;
  444. lock_buffer_list(list);
  445. if (list->head) {
  446. assert(list->last);
  447. // The list is not empty
  448. if (add_to_front) {
  449. b->next = list->head;
  450. list->head = b;
  451. } else {
  452. list->last->next = b;
  453. list->last = b;
  454. }
  455. }else{
  456. // The list is empty
  457. assert(!list->head);
  458. list->head = b;
  459. list->last = b;
  460. }
  461. unlock_buffer_list(list);
  462. }
  463. static int queue_buffer_bytes(buffer_head_t *list, char *buff, int len)
  464. {
  465. buffer_list_t *b = allocate_buffer(buff,len);
  466. if (!b)
  467. return ZSYSTEMERROR;
  468. queue_buffer(list, b, 0);
  469. return ZOK;
  470. }
  471. static int queue_front_buffer_bytes(buffer_head_t *list, char *buff, int len)
  472. {
  473. buffer_list_t *b = allocate_buffer(buff,len);
  474. if (!b)
  475. return ZSYSTEMERROR;
  476. queue_buffer(list, b, 1);
  477. return ZOK;
  478. }
  479. static __attribute__ ((unused)) int get_queue_len(buffer_head_t *list)
  480. {
  481. int i;
  482. buffer_list_t *ptr;
  483. lock_buffer_list(list);
  484. ptr = list->head;
  485. for (i=0; ptr!=0; ptr=ptr->next, i++)
  486. ;
  487. unlock_buffer_list(list);
  488. return i;
  489. }
  490. /* returns:
  491. * -1 if send failed,
  492. * 0 if send would block while sending the buffer (or a send was incomplete),
  493. * 1 if success
  494. */
/* Send as much of buff as the (non-blocking) socket will take.  The wire
 * format is a 4-byte big-endian length prefix followed by the payload;
 * curr_offset tracks progress across the prefix AND payload combined.
 * returns:
 * -1 if send failed,
 * 0 if send would block while sending the buffer (or a send was incomplete),
 * 1 if success
 * NOTE(review): only EAGAIN is treated as retryable; an EINTR from send()
 * is reported as a hard failure — confirm that is intended.
 */
static int send_buffer(int fd, buffer_list_t *buff)
{
    int len = buff->len;
    int off = buff->curr_offset;
    int rc = -1;
    if (off < 4) {
        /* we need to send the length at the beginning */
        int nlen = htonl(len);
        char *b = (char*)&nlen;
        rc = send(fd, b + off, sizeof(nlen) - off, 0);
        if (rc == -1) {
            if (errno != EAGAIN) {
                return -1;
            } else {
                return 0;
            }
        } else {
            buff->curr_offset += rc;
        }
        off = buff->curr_offset;
    }
    if (off >= 4) {
        /* want off to now represent the offset into the buffer */
        off -= sizeof(buff->len);
        rc = send(fd, buff->buffer + off, len - off, 0);
        if (rc == -1) {
            if (errno != EAGAIN) {
                return -1;
            }
        } else {
            buff->curr_offset += rc;
        }
    }
    /* done once prefix + payload have both been fully written */
    return buff->curr_offset == len + sizeof(buff->len);
}
  530. /* returns:
  531. * -1 if recv call failed,
  532. * 0 if recv would block,
  533. * 1 if success
  534. */
/* Read as much of the next length-prefixed packet as is available; the
 * payload buffer is allocated once the 4-byte length prefix is complete.
 * returns:
 * -1 if recv call failed,
 * 0 if recv would block,
 * 1 if success
 */
static int recv_buffer(int fd, buffer_list_t *buff)
{
    int off = buff->curr_offset;
    int rc = 0;
    //fprintf(LOGSTREAM, "rc = %d, off = %d, line %d\n", rc, off, __LINE__);
    /* if buffer is less than 4, we are reading in the length */
    if (off < 4) {
        char *buffer = (char*)&(buff->len);
        rc = recv(fd, buffer+off, sizeof(int)-off, 0);
        //fprintf(LOGSTREAM, "rc = %d, off = %d, line %d\n", rc, off, __LINE__);
        switch(rc) {
        case 0:
            /* peer closed the connection: report it as EHOSTDOWN */
            errno = EHOSTDOWN;
            /* fallthrough */
        case -1:
            if (errno == EAGAIN) {
                return 0;
            }
            return -1;
        default:
            buff->curr_offset += rc;
        }
        off = buff->curr_offset;
        if (buff->curr_offset == sizeof(buff->len)) {
            /* length prefix complete: size and allocate the payload.
             * NOTE(review): the calloc result is not checked here;
             * a failure is only caught implicitly by the !buff->buffer
             * test below — confirm callers tolerate the retry. */
            buff->len = ntohl(buff->len);
            buff->buffer = calloc(1, buff->len);
        }
    }
    if (buff->buffer) {
        /* want off to now represent the offset into the buffer */
        off -= sizeof(buff->len);
        rc = recv(fd, buff->buffer+off, buff->len-off, 0);
        switch(rc) {
        case 0:
            errno = EHOSTDOWN;
            /* fallthrough */
        case -1:
            if (errno == EAGAIN) {
                break;
            }
            return -1;
        default:
            buff->curr_offset += rc;
        }
    }
    /* done once prefix + payload have both been fully read */
    return buff->curr_offset == buff->len + sizeof(buff->len);
}
  580. void free_buffers(buffer_head_t *list)
  581. {
  582. while (remove_buffer(list))
  583. ;
  584. }
/* Detach the entire sent-request queue under its lock, then walk the
 * detached list: wake synchronous callers with rc, and (when
 * callCompletion is set) invoke each asynchronous callback with rc and
 * empty result arguments.  Frees every completion node. */
void free_completions(zhandle_t *zh,int callCompletion,int rc)
{
    completion_head_t tmp_list;
    /* snapshot and empty the shared queue atomically */
    lock_completion_list(&zh->sent_requests);
    tmp_list = zh->sent_requests;
    zh->sent_requests.head = 0;
    zh->sent_requests.last = 0;
    unlock_completion_list(&zh->sent_requests);
    while (tmp_list.head) {
        completion_list_t *cptr = tmp_list.head;
        tmp_list.head = cptr->next;
        if (cptr->c.data_result == SYNCHRONOUS_MARKER) {
            /* a blocked synchronous API call: hand it the rc and wake it */
            struct sync_completion
                *sc = (struct sync_completion*)cptr->data;
            sc->rc = rc;
            notify_sync_completion(sc);
            zh->outstanding_sync--;
        } else if (callCompletion) {
            /* invoke the async callback with rc and empty results */
            switch (cptr->completion_type) {
            case COMPLETION_DATA:
                LOG_DEBUG(("Calling COMPLETION_DATA for xid=%x rc=%d",cptr->xid,rc));
                cptr->c.data_result(rc, 0, 0, 0, cptr->data);
                break;
            case COMPLETION_STAT:
                LOG_DEBUG(("Calling COMPLETION_STAT for xid=%x rc=%d",cptr->xid,rc));
                cptr->c.stat_result(rc, 0, cptr->data);
                break;
            case COMPLETION_STRINGLIST:
                LOG_DEBUG(("Calling COMPLETION_STRINGLIST for xid=%x rc=%d",cptr->xid,rc));
                cptr->c.strings_result(rc, 0, cptr->data);
                break;
            case COMPLETION_STRING:
                LOG_DEBUG(("Calling COMPLETION_STRING for xid=%x rc=%d",cptr->xid,rc));
                cptr->c.string_result(rc, 0, cptr->data);
                break;
            case COMPLETION_ACLLIST:
                LOG_DEBUG(("Calling COMPLETION_ACLLIST for xid=%x rc=%d",cptr->xid,rc));
                cptr->c.acl_result(rc, 0, 0, cptr->data);
                break;
            case COMPLETION_VOID:
                LOG_DEBUG(("Calling COMPLETION_VOID for xid=%x rc=%d",cptr->xid,rc));
                // We want to skip the ping
                if (cptr->xid != PING_XID)
                    cptr->c.void_result(rc, cptr->data);
                break;
            }
        }
        free(cptr);
    }
}
/* Discard all pending outgoing/incoming buffers and outstanding
 * completions (reporting rc to them), inside the adaptor critical
 * section.  The statically-allocated primer buffer is never freed. */
static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc)
{
    enter_critical(zh);
    free_buffers(&zh->to_send);
    free_buffers(&zh->to_process);
    free_completions(zh,callCompletion,rc);
    leave_critical(zh);
    if (zh->input_buffer && zh->input_buffer != &zh->primer_buffer) {
        free_buffer(zh->input_buffer);
        zh->input_buffer = 0;
    }
}
/* Tear down the current connection after an error: close the socket,
 * deliver a SESSION_EVENT to the watcher (with the terminal state when
 * unrecoverable, or CONNECTING_STATE when we were connected and will
 * retry), flush pending work with rc, and advance to the next server. */
static void handle_error(zhandle_t *zh,int rc)
{
    close(zh->fd);
    if (is_unrecoverable(zh)) {
        LOG_DEBUG(("Calling a watcher for a SESSION_EVENT and the state=%s",
            state2String(zh->state)));
        PROCESS_SESSION_EVENT(zh, zh->state);
    } else if (zh->state == CONNECTED_STATE) {
        LOG_DEBUG(("Calling a watcher for a SESSION_EVENT and the state=CONNECTING_STATE"));
        PROCESS_SESSION_EVENT(zh, CONNECTING_STATE);
    }
    cleanup_bufs(zh,1,rc);
    zh->fd = -1;
    /* try the next server in the (permuted) address list on reconnect */
    zh->connect_index++;
    if (!is_unrecoverable(zh)) {
        zh->state = 0; /* back to the closed/idle state so we reconnect */
    }
}
/* Log a formatted socket error (with endpoint, zk retcode and errno
 * context), then run the standard connection-teardown path via
 * handle_error().  Returns rc unchanged so callers can propagate it. */
static int handle_socket_error_msg(zhandle_t *zh, int line, int rc,
    const char* format, ...)
{
    if(logLevel>=LOG_LEVEL_ERROR){
        va_list va;
        char buf[1024];
        va_start(va,format);
        vsnprintf(buf, sizeof(buf)-1,format,va);
        /* `line` is the caller's __LINE__, forwarded so the log points at
         * the call site rather than this helper */
        log_message(LOG_LEVEL_ERROR,line,__func__,
            format_log_message("Socket [%s] zk retcode=%d, errno=%d(%s): %s",
            format_current_endpoint_info(zh),rc,errno,strerror(errno),buf));
        va_end(va);
    }
    handle_error(zh,rc);
    return rc;
}
  681. static void auth_completion_func(int rc, zhandle_t* zh)
  682. {
  683. if(zh==NULL)
  684. return;
  685. if(rc!=0){
  686. LOG_ERROR(("Authentication scheme %s failed. Connection closed.",
  687. zh->auth.scheme));
  688. zh->state=AUTH_FAILED_STATE;
  689. }else{
  690. zh->auth.state=1; // active
  691. LOG_INFO(("Authentication scheme %s succeeded", zh->auth.scheme));
  692. }
  693. // chain call user's completion function
  694. if(zh->auth.completion!=0){
  695. zh->auth.completion(rc,zh->auth.data);
  696. zh->auth.completion=0;
  697. }
  698. }
/* Serialize the handle's stored credentials into an AuthPacket and queue
 * it at the FRONT of the send queue so it precedes any pending requests.
 * Returns ZOK when nothing is configured or on success, ZMARSHALLINGERROR
 * if any serialization/queueing step failed. */
static int send_auth_info(zhandle_t *zh)
{
    struct oarchive *oa;
    struct RequestHeader h = { .xid = AUTH_XID, .type = SETAUTH_OP};
    struct AuthPacket req;
    int rc;
    if(zh->auth.scheme==NULL)
        return ZOK; // there is nothing to send
    oa = create_buffer_oarchive();
    req.type=0; // ignored by the server
    req.scheme = zh->auth.scheme;
    req.auth = zh->auth.auth;
    /* rc-chaining: any negative rc short-circuits the following steps */
    rc = serialize_RequestHeader(oa, "header", &h);
    rc = rc < 0 ? rc : serialize_AuthPacket(oa, "req", &req);
    /* add this buffer to the head of the send queue */
    rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa),
        get_buffer_len(oa));
    /* We queued the buffer, so don't free it */
    close_buffer_oarchive(&oa, 0);
    LOG_DEBUG(("Sending auth info request to %s",format_current_endpoint_info(zh)));
    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
  721. static int serialize_prime_connect(struct connect_req *req, char* buffer){
  722. //this should be the order of serialization
  723. int offset = 0;
  724. req->protocolVersion = htonl(req->protocolVersion);
  725. memcpy(buffer + offset, &req->protocolVersion, sizeof(req->protocolVersion));
  726. offset = offset + sizeof(req->protocolVersion);
  727. req->lastZxidSeen = htonll(req->lastZxidSeen);
  728. memcpy(buffer + offset, &req->lastZxidSeen, sizeof(req->lastZxidSeen));
  729. offset = offset + sizeof(req->lastZxidSeen);
  730. req->timeOut = htonl(req->timeOut);
  731. memcpy(buffer + offset, &req->timeOut, sizeof(req->timeOut));
  732. offset = offset + sizeof(req->timeOut);
  733. req->sessionId = htonll(req->sessionId);
  734. memcpy(buffer + offset, &req->sessionId, sizeof(req->sessionId));
  735. offset = offset + sizeof(req->sessionId);
  736. req->passwd_len = htonl(req->passwd_len);
  737. memcpy(buffer + offset, &req->passwd_len, sizeof(req->passwd_len));
  738. offset = offset + sizeof(req->passwd_len);
  739. memcpy(buffer + offset, req->passwd, sizeof(req->passwd));
  740. return 0;
  741. }
/* Decode the connect handshake reply from the raw buffer into *req,
 * converting each field from network byte order.  Field order must match
 * serialize_prime_connect / the server's reply layout.  Always returns 0. */
static int deserialize_prime_response(struct prime_struct *req, char* buffer){
    //this should be the order of deserialization
    int offset = 0;
    memcpy(&req->len, buffer + offset, sizeof(req->len));
    offset = offset + sizeof(req->len);
    req->len = ntohl(req->len);
    memcpy(&req->protocolVersion, buffer + offset, sizeof(req->protocolVersion));
    offset = offset + sizeof(req->protocolVersion);
    req->protocolVersion = ntohl(req->protocolVersion);
    memcpy(&req->timeOut, buffer + offset, sizeof(req->timeOut));
    offset = offset + sizeof(req->timeOut);
    req->timeOut = ntohl(req->timeOut);
    memcpy(&req->sessionId, buffer + offset, sizeof(req->sessionId));
    offset = offset + sizeof(req->sessionId);
    /* NOTE(review): every other field uses the ntoh* direction; this one
     * uses htonll — presumably htonll is its own inverse on supported
     * platforms, but confirm there is no ntohll it should call instead. */
    req->sessionId = htonll(req->sessionId);
    memcpy(&req->passwd_len, buffer + offset, sizeof(req->passwd_len));
    offset = offset + sizeof(req->passwd_len);
    req->passwd_len = ntohl(req->passwd_len);
    memcpy(req->passwd, buffer + offset, sizeof(req->passwd));
    return 0;
}
/* Start the session handshake on a freshly connected socket: send the
 * 4-byte length prefix followed by the serialized connect request (built
 * from the handle's session id, password, timeout and last-seen zxid),
 * then move the handle to ASSOCIATING_STATE and point the input buffer at
 * the fixed-size primer buffer so check_events() can parse the response.
 * Returns ZOK, or the result of handle_socket_error_msg() if send fails. */
static int prime_connection(zhandle_t *zh)
{
int rc;
/*this is the size of buffer to serialize req into*/
char buffer_req[HANDSHAKE_REQ_SIZE];
int len = sizeof(buffer_req);
int hlen = 0;
struct connect_req req;
req.protocolVersion = 0;
req.sessionId = zh->client_id.client_id;
req.passwd_len = sizeof(req.passwd);
memcpy(req.passwd, zh->client_id.passwd, sizeof(zh->client_id.passwd));
req.timeOut = zh->recv_timeout;
req.lastZxidSeen = zh->last_zxid;
hlen = htonl(len);
/* We are running fast and loose here, but this string should fit in the initial buffer! */
/* NOTE(review): on this non-blocking socket send() could in principle
 * write fewer bytes than requested; a short write would be treated as
 * success here -- confirm the kernel buffer always absorbs the whole
 * handshake in practice. */
rc=send(zh->fd, &hlen, sizeof(len), 0);
serialize_prime_connect(&req, buffer_req);
rc=rc<0 ? rc : send(zh->fd, buffer_req, len, 0);
if (rc<0) {
return handle_socket_error_msg(zh, __LINE__, ZCONNECTIONLOSS,
"failed to send a handshake packet: %s", strerror(errno));
}
zh->state = ASSOCIATING_STATE;
zh->input_buffer = &zh->primer_buffer;
/* This seems a bit weird to to set the offset to 4, but we already have a
 * length, so we skip reading the length (and allocating the buffer) by
 * saying that we are already at offset 4 */
zh->input_buffer->curr_offset = 4;
return ZOK;
}
  794. static inline int calculate_interval(const struct timeval *start,
  795. const struct timeval *end)
  796. {
  797. int interval;
  798. struct timeval i = *end;
  799. i.tv_sec -= start->tv_sec;
  800. i.tv_usec -= start->tv_usec;
  801. interval = i.tv_sec * 1000 + (i.tv_usec/1000);
  802. return interval;
  803. }
  804. static struct timeval get_timeval(int interval)
  805. {
  806. struct timeval tv;
  807. if (interval < 0) {
  808. interval = 0;
  809. }
  810. tv.tv_sec = interval/1000;
  811. tv.tv_usec = (interval%1000)*1000;
  812. return tv;
  813. }
  814. static int add_void_completion(zhandle_t *zh, int xid, void_completion_t dc,
  815. const void *data);
  816. static int add_string_completion(zhandle_t *zh, int xid,
  817. string_completion_t dc, const void *data);
  818. int send_ping(zhandle_t* zh)
  819. {
  820. int rc;
  821. struct oarchive *oa = create_buffer_oarchive();
  822. struct RequestHeader h = { .xid = PING_XID, .type = PING_OP };
  823. rc = serialize_RequestHeader(oa, "header", &h);
  824. enter_critical(zh);
  825. rc = rc < 0 ? rc : add_void_completion(zh, h.xid, 0, 0);
  826. rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
  827. get_buffer_len(oa));
  828. leave_critical(zh);
  829. close_buffer_oarchive(&oa, 0);
  830. return rc<0 ? rc : adaptor_send_queue(zh, 0);
  831. }
/* Work out which socket events and what timeout the application's event
 * loop should wait on next.
 *
 * Outputs: *fd is the ZooKeeper socket (-1 if none), *interest is a mask of
 * ZOOKEEPER_READ/ZOOKEEPER_WRITE, *tv is the maximum time to block.  Side
 * effects drive the connection state machine: starts a non-blocking
 * connect to the next configured server when there is no socket, primes
 * the session handshake when connect() succeeds immediately, sends a PING
 * when a third of the session timeout passes without sending, and fails
 * the connection with ZOPERATIONTIMEOUT when two thirds pass without
 * receiving. */
int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
struct timeval *tv)
{
struct timeval now;
if(zh==0 || fd==0 ||interest==0 || tv==0)
return ZBADARGUMENTS;
if (is_unrecoverable(zh))
return ZINVALIDSTATE;
gettimeofday(&now, 0);
if(zh->next_deadline.tv_sec!=0 || zh->next_deadline.tv_usec!=0){
/* the event loop called us back later than we asked it to */
int time_left = calculate_interval(&zh->next_deadline, &now);
if (time_left > 10)
LOG_WARN(("Exceeded deadline by %dms", time_left));
}
api_prolog(zh);
*fd = zh->fd;
*interest = 0;
tv->tv_sec = 0;
tv->tv_usec = 0;
if (*fd == -1) {
/* no socket: pick the next server, or pause after a full round */
if (zh->connect_index == zh->addrs_count) {
/* Wait a bit before trying again so that we don't spin */
zh->connect_index = 0;
}else {
int rc;
int on = 1;
zh->fd = socket(PF_INET, SOCK_STREAM, 0);
setsockopt(zh->fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(int));
/* the connect below must not block the caller's event loop */
fcntl(zh->fd, F_SETFL, O_NONBLOCK|fcntl(zh->fd, F_GETFL, 0));
rc = connect(zh->fd, &zh->addrs[zh->connect_index],
sizeof(struct sockaddr));
if (rc == -1) {
/* EWOULDBLOCK/EINPROGRESS means the connect is pending; its
 * completion is detected via writability in check_events() */
if (errno == EWOULDBLOCK || errno == EINPROGRESS)
zh->state = CONNECTING_STATE;
else
return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
ZCONNECTIONLOSS,"connect() call failed"));
} else {
/* connect succeeded immediately: start the session handshake */
if((rc=prime_connection(zh))!=0)
return api_epilog(zh,rc);
LOG_INFO(("Initiated connection to server [%s]",
format_endpoint_info(&zh->addrs[zh->connect_index])));
}
}
*fd = zh->fd;
*tv = get_timeval(zh->recv_timeout/3);
zh->last_recv = now;
zh->last_send = now;
}
if (zh->fd != -1) {
int idle_recv = calculate_interval(&zh->last_recv, &now);
int idle_send = calculate_interval(&zh->last_send, &now);
int recv_to = zh->recv_timeout*2/3 - idle_recv;
int send_to = zh->recv_timeout/3;
// have we exceeded the receive timeout threshold?
if (recv_to <= 0) {
// We gotta cut our losses and connect to someone else
errno = ETIMEDOUT;
*fd=-1;
*interest=0;
*tv = get_timeval(0);
return api_epilog(zh,handle_socket_error_msg(zh,
__LINE__,ZOPERATIONTIMEOUT,
"connection timed out (exceeded timeout by %dms)",-recv_to));
}
// We only allow 1/3 of our timeout time to expire before sending
// a PING
if (zh->state==CONNECTED_STATE) {
send_to = zh->recv_timeout/3 - idle_send;
if (send_to <= 0) {
// LOG_DEBUG(("Sending PING to %s (exceeded idle by %dms)",
// format_current_endpoint_info(zh),-send_to));
int rc=send_ping(zh);
if (rc < 0){
LOG_ERROR(("failed to send PING request (zk retcode=%d)",rc));
return api_epilog(zh,rc);
}
send_to = zh->recv_timeout/3;
}
}
// choose the lesser value as the timeout
*tv = get_timeval(recv_to < send_to? recv_to:send_to);
/* remember when we told the caller to come back, so a late callback can
 * be detected above; normalize tv_usec overflow into tv_sec */
zh->next_deadline.tv_sec = now.tv_sec + tv->tv_sec;
zh->next_deadline.tv_usec = now.tv_usec + tv->tv_usec;
if (zh->next_deadline.tv_usec > 1000000) {
zh->next_deadline.tv_sec += zh->next_deadline.tv_usec / 1000000;
zh->next_deadline.tv_usec = zh->next_deadline.tv_usec % 1000000;
}
*interest = ZOOKEEPER_READ;
/* write interest only while connecting or when output is queued */
if (zh->to_send.head || zh->state == CONNECTING_STATE) {
*interest |= ZOOKEEPER_WRITE;
}
}
return api_epilog(zh,ZOK);
}
/* React to the socket events reported by the caller's event loop.
 * Completes a pending non-blocking connect (then primes the session),
 * flushes queued output on writability, and on readability either queues a
 * complete server packet onto to_process or, while handshaking, parses the
 * primer (connect) response and transitions to CONNECTED_STATE or
 * EXPIRED_SESSION_STATE.  Returns ZOK, ZNOTHING when there was nothing to
 * read, or an error via handle_socket_error_msg(). */
static int check_events(zhandle_t *zh, int events)
{
if (zh->fd == -1)
return ZINVALIDSTATE;
if ((events&ZOOKEEPER_WRITE)&&(zh->state == CONNECTING_STATE)) {
/* writability on a connecting socket means connect() finished;
 * SO_ERROR tells us whether it actually succeeded */
int rc, error;
socklen_t len = sizeof(error);
rc = getsockopt(zh->fd, SOL_SOCKET, SO_ERROR, &error, &len);
if (rc < 0 || error) {
if (rc == 0)
errno = error;
return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
"server refused to accept the client");
}
if((rc=prime_connection(zh))!=0)
return rc;
LOG_INFO(("initiated connection to server [%s]",
format_endpoint_info(&zh->addrs[zh->connect_index])));
return ZOK;
}
if (zh->to_send.head && (events&ZOOKEEPER_WRITE)) {
/* make the flush call non-blocking by specifying a 0 timeout */
int rc=flush_send_queue(zh,0);
if (rc < 0)
return handle_socket_error_msg(zh,__LINE__,ZCONNECTIONLOSS,
"failed while flushing send queue");
}
if (events&ZOOKEEPER_READ) {
int rc;
if (zh->input_buffer == 0) {
zh->input_buffer = allocate_buffer(0,0);
}
rc = recv_buffer(zh->fd, zh->input_buffer);
if (rc < 0) {
return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
"failed while receiving a server response");
}
if (rc > 0) {
/* rc > 0 means a complete packet has been assembled */
gettimeofday(&zh->last_recv, 0);
if (zh->input_buffer != &zh->primer_buffer) {
queue_buffer(&zh->to_process, zh->input_buffer, 0);
} else {
int64_t oldid,newid;
//deserialize
deserialize_prime_response(&zh->primer_storage, zh->primer_buffer.buffer);
/* We are processing the primer_buffer, so we need to finish
 * the connection handshake */
oldid = zh->client_id.client_id;
newid = zh->primer_storage.sessionId;
if (oldid != 0 && oldid != newid) {
/* the server did not recognize our previous session */
zh->state = EXPIRED_SESSION_STATE;
errno = ESTALE;
return handle_socket_error_msg(zh,__LINE__,ZSESSIONEXPIRED,
"session %llx has expired.",oldid);
} else {
/* adopt the (possibly new) session id, password and the
 * server-negotiated timeout */
zh->recv_timeout = zh->primer_storage.timeOut;
zh->client_id.client_id = newid;
memcpy(zh->client_id.passwd, &zh->primer_storage.passwd, sizeof(zh->client_id.passwd));
zh->state = CONNECTED_STATE;
LOG_INFO(("connected to server [%s] with session id=%llx",
format_endpoint_info(&zh->addrs[zh->connect_index]),newid));
/* send the authentication packet now */
send_auth_info(zh);
LOG_DEBUG(("Calling a watcher for a SESSION_EVENT and the state=CONNECTED_STATE"));
zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again
PROCESS_SESSION_EVENT(zh, CONNECTED_STATE);
}
}
zh->input_buffer = 0;
}else{
// zookeeper_process was called but there was nothing to read
// from the socket
return ZNOTHING;
}
}
return ZOK;
}
/* Take a reference on the handle for the duration of an API call; paired
 * with api_epilog(), which releases it and may finish a deferred close. */
void api_prolog(zhandle_t* zh)
{
inc_ref_counter(zh,1);
}
  1008. int api_epilog(zhandle_t *zh,int rc)
  1009. {
  1010. if(inc_ref_counter(zh,-1)==0 && zh->close_requested!=0)
  1011. zookeeper_close(zh);
  1012. return rc;
  1013. }
  1014. static __attribute__((unused)) void print_completion_queue(zhandle_t *zh)
  1015. {
  1016. completion_list_t* cptr;
  1017. if(logLevel<LOG_LEVEL_DEBUG) return;
  1018. fprintf(LOGSTREAM,"Completion queue: ");
  1019. if (zh->sent_requests.head==0) {
  1020. fprintf(LOGSTREAM,"empty\n");
  1021. return;
  1022. }
  1023. cptr=zh->sent_requests.head;
  1024. while(cptr){
  1025. fprintf(LOGSTREAM,"%d,",cptr->xid);
  1026. cptr=cptr->next;
  1027. }
  1028. fprintf(LOGSTREAM,"end\n");
  1029. }
  1030. static completion_list_t* create_completion_entry(int xid, int completion_type,
  1031. const void *dc, const void *data);
  1032. static void queue_completion(completion_head_t *list, completion_list_t *c,
  1033. int add_to_front);
  1034. #ifdef THREADED
  1035. // IO thread queues session events to be processed by the completion thread
  1036. int queue_session_event(zhandle_t *zh, int state)
  1037. {
  1038. int rc;
  1039. struct WatcherEvent evt = { SESSION_EVENT, state, "" };
  1040. struct ReplyHeader hdr = { WATCHER_EVENT_XID, 0, 0 };
  1041. struct oarchive *oa;
  1042. completion_list_t *cptr;
  1043. if ((oa=create_buffer_oarchive())==NULL) {
  1044. LOG_ERROR(("out of memory"));
  1045. goto error;
  1046. }
  1047. rc = serialize_ReplyHeader(oa, "hdr", &hdr);
  1048. rc = rc<0?rc: serialize_WatcherEvent(oa, "event", &evt);
  1049. if(rc<0){
  1050. close_buffer_oarchive(&oa, 1);
  1051. goto error;
  1052. }
  1053. if ((cptr=calloc(1,sizeof(*cptr)))==NULL) {
  1054. LOG_ERROR(("out of memory"));
  1055. close_buffer_oarchive(&oa, 1);
  1056. goto error;
  1057. }
  1058. cptr->xid = WATCHER_EVENT_XID;
  1059. cptr->buffer = allocate_buffer(get_buffer(oa), get_buffer_len(oa));
  1060. cptr->buffer->curr_offset = get_buffer_len(oa);
  1061. if (!cptr->buffer) {
  1062. free(cptr);
  1063. close_buffer_oarchive(&oa, 1);
  1064. goto error;
  1065. }
  1066. /* We queued the buffer, so don't free it */
  1067. close_buffer_oarchive(&oa, 0);
  1068. queue_completion(&zh->completions_to_process, cptr, 0);
  1069. return ZOK;
  1070. error:
  1071. errno=ENOMEM;
  1072. return ZSYSTEMERROR;
  1073. }
  1074. #endif
  1075. completion_list_t *dequeue_completion(completion_head_t *list)
  1076. {
  1077. completion_list_t *cptr;
  1078. lock_completion_list(list);
  1079. cptr = list->head;
  1080. if (cptr) {
  1081. list->head = cptr->next;
  1082. if (!list->head) {
  1083. assert(list->last == cptr);
  1084. list->last = 0;
  1085. }
  1086. }
  1087. unlock_completion_list(list);
  1088. return cptr;
  1089. }
/* Completion-thread work loop: drain completions_to_process and invoke user
 * callbacks.  WATCHER_EVENT_XID entries are decoded and dispatched to the
 * global watcher; everything else is decoded according to its
 * completion_type and delivered to the stored asynchronous callback. */
void process_completions(zhandle_t *zh)
{
completion_list_t *cptr;
while ((cptr = dequeue_completion(&zh->completions_to_process)) != 0) {
struct ReplyHeader hdr;
buffer_list_t *bptr = cptr->buffer;
struct iarchive *ia = create_buffer_iarchive(bptr->buffer,
bptr->curr_offset);
deserialize_ReplyHeader(ia, "hdr", &hdr);
zh->last_zxid = hdr.zxid;
if (hdr.xid == WATCHER_EVENT_XID) {
int type, state;
struct WatcherEvent evt;
deserialize_WatcherEvent(ia, "event", &evt);
/* We are doing a notification, so there is no pending request */
type = evt.type;
state = evt.state;
/* This is a notification so there aren't any pending requests */
LOG_DEBUG(("Calling a watcher for node [%s], event=%s",
(evt.path==NULL?"NULL":evt.path),watcherEvent2String(type)));
zh->watcher(zh, type, state, evt.path);
deallocate_WatcherEvent(&evt);
/* NOTE(review): cptr and cptr->buffer are not freed on this branch
 * (the frees below sit inside the else block) -- looks like a leak;
 * verify against the buffer ownership rules used elsewhere. */
} else {
int rc = hdr.err;
switch (cptr->completion_type) {
case COMPLETION_DATA:
LOG_DEBUG(("Calling COMPLETION_DATA for xid=%x rc=%d",cptr->xid,rc));
/* rc!=0 means the server reported an error: deliver it with
 * NULL payload instead of decoding a response body */
if (rc) {
cptr->c.data_result(rc, 0, 0, 0, cptr->data);
} else {
struct GetDataResponse res;
deserialize_GetDataResponse(ia, "reply", &res);
cptr->c.data_result(rc, res.data.buff, res.data.len,
&res.stat, cptr->data);
deallocate_GetDataResponse(&res);
}
break;
case COMPLETION_STAT:
LOG_DEBUG(("Calling COMPLETION_STAT for xid=%x rc=%d",cptr->xid,rc));
if (rc) {
cptr->c.stat_result(rc, 0, cptr->data);
} else {
struct SetDataResponse res;
deserialize_SetDataResponse(ia, "reply", &res);
cptr->c.stat_result(rc, &res.stat, cptr->data);
deallocate_SetDataResponse(&res);
}
break;
case COMPLETION_STRINGLIST:
LOG_DEBUG(("Calling COMPLETION_STRINGLIST for xid=%x rc=%d",cptr->xid,rc));
if (rc) {
cptr->c.strings_result(rc, 0, cptr->data);
} else {
struct GetChildrenResponse res;
deserialize_GetChildrenResponse(ia, "reply", &res);
cptr->c.strings_result(rc, &res.children, cptr->data);
deallocate_GetChildrenResponse(&res);
}
break;
case COMPLETION_STRING:
LOG_DEBUG(("Calling COMPLETION_STRING for xid=%x rc=%d",cptr->xid,rc));
if (rc) {
cptr->c.string_result(rc, 0, cptr->data);
} else {
struct CreateResponse res;
deserialize_CreateResponse(ia, "reply", &res);
cptr->c.string_result(rc, res.path, cptr->data);
deallocate_CreateResponse(&res);
}
break;
case COMPLETION_ACLLIST:
LOG_DEBUG(("Calling COMPLETION_ACLLIST for xid=%x rc=%d",cptr->xid,rc));
if (rc) {
cptr->c.acl_result(rc, 0, 0, cptr->data);
} else {
struct GetACLResponse res;
deserialize_GetACLResponse(ia, "reply", &res);
cptr->c.acl_result(rc, &res.acl, &res.stat, cptr->data);
deallocate_GetACLResponse(&res);
}
break;
case COMPLETION_VOID:
LOG_DEBUG(("Calling COMPLETION_VOID for xid=%x rc=%d",cptr->xid,rc));
if (hdr.xid == PING_XID) {
// We want to skip the ping
} else {
cptr->c.void_result(rc, cptr->data);
}
break;
}
free_buffer(cptr->buffer);
free(cptr);
}
close_buffer_iarchive(&ia);
}
}
  1186. static void isSocketReadable(zhandle_t* zh)
  1187. {
  1188. struct pollfd fds;
  1189. fds.fd = zh->fd;
  1190. fds.events = POLLIN;
  1191. if (poll(&fds,1,0)<=0) {
  1192. // socket not readable -- no more responses to process
  1193. zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
  1194. }else{
  1195. gettimeofday(&zh->socket_readable,0);
  1196. }
  1197. }
  1198. static void checkResponseLatency(zhandle_t* zh)
  1199. {
  1200. int delay;
  1201. struct timeval now;
  1202. if(zh->socket_readable.tv_sec==0)
  1203. return;
  1204. gettimeofday(&now,0);
  1205. delay=calculate_interval(&zh->socket_readable, &now);
  1206. if(delay>20)
  1207. LOG_DEBUG(("The following server response has spent at least %dms sitting in the client socket recv buffer",delay));
  1208. zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
  1209. }
  1210. int zookeeper_process(zhandle_t *zh, int events)
  1211. {
  1212. buffer_list_t *bptr;
  1213. int rc;
  1214. if (zh==NULL)
  1215. return ZBADARGUMENTS;
  1216. if (is_unrecoverable(zh))
  1217. return ZINVALIDSTATE;
  1218. api_prolog(zh);
  1219. IF_DEBUG(checkResponseLatency(zh));
  1220. rc = check_events(zh, events);
  1221. if (rc!=ZOK)
  1222. return api_epilog(zh, rc);
  1223. IF_DEBUG(isSocketReadable(zh));
  1224. while (rc >= 0&& (bptr=dequeue_buffer(&zh->to_process))) {
  1225. struct ReplyHeader hdr;
  1226. struct iarchive *ia = create_buffer_iarchive(
  1227. bptr->buffer, bptr->curr_offset);
  1228. deserialize_ReplyHeader(ia, "hdr", &hdr);
  1229. zh->last_zxid = hdr.zxid;
  1230. if (hdr.xid == WATCHER_EVENT_XID) {
  1231. completion_list_t *c = create_completion_entry(WATCHER_EVENT_XID,-1,0,0);
  1232. c->buffer = bptr;
  1233. queue_completion(&zh->completions_to_process, c, 0);
  1234. } else if(hdr.xid == AUTH_XID){
  1235. /* special handling for the AUTH response as it may come back
  1236. * out-of-band */
  1237. auth_completion_func(hdr.err,zh);
  1238. free_buffer(bptr);
  1239. /* authentication completion may change the connection state to
  1240. * unrecoverable */
  1241. if(is_unrecoverable(zh)){
  1242. handle_error(zh, ZAUTHFAILED);
  1243. close_buffer_iarchive(&ia);
  1244. return api_epilog(zh, ZAUTHFAILED);
  1245. }
  1246. } else {
  1247. int rc = hdr.err;
  1248. /* Find the request corresponding to the response */
  1249. completion_list_t *cptr = dequeue_completion(&zh->sent_requests);
  1250. assert(cptr);
  1251. /* The requests are going to come back in order */
  1252. if (cptr->xid != hdr.xid) {
  1253. // received unexpected (or out-of-order) response
  1254. close_buffer_iarchive(&ia);
  1255. free_buffer(bptr);
  1256. // put the completion back on the queue (so it gets properly
  1257. // signaled and deallocated) and disconnect from the server
  1258. queue_completion(&zh->sent_requests,cptr,1);
  1259. return handle_socket_error_msg(zh, __LINE__,ZRUNTIMEINCONSISTENCY,
  1260. "unexpected server response: expected %x, but received %x",
  1261. hdr.xid,cptr->xid);
  1262. }
  1263. if (cptr->c.void_result != SYNCHRONOUS_MARKER) {
  1264. if(hdr.xid == PING_XID){
  1265. // Nothing to do with a ping response
  1266. free_buffer(bptr);
  1267. free(cptr);
  1268. } else {
  1269. cptr->buffer = bptr;
  1270. queue_completion(&zh->completions_to_process, cptr, 0);
  1271. }
  1272. } else {
  1273. struct sync_completion
  1274. *sc = (struct sync_completion*)cptr->data;
  1275. sc->rc = rc;
  1276. switch(cptr->completion_type) {
  1277. case COMPLETION_DATA:
  1278. if (rc==0) {
  1279. struct GetDataResponse res;
  1280. int len;
  1281. LOG_DEBUG(("Calling COMPLETION_DATA for xid=%x rc=%d",cptr->xid,rc));
  1282. deserialize_GetDataResponse(ia, "reply", &res);
  1283. if (res.data.len <= sc->u.data.buff_len) {
  1284. len = res.data.len;
  1285. } else {
  1286. len = sc->u.data.buff_len;
  1287. }
  1288. sc->u.data.buff_len = len;
  1289. memcpy(sc->u.data.buffer, res.data.buff, len);
  1290. sc->u.data.stat = res.stat;
  1291. deallocate_GetDataResponse(&res);
  1292. }
  1293. break;
  1294. case COMPLETION_STAT:
  1295. if (rc == 0) {
  1296. struct SetDataResponse res;
  1297. LOG_DEBUG(("Calling COMPLETION_STAT for xid=%x rc=%d",cptr->xid,rc));
  1298. deserialize_SetDataResponse(ia, "reply", &res);
  1299. sc->u.stat = res.stat;
  1300. deallocate_SetDataResponse(&res);
  1301. }
  1302. break;
  1303. case COMPLETION_STRINGLIST:
  1304. if (rc == 0) {
  1305. struct GetChildrenResponse res;
  1306. LOG_DEBUG(("Calling COMPLETION_STRINGLIST for xid=%x rc=%d",cptr->xid,rc));
  1307. deserialize_GetChildrenResponse(ia, "reply", &res);
  1308. sc->u.strs = res.children;
  1309. /* We don't deallocate since we are passing it back */
  1310. // deallocate_GetChildrenResponse(&res);
  1311. }
  1312. break;
  1313. case COMPLETION_STRING:
  1314. if (rc == 0) {
  1315. struct CreateResponse res;
  1316. int len;
  1317. LOG_DEBUG(("Calling COMPLETION_STRING for xid=%x rc=%d",cptr->xid,rc));
  1318. deserialize_CreateResponse(ia, "reply", &res);
  1319. if (sc->u.str.str_len > strlen(res.path)) {
  1320. len = strlen(res.path);
  1321. } else {
  1322. len = sc->u.str.str_len;
  1323. }
  1324. memcpy(sc->u.str.str, res.path, len);
  1325. sc->u.str.str[len] = '\0';
  1326. deallocate_CreateResponse(&res);
  1327. }
  1328. break;
  1329. case COMPLETION_ACLLIST:
  1330. if (rc == 0) {
  1331. struct GetACLResponse res;
  1332. LOG_DEBUG(("Calling COMPLETION_ACLLIST for xid=%x rc=%d",cptr->xid,rc));
  1333. deserialize_GetACLResponse(ia, "reply", &res);
  1334. cptr->c.acl_result(rc, &res.acl, &res.stat, cptr->data);
  1335. sc->u.acl.acl = res.acl;
  1336. sc->u.acl.stat = res.stat;
  1337. /* We don't deallocate since we are passing it back */
  1338. //deallocate_GetACLResponse(&res);
  1339. }
  1340. break;
  1341. case COMPLETION_VOID:
  1342. LOG_DEBUG(("Calling COMPLETION_VOID for xid=%x rc=%d",cptr->xid,rc));
  1343. break;
  1344. }
  1345. notify_sync_completion(sc);
  1346. free_buffer(bptr);
  1347. zh->outstanding_sync--;
  1348. free(cptr);
  1349. }
  1350. }
  1351. close_buffer_iarchive(&ia);
  1352. }
  1353. if (process_async(zh->outstanding_sync)) {
  1354. process_completions(zh);
  1355. }
  1356. return api_epilog(zh,ZOK);
  1357. }
  1358. int zoo_state(zhandle_t *zh)
  1359. {
  1360. if(zh!=0)
  1361. return zh->state;
  1362. return 0;
  1363. }
  1364. static completion_list_t* create_completion_entry(int xid, int completion_type,
  1365. const void *dc, const void *data)
  1366. {
  1367. completion_list_t *c = calloc(1,sizeof(completion_list_t));
  1368. if (!c) {
  1369. LOG_ERROR(("out of memory"));
  1370. return 0;
  1371. }
  1372. c->completion_type = completion_type;
  1373. c->data = data;
  1374. switch(c->completion_type) {
  1375. case COMPLETION_VOID:
  1376. c->c.void_result = (void_completion_t)dc;
  1377. break;
  1378. case COMPLETION_STRING:
  1379. c->c.string_result = (string_completion_t)dc;
  1380. break;
  1381. case COMPLETION_DATA:
  1382. c->c.data_result = (data_completion_t)dc;
  1383. break;
  1384. case COMPLETION_STAT:
  1385. c->c.stat_result = (stat_completion_t)dc;
  1386. break;
  1387. case COMPLETION_STRINGLIST:
  1388. c->c.strings_result = (strings_completion_t)dc;
  1389. break;
  1390. case COMPLETION_ACLLIST:
  1391. c->c.acl_result = (acl_completion_t)dc;
  1392. break;
  1393. }
  1394. c->xid = xid;
  1395. c->next = 0;
  1396. return c;
  1397. }
  1398. static void queue_completion(completion_head_t *list, completion_list_t *c,
  1399. int add_to_front)
  1400. {
  1401. c->next = 0;
  1402. /* appending a new entry to the back of the list */
  1403. lock_completion_list(list);
  1404. if (list->last) {
  1405. assert(list->head);
  1406. // List is not empty
  1407. if (!add_to_front) {
  1408. list->last->next = c;
  1409. list->last = c;
  1410. } else {
  1411. c->next = list->head;
  1412. list->head = c;
  1413. }
  1414. } else {
  1415. // List is empty
  1416. assert(!list->head);
  1417. list->head = c;
  1418. list->last = c;
  1419. }
  1420. unlock_completion_list(list);
  1421. }
  1422. static int add_completion(zhandle_t *zh, int xid, int completion_type,
  1423. const void *dc, const void *data, int add_to_front)
  1424. {
  1425. completion_list_t *c =create_completion_entry(xid, completion_type, dc,
  1426. data);
  1427. if (!c)
  1428. return ZSYSTEMERROR;
  1429. queue_completion(&zh->sent_requests, c, add_to_front);
  1430. if (dc == SYNCHRONOUS_MARKER) {
  1431. zh->outstanding_sync++;
  1432. }
  1433. return ZOK;
  1434. }
  1435. static int add_data_completion(zhandle_t *zh, int xid, data_completion_t dc,
  1436. const void *data)
  1437. {
  1438. return add_completion(zh, xid, COMPLETION_DATA, dc, data, 0);
  1439. }
  1440. static int add_stat_completion(zhandle_t *zh, int xid, stat_completion_t dc,
  1441. const void *data)
  1442. {
  1443. return add_completion(zh, xid, COMPLETION_STAT, dc, data, 0);
  1444. }
  1445. static int add_strings_completion(zhandle_t *zh, int xid,
  1446. strings_completion_t dc, const void *data)
  1447. {
  1448. return add_completion(zh, xid, COMPLETION_STRINGLIST, dc, data, 0);
  1449. }
  1450. static int add_acl_completion(zhandle_t *zh, int xid, acl_completion_t dc,
  1451. const void *data)
  1452. {
  1453. return add_completion(zh, xid, COMPLETION_ACLLIST, dc, data, 0);
  1454. }
  1455. static int add_void_completion(zhandle_t *zh, int xid, void_completion_t dc,
  1456. const void *data)
  1457. {
  1458. return add_completion(zh, xid, COMPLETION_VOID, dc, data, 0);
  1459. }
  1460. static int add_string_completion(zhandle_t *zh, int xid,
  1461. string_completion_t dc, const void *data)
  1462. {
  1463. return add_completion(zh, xid, COMPLETION_STRING, dc, data, 0);
  1464. }
/* Close the handle.  If other API calls still hold references, the close
 * is deferred: the flag is set and the last api_epilog() re-enters here.
 * Otherwise, when connected, a CLOSE_OP request is queued and flushed
 * (blocking up to 3s) before all handle resources are freed.  Returns ZOK,
 * ZBADARGUMENTS, ZMARSHALLINGERROR, or the flush result. */
int zookeeper_close(zhandle_t *zh)
{
int rc=ZOK;
if (zh==0)
return ZBADARGUMENTS;
zh->close_requested=1;
if (inc_ref_counter(zh,0)!=0) {
/* API calls are in flight; the last api_epilog() will call us again */
adaptor_finish(zh);
return ZOK;
}
if(zh->state==CONNECTED_STATE){
struct oarchive *oa;
struct RequestHeader h = { .xid = get_xid(), .type = CLOSE_OP};
LOG_INFO(("Closing zookeeper session %llx to [%s]\n",
zh->client_id.client_id,format_current_endpoint_info(zh)));
oa = create_buffer_oarchive();
rc = serialize_RequestHeader(oa, "header", &h);
rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
get_buffer_len(oa));
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
if (rc < 0) {
rc = ZMARSHALLINGERROR;
goto finish;
}
/* make sure the close request is sent; we set timeout to an arbitrary
 * (but reasonable) number of milliseconds since we want the call to block*/
rc=adaptor_send_queue(zh, 3000);
}else{
LOG_INFO(("Freeing zookeeper resources for session %llx\n",
zh->client_id.client_id));
rc = ZOK;
}
finish:
/* resources are released regardless of how the close request fared */
destroy(zh);
adaptor_destroy(zh);
free(zh);
return rc;
}
  1504. int zoo_aget(zhandle_t *zh, const char *path, int watch, data_completion_t dc,
  1505. const void *data)
  1506. {
  1507. struct oarchive *oa;
  1508. struct RequestHeader h = { .xid = get_xid(), .type = GETDATA_OP};
  1509. struct GetDataRequest req = { (char*)path, watch };
  1510. int rc;
  1511. if (zh==0 || path==0)
  1512. return ZBADARGUMENTS;
  1513. if (is_unrecoverable(zh))
  1514. return ZINVALIDSTATE;
  1515. oa=create_buffer_oarchive();
  1516. rc = serialize_RequestHeader(oa, "header", &h);
  1517. rc = rc < 0 ? rc : serialize_GetDataRequest(oa, "req", &req);
  1518. enter_critical(zh);
  1519. rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data);
  1520. rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
  1521. get_buffer_len(oa));
  1522. leave_critical(zh);
  1523. /* We queued the buffer, so don't free it */
  1524. close_buffer_oarchive(&oa, 0);
  1525. LOG_DEBUG(("Sending request xid=%x for path [%s] to %s",h.xid,path,
  1526. format_current_endpoint_info(zh)));
  1527. /* make a best (non-blocking) effort to send the requests asap */
  1528. adaptor_send_queue(zh, 0);
  1529. return (rc < 0)?ZMARSHALLINGERROR:ZOK;
  1530. }
  1531. int zoo_aset(zhandle_t *zh, const char *path, const char *buffer, int buflen,
  1532. int version, stat_completion_t dc, const void *data)
  1533. {
  1534. struct oarchive *oa;
  1535. struct RequestHeader h = { .xid = get_xid(), .type = SETDATA_OP};
  1536. struct SetDataRequest req;
  1537. int rc;
  1538. if (zh==0 || path==0)
  1539. return ZBADARGUMENTS;
  1540. if (is_unrecoverable(zh))
  1541. return ZINVALIDSTATE;
  1542. oa = create_buffer_oarchive();
  1543. req.path = (char*)path;
  1544. req.data.buff = (char*)buffer;
  1545. req.data.len = buflen;
  1546. req.version = version;
  1547. rc = serialize_RequestHeader(oa, "header", &h);
  1548. rc = rc < 0 ? rc : serialize_SetDataRequest(oa, "req", &req);
  1549. enter_critical(zh);
  1550. rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, dc, data);
  1551. rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
  1552. get_buffer_len(oa));
  1553. leave_critical(zh);
  1554. /* We queued the buffer, so don't free it */
  1555. close_buffer_oarchive(&oa, 0);
  1556. LOG_DEBUG(("Sending request xid=%x for path [%s] to %s",h.xid,path,
  1557. format_current_endpoint_info(zh)));
  1558. /* make a best (non-blocking) effort to send the requests asap */
  1559. adaptor_send_queue(zh, 0);
  1560. return (rc < 0)?ZMARSHALLINGERROR:ZOK;
  1561. }
  1562. int zoo_acreate(zhandle_t *zh, const char *path, const char *value,
  1563. int valuelen, const struct ACL_vector *acl_entries, int ephemeral,
  1564. string_completion_t completion, const void *data)
  1565. {
  1566. struct oarchive *oa;
  1567. struct RequestHeader h = { .xid = get_xid(), .type = CREATE_OP };
  1568. struct CreateRequest req;
  1569. int rc;
  1570. if (zh==0 || path==0)
  1571. return ZBADARGUMENTS;
  1572. if (is_unrecoverable(zh))
  1573. return ZINVALIDSTATE;
  1574. oa = create_buffer_oarchive();
  1575. req.path = (char*)path;
  1576. req.flags = ephemeral;
  1577. req.data.buff = (char*)value;
  1578. req.data.len = valuelen;
  1579. if (acl_entries == 0) {
  1580. req.acl.count = 0;
  1581. req.acl.data = 0;
  1582. } else {
  1583. req.acl = *acl_entries;
  1584. }
  1585. rc = serialize_RequestHeader(oa, "header", &h);
  1586. rc = rc < 0 ? rc : serialize_CreateRequest(oa, "req", &req);
  1587. enter_critical(zh);
  1588. rc = rc < 0 ? rc : add_string_completion(zh, h.xid, completion, data);
  1589. rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
  1590. get_buffer_len(oa));
  1591. leave_critical(zh);
  1592. /* We queued the buffer, so don't free it */
  1593. close_buffer_oarchive(&oa, 0);
  1594. LOG_DEBUG(("Sending request xid=%x for path [%s] to %s",h.xid,path,
  1595. format_current_endpoint_info(zh)));
  1596. /* make a best (non-blocking) effort to send the requests asap */
  1597. adaptor_send_queue(zh, 0);
  1598. return (rc < 0)?ZMARSHALLINGERROR:ZOK;
  1599. }
  1600. int zoo_adelete(zhandle_t *zh, const char *path, int version,
  1601. void_completion_t completion, const void *data)
  1602. {
  1603. struct oarchive *oa;
  1604. struct RequestHeader h = { .xid = get_xid(), .type = DELETE_OP};
  1605. struct DeleteRequest req;
  1606. int rc;
  1607. if (zh==0 || path==0)
  1608. return ZBADARGUMENTS;
  1609. if (is_unrecoverable(zh))
  1610. return ZINVALIDSTATE;
  1611. oa = create_buffer_oarchive();
  1612. req.path = (char*)path;
  1613. req.version = version;
  1614. rc = serialize_RequestHeader(oa, "header", &h);
  1615. rc = rc < 0 ? rc : serialize_DeleteRequest(oa, "req", &req);
  1616. enter_critical(zh);
  1617. rc = rc < 0 ? rc : add_void_completion(zh, h.xid, completion, data);
  1618. rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
  1619. get_buffer_len(oa));
  1620. leave_critical(zh);
  1621. /* We queued the buffer, so don't free it */
  1622. close_buffer_oarchive(&oa, 0);
  1623. LOG_DEBUG(("Sending request xid=%x for path [%s] to %s",h.xid,path,
  1624. format_current_endpoint_info(zh)));
  1625. /* make a best (non-blocking) effort to send the requests asap */
  1626. adaptor_send_queue(zh, 0);
  1627. return (rc < 0)?ZMARSHALLINGERROR:ZOK;
  1628. }
  1629. int zoo_aexists(zhandle_t *zh, const char *path, int watch,
  1630. stat_completion_t completion, const void *data)
  1631. {
  1632. struct oarchive *oa;
  1633. struct RequestHeader h = { .xid = get_xid(), .type = EXISTS_OP };
  1634. struct ExistsRequest req;
  1635. int rc;
  1636. if (zh==0 || path==0)
  1637. return ZBADARGUMENTS;
  1638. if (is_unrecoverable(zh))
  1639. return ZINVALIDSTATE;
  1640. oa = create_buffer_oarchive();
  1641. req.path = (char*)path;
  1642. req.watch = watch;
  1643. rc = serialize_RequestHeader(oa, "header", &h);
  1644. rc = rc < 0 ? rc : serialize_ExistsRequest(oa, "req", &req);
  1645. enter_critical(zh);
  1646. rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, completion, data);
  1647. rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
  1648. get_buffer_len(oa));
  1649. leave_critical(zh);
  1650. /* We queued the buffer, so don't free it */
  1651. close_buffer_oarchive(&oa, 0);
  1652. LOG_DEBUG(("Sending request xid=%x for path [%s] to %s",h.xid,path,
  1653. format_current_endpoint_info(zh)));
  1654. /* make a best (non-blocking) effort to send the requests asap */
  1655. adaptor_send_queue(zh, 0);
  1656. return (rc < 0)?ZMARSHALLINGERROR:ZOK;
  1657. }
  1658. int zoo_aget_children(zhandle_t *zh, const char *path, int watch,
  1659. strings_completion_t completion, const void *data)
  1660. {
  1661. struct oarchive *oa;
  1662. struct RequestHeader h = { .xid = get_xid(), .type = GETCHILDREN_OP};
  1663. struct GetChildrenRequest req;
  1664. int rc;
  1665. if (zh==0 || path==0)
  1666. return ZBADARGUMENTS;
  1667. if (is_unrecoverable(zh))
  1668. return ZINVALIDSTATE;
  1669. oa = create_buffer_oarchive();
  1670. req.path = (char*)path;
  1671. req.watch = watch;
  1672. rc = serialize_RequestHeader(oa, "header", &h);
  1673. rc = rc < 0 ? rc : serialize_GetChildrenRequest(oa, "req", &req);
  1674. enter_critical(zh);
  1675. rc = rc < 0 ? rc : add_strings_completion(zh, h.xid, completion, data);
  1676. rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
  1677. get_buffer_len(oa));
  1678. leave_critical(zh);
  1679. /* We queued the buffer, so don't free it */
  1680. close_buffer_oarchive(&oa, 0);
  1681. LOG_DEBUG(("Sending request xid=%x for path [%s] to %s",h.xid,path,
  1682. format_current_endpoint_info(zh)));
  1683. /* make a best (non-blocking) effort to send the requests asap */
  1684. adaptor_send_queue(zh, 0);
  1685. return (rc < 0)?ZMARSHALLINGERROR:ZOK;
  1686. }
  1687. int zoo_async(zhandle_t *zh, const char *path,
  1688. string_completion_t completion, const void *data)
  1689. {
  1690. struct oarchive *oa;
  1691. struct RequestHeader h = { .xid = get_xid(), .type = SYNC_OP};
  1692. struct SyncRequest req;
  1693. int rc;
  1694. if (zh==0 || path==0)
  1695. return ZBADARGUMENTS;
  1696. if (is_unrecoverable(zh))
  1697. return ZINVALIDSTATE;
  1698. oa = create_buffer_oarchive();
  1699. req.path = (char*)path;
  1700. rc = serialize_RequestHeader(oa, "header", &h);
  1701. rc = rc < 0 ? rc : serialize_SyncRequest(oa, "req", &req);
  1702. enter_critical(zh);
  1703. rc = rc < 0 ? rc : add_string_completion(zh, h.xid, completion, data);
  1704. rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
  1705. get_buffer_len(oa));
  1706. leave_critical(zh);
  1707. /* We queued the buffer, so don't free it */
  1708. close_buffer_oarchive(&oa, 0);
  1709. LOG_DEBUG(("Sending request xid=%x for path [%s] to %s",h.xid,path,
  1710. format_current_endpoint_info(zh)));
  1711. /* make a best (non-blocking) effort to send the requests asap */
  1712. adaptor_send_queue(zh, 0);
  1713. return (rc < 0)?ZMARSHALLINGERROR:ZOK;
  1714. }
  1715. int zoo_aget_acl(zhandle_t *zh, const char *path, acl_completion_t completion,
  1716. const void *data)
  1717. {
  1718. struct oarchive *oa;
  1719. struct RequestHeader h = { .xid = get_xid(), .type = GETACL_OP};
  1720. struct GetACLRequest req;
  1721. int rc;
  1722. if (zh==0 || path==0)
  1723. return ZBADARGUMENTS;
  1724. if (is_unrecoverable(zh))
  1725. return ZINVALIDSTATE;
  1726. oa = create_buffer_oarchive();
  1727. req.path = (char*)path;
  1728. rc = serialize_RequestHeader(oa, "header", &h);
  1729. rc = rc < 0 ? rc : serialize_GetACLRequest(oa, "req", &req);
  1730. enter_critical(zh);
  1731. rc = rc < 0 ? rc : add_acl_completion(zh, h.xid, completion, data);
  1732. rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
  1733. get_buffer_len(oa));
  1734. leave_critical(zh);
  1735. /* We queued the buffer, so don't free it */
  1736. close_buffer_oarchive(&oa, 0);
  1737. LOG_DEBUG(("Sending request xid=%x for path [%s] to %s",h.xid,path,
  1738. format_current_endpoint_info(zh)));
  1739. /* make a best (non-blocking) effort to send the requests asap */
  1740. adaptor_send_queue(zh, 0);
  1741. return (rc < 0)?ZMARSHALLINGERROR:ZOK;
  1742. }
  1743. int zoo_aset_acl(zhandle_t *zh, const char *path, int version,
  1744. struct ACL_vector *acl, void_completion_t completion, const void *data)
  1745. {
  1746. struct oarchive *oa;
  1747. struct RequestHeader h = { .xid = get_xid(), .type = SETACL_OP};
  1748. struct SetACLRequest req;
  1749. int rc;
  1750. if (zh==0 || path==0)
  1751. return ZBADARGUMENTS;
  1752. if (is_unrecoverable(zh))
  1753. return ZINVALIDSTATE;
  1754. oa = create_buffer_oarchive();
  1755. req.path = (char*)path;
  1756. req.acl = *acl;
  1757. req.version = version;
  1758. rc = serialize_RequestHeader(oa, "header", &h);
  1759. rc = rc < 0 ? rc : serialize_SetACLRequest(oa, "req", &req);
  1760. enter_critical(zh);
  1761. rc = rc < 0 ? rc : add_void_completion(zh, h.xid, completion, data);
  1762. rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
  1763. get_buffer_len(oa));
  1764. leave_critical(zh);
  1765. /* We queued the buffer, so don't free it */
  1766. close_buffer_oarchive(&oa, 0);
  1767. LOG_DEBUG(("Sending request xid=%x for path [%s] to %s",h.xid,path,
  1768. format_current_endpoint_info(zh)));
  1769. /* make a best (non-blocking) effort to send the requests asap */
  1770. adaptor_send_queue(zh, 0);
  1771. return (rc < 0)?ZMARSHALLINGERROR:ZOK;
  1772. }
/* Drain the handle's outgoing buffer queue onto the socket.
 * specify timeout of 0 to make the function non-blocking */
/* timeout is in milliseconds */
int flush_send_queue(zhandle_t*zh, int timeout)
{
    int rc= ZOK;
    struct timeval started;
    gettimeofday(&started,0);
    // we can't use dequeue_buffer() here because if (non-blocking) send_buffer()
    // returns EWOULDBLOCK we'd have to put the buffer back on the queue.
    // we use a recursive lock instead and only dequeue the buffer if a send was
    // successful
    lock_buffer_list(&zh->to_send);
    // keep sending until the queue is empty or the connection drops
    while (zh->to_send.head != 0&& zh->state == CONNECTED_STATE) {
        if(timeout!=0){
            int elapsed;
            struct pollfd fds;
            struct timeval now;
            gettimeofday(&now,0);
            elapsed=calculate_interval(&started,&now);
            // give up once the caller's overall time budget is exhausted
            if (elapsed>timeout) {
                rc = ZOPERATIONTIMEOUT;
                break;
            }
            fds.fd = zh->fd;
            fds.events = POLLOUT;
            fds.revents = 0;
            // wait (up to the remaining budget) for the socket to become writable
            rc = poll(&fds, 1, timeout-elapsed);
            if (rc<=0) {
                /* timed out or an error or POLLERR */
                rc = rc==0 ? ZOPERATIONTIMEOUT : ZSYSTEMERROR;
                break;
            }
        }
        rc = send_buffer(zh->fd, zh->to_send.head);
        if(rc==0 && timeout==0){
            /* send_buffer would block while sending this buffer */
            rc = ZOK;
            break;
        }
        if (rc < 0) {
            rc = ZCONNECTIONLOSS;
            break;
        }
        // if the buffer has been sent successfully, remove it from the queue
        // (a partial send leaves it at the head to be resumed next iteration)
        if (rc > 0)
            remove_buffer(&zh->to_send);
        gettimeofday(&zh->last_send, 0);
        rc = ZOK;
    }
    unlock_buffer_list(&zh->to_send);
    return rc;
}
  1825. const char* zerror(int c)
  1826. {
  1827. switch (c){
  1828. case ZOK:
  1829. return "ok";
  1830. case ZSYSTEMERROR:
  1831. return "system error";
  1832. case ZRUNTIMEINCONSISTENCY:
  1833. return "run time inconsistency";
  1834. case ZDATAINCONSISTENCY:
  1835. return "data inconsistency";
  1836. case ZCONNECTIONLOSS:
  1837. return "connection loss";
  1838. case ZMARSHALLINGERROR:
  1839. return "marshalling error";
  1840. case ZUNIMPLEMENTED:
  1841. return "unimplemented";
  1842. case ZOPERATIONTIMEOUT:
  1843. return "operation timeout";
  1844. case ZBADARGUMENTS:
  1845. return "bad arguments";
  1846. case ZINVALIDSTATE:
  1847. return "invalid zhandle state";
  1848. case ZAPIERROR:
  1849. return "api error";
  1850. case ZNONODE:
  1851. return "no node";
  1852. case ZNOAUTH:
  1853. return "not authenticated";
  1854. case ZBADVERSION:
  1855. return "bad version";
  1856. case ZNOCHILDRENFOREPHEMERALS:
  1857. return "no children for ephemerals";
  1858. case ZNODEEXISTS:
  1859. return "node exists";
  1860. case ZNOTEMPTY:
  1861. return "not empty";
  1862. case ZSESSIONEXPIRED:
  1863. return "session expired";
  1864. case ZINVALIDCALLBACK:
  1865. return "invalid callback";
  1866. case ZINVALIDACL:
  1867. return "invalid acl";
  1868. case ZAUTHFAILED:
  1869. return "authentication failed";
  1870. case ZCLOSING:
  1871. return "zookeeper is closing";
  1872. case ZNOTHING:
  1873. return "(not error) no server responses to process";
  1874. }
  1875. if (c > 0) {
  1876. return strerror(c);
  1877. }
  1878. return "unknown error";
  1879. }
  1880. int zoo_add_auth(zhandle_t *zh,const char* scheme,const char* cert,
  1881. int certLen,void_completion_t completion, const void *data)
  1882. {
  1883. if(scheme==NULL || zh==NULL)
  1884. return ZBADARGUMENTS;
  1885. if (is_unrecoverable(zh))
  1886. return ZINVALIDSTATE;
  1887. free_auth_info(&zh->auth);
  1888. zh->auth.scheme=strdup(scheme);
  1889. if(cert!=NULL && certLen!=0){
  1890. zh->auth.auth.buff=calloc(1,certLen);
  1891. if(zh->auth.auth.buff==0)
  1892. return ZSYSTEMERROR;
  1893. memcpy(zh->auth.auth.buff,cert,certLen);
  1894. zh->auth.auth.len=certLen;
  1895. }
  1896. zh->auth.completion=completion;
  1897. zh->auth.data=data;
  1898. if(zh->state == CONNECTED_STATE || zh->state == ASSOCIATING_STATE)
  1899. return send_auth_info(zh);
  1900. return ZOK;
  1901. }
  1902. static const char* format_endpoint_info(const struct sockaddr* ep)
  1903. {
  1904. static char buf[128];
  1905. char addrstr[128];
  1906. void *inaddr;
  1907. int port;
  1908. if(ep==0)
  1909. return "null";
  1910. inaddr=&((struct sockaddr_in*)ep)->sin_addr;
  1911. port=((struct sockaddr_in*)ep)->sin_port;
  1912. #if defined(AF_INET6)
  1913. if(ep->sa_family==AF_INET6){
  1914. inaddr=&((struct sockaddr_in6*)ep)->sin6_addr;
  1915. port=((struct sockaddr_in6*)ep)->sin6_port;
  1916. }
  1917. #endif
  1918. inet_ntop(ep->sa_family,inaddr,addrstr,sizeof(addrstr)-1);
  1919. sprintf(buf,"%s:%d",addrstr,ntohs(port));
  1920. return buf;
  1921. }
  1922. static const char* format_current_endpoint_info(zhandle_t* zh)
  1923. {
  1924. return format_endpoint_info(&zh->addrs[zh->connect_index]);
  1925. }
/* Non-zero disables the random permutation of the server list, so the
 * client tries hosts in the order given; zero re-enables permutation. */
void zoo_deterministic_conn_order(int yesOrNo)
{
    disable_conn_permute=yesOrNo;
}
  1930. /* ****************************************************************************
  1931. * sync API
  1932. */
  1933. int zoo_create(zhandle_t *zh, const char *path, const char *value,
  1934. int valuelen, const struct ACL_vector *acl, int flags, char *realpath,
  1935. int max_realpath_len)
  1936. {
  1937. struct sync_completion *sc = alloc_sync_completion();
  1938. int rc;
  1939. if (!sc) {
  1940. return ZSYSTEMERROR;
  1941. }
  1942. sc->u.str.str = realpath;
  1943. sc->u.str.str_len = max_realpath_len;
  1944. rc=zoo_acreate(zh, path, value, valuelen, acl, flags, SYNCHRONOUS_MARKER, sc);
  1945. if(rc==ZOK){
  1946. wait_sync_completion(sc);
  1947. rc = sc->rc;
  1948. }
  1949. free_sync_completion(sc);
  1950. return rc;
  1951. }
  1952. int zoo_delete(zhandle_t *zh, const char *path, int version)
  1953. {
  1954. struct sync_completion *sc = alloc_sync_completion();
  1955. int rc;
  1956. if (!sc) {
  1957. return ZSYSTEMERROR;
  1958. }
  1959. rc=zoo_adelete(zh, path, version, SYNCHRONOUS_MARKER, sc);
  1960. if(rc==ZOK){
  1961. wait_sync_completion(sc);
  1962. rc = sc->rc;
  1963. }
  1964. free_sync_completion(sc);
  1965. return rc;
  1966. }
  1967. int zoo_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat)
  1968. {
  1969. struct sync_completion *sc = alloc_sync_completion();
  1970. int rc;
  1971. if (!sc) {
  1972. return ZSYSTEMERROR;
  1973. }
  1974. rc=zoo_aexists(zh, path, watch, SYNCHRONOUS_MARKER, sc);
  1975. if(rc==ZOK){
  1976. wait_sync_completion(sc);
  1977. rc = sc->rc;
  1978. if (rc == 0&& stat) {
  1979. *stat = sc->u.stat;
  1980. }
  1981. }
  1982. free_sync_completion(sc);
  1983. return rc;
  1984. }
  1985. int zoo_get(zhandle_t *zh, const char *path, int watch, char *buffer,
  1986. int* buffer_len, struct Stat *stat)
  1987. {
  1988. struct sync_completion *sc;
  1989. int rc=0;
  1990. if(buffer_len==NULL)
  1991. return ZBADARGUMENTS;
  1992. if((sc=alloc_sync_completion())==NULL)
  1993. return ZSYSTEMERROR;
  1994. sc->u.data.buffer = buffer;
  1995. sc->u.data.buff_len = *buffer_len;
  1996. rc=zoo_aget(zh, path, watch, SYNCHRONOUS_MARKER, sc);
  1997. if(rc==ZOK){
  1998. wait_sync_completion(sc);
  1999. rc = sc->rc;
  2000. if (rc == 0) {
  2001. if(stat)
  2002. *stat = sc->u.data.stat;
  2003. *buffer_len = sc->u.data.buff_len;
  2004. }
  2005. }
  2006. free_sync_completion(sc);
  2007. return rc;
  2008. }
  2009. int zoo_set(zhandle_t *zh, const char *path, const char *buffer, int buflen,
  2010. int version)
  2011. {
  2012. struct sync_completion *sc = alloc_sync_completion();
  2013. int rc;
  2014. if (!sc) {
  2015. return ZSYSTEMERROR;
  2016. }
  2017. rc=zoo_aset(zh, path, buffer, buflen, version, SYNCHRONOUS_MARKER, sc);
  2018. if(rc==ZOK){
  2019. wait_sync_completion(sc);
  2020. rc = sc->rc;
  2021. }
  2022. free_sync_completion(sc);
  2023. return rc;
  2024. }
  2025. int zoo_get_children(zhandle_t *zh, const char *path, int watch,
  2026. struct String_vector *strings)
  2027. {
  2028. struct sync_completion *sc = alloc_sync_completion();
  2029. int rc;
  2030. if (!sc) {
  2031. return ZSYSTEMERROR;
  2032. }
  2033. rc=zoo_aget_children(zh, path, watch, SYNCHRONOUS_MARKER, sc);
  2034. if(rc==ZOK){
  2035. wait_sync_completion(sc);
  2036. rc = sc->rc;
  2037. if (rc == 0) {
  2038. if (strings) {
  2039. *strings = sc->u.strs;
  2040. } else {
  2041. deallocate_String_vector(&sc->u.strs);
  2042. }
  2043. }
  2044. }
  2045. free_sync_completion(sc);
  2046. return rc;
  2047. }
  2048. int zoo_get_acl(zhandle_t *zh, const char *path, struct ACL_vector *acl,
  2049. struct Stat *stat)
  2050. {
  2051. struct sync_completion *sc = alloc_sync_completion();
  2052. int rc;
  2053. if (!sc) {
  2054. return ZSYSTEMERROR;
  2055. }
  2056. rc=zoo_aget_acl(zh, path, SYNCHRONOUS_MARKER, sc);
  2057. if(rc==ZOK){
  2058. wait_sync_completion(sc);
  2059. rc = sc->rc;
  2060. if (rc == 0&& stat) {
  2061. *stat = sc->u.acl.stat;
  2062. }
  2063. if (rc == 0) {
  2064. if (acl) {
  2065. *acl = sc->u.acl.acl;
  2066. } else {
  2067. deallocate_ACL_vector(&sc->u.acl.acl);
  2068. }
  2069. }
  2070. }
  2071. free_sync_completion(sc);
  2072. return rc;
  2073. }
  2074. int zoo_set_acl(zhandle_t *zh, const char *path, int version,
  2075. const struct ACL_vector *acl)
  2076. {
  2077. struct sync_completion *sc = alloc_sync_completion();
  2078. int rc;
  2079. if (!sc) {
  2080. return ZSYSTEMERROR;
  2081. }
  2082. rc=zoo_aset_acl(zh, path, version, (struct ACL_vector*)acl,
  2083. SYNCHRONOUS_MARKER, sc);
  2084. if(rc==ZOK){
  2085. wait_sync_completion(sc);
  2086. rc = sc->rc;
  2087. }
  2088. free_sync_completion(sc);
  2089. return rc;
  2090. }