zookeeper.c

  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #ifndef DLL_EXPORT
  19. # define USE_STATIC_LIB
  20. #endif
  21. #if defined(__CYGWIN__)
  22. #define USE_IPV6
  23. #endif
  24. #include <zookeeper.h>
  25. #include <zookeeper.jute.h>
  26. #include <proto.h>
  27. #include "zk_adaptor.h"
  28. #include "zookeeper_log.h"
  29. #include "zk_hashtable.h"
  30. #include <stdlib.h>
  31. #include <stdio.h>
  32. #include <string.h>
  33. #include <time.h>
  34. #include <errno.h>
  35. #include <fcntl.h>
  36. #include <assert.h>
  37. #include <stdarg.h>
  38. #include <limits.h>
  39. #ifndef WIN32
  40. #include <sys/time.h>
  41. #include <sys/socket.h>
  42. #include <poll.h>
  43. #include <netinet/in.h>
  44. #include <netinet/tcp.h>
  45. #include <arpa/inet.h>
  46. #include <netdb.h>
  47. #include <unistd.h>
  48. #include "config.h"
  49. #endif
  50. #ifdef HAVE_SYS_UTSNAME_H
  51. #include <sys/utsname.h>
  52. #endif
  53. #ifdef HAVE_GETPWUID_R
  54. #include <pwd.h>
  55. #endif
  56. #define IF_DEBUG(x) if(logLevel==ZOO_LOG_LEVEL_DEBUG) {x;}
  57. const int ZOOKEEPER_WRITE = 1 << 0;
  58. const int ZOOKEEPER_READ = 1 << 1;
  59. const int ZOO_EPHEMERAL = 1 << 0;
  60. const int ZOO_SEQUENCE = 1 << 1;
  61. const int ZOO_EXPIRED_SESSION_STATE = EXPIRED_SESSION_STATE_DEF;
  62. const int ZOO_AUTH_FAILED_STATE = AUTH_FAILED_STATE_DEF;
  63. const int ZOO_CONNECTING_STATE = CONNECTING_STATE_DEF;
  64. const int ZOO_ASSOCIATING_STATE = ASSOCIATING_STATE_DEF;
  65. const int ZOO_CONNECTED_STATE = CONNECTED_STATE_DEF;
  66. const int ZOO_NOTCONNECTED_STATE = NOTCONNECTED_STATE_DEF;
  67. static __attribute__ ((unused)) const char* state2String(int state){
  68. switch(state){
  69. case 0:
  70. return "ZOO_CLOSED_STATE";
  71. case CONNECTING_STATE_DEF:
  72. return "ZOO_CONNECTING_STATE";
  73. case ASSOCIATING_STATE_DEF:
  74. return "ZOO_ASSOCIATING_STATE";
  75. case CONNECTED_STATE_DEF:
  76. return "ZOO_CONNECTED_STATE";
  77. case EXPIRED_SESSION_STATE_DEF:
  78. return "ZOO_EXPIRED_SESSION_STATE";
  79. case AUTH_FAILED_STATE_DEF:
  80. return "ZOO_AUTH_FAILED_STATE";
  81. }
  82. return "INVALID_STATE";
  83. }
  84. const int ZOO_CREATED_EVENT = CREATED_EVENT_DEF;
  85. const int ZOO_DELETED_EVENT = DELETED_EVENT_DEF;
  86. const int ZOO_CHANGED_EVENT = CHANGED_EVENT_DEF;
  87. const int ZOO_CHILD_EVENT = CHILD_EVENT_DEF;
  88. const int ZOO_SESSION_EVENT = SESSION_EVENT_DEF;
  89. const int ZOO_NOTWATCHING_EVENT = NOTWATCHING_EVENT_DEF;
  90. static __attribute__ ((unused)) const char* watcherEvent2String(int ev){
  91. switch(ev){
  92. case 0:
  93. return "ZOO_ERROR_EVENT";
  94. case CREATED_EVENT_DEF:
  95. return "ZOO_CREATED_EVENT";
  96. case DELETED_EVENT_DEF:
  97. return "ZOO_DELETED_EVENT";
  98. case CHANGED_EVENT_DEF:
  99. return "ZOO_CHANGED_EVENT";
  100. case CHILD_EVENT_DEF:
  101. return "ZOO_CHILD_EVENT";
  102. case SESSION_EVENT_DEF:
  103. return "ZOO_SESSION_EVENT";
  104. case NOTWATCHING_EVENT_DEF:
  105. return "ZOO_NOTWATCHING_EVENT";
  106. }
  107. return "INVALID_EVENT";
  108. }
  109. const int ZOO_PERM_READ = 1 << 0;
  110. const int ZOO_PERM_WRITE = 1 << 1;
  111. const int ZOO_PERM_CREATE = 1 << 2;
  112. const int ZOO_PERM_DELETE = 1 << 3;
  113. const int ZOO_PERM_ADMIN = 1 << 4;
  114. const int ZOO_PERM_ALL = 0x1f;
  115. struct Id ZOO_ANYONE_ID_UNSAFE = {"world", "anyone"};
  116. struct Id ZOO_AUTH_IDS = {"auth", ""};
  117. static struct ACL _OPEN_ACL_UNSAFE_ACL[] = {{0x1f, {"world", "anyone"}}};
  118. static struct ACL _READ_ACL_UNSAFE_ACL[] = {{0x01, {"world", "anyone"}}};
  119. static struct ACL _CREATOR_ALL_ACL_ACL[] = {{0x1f, {"auth", ""}}};
  120. struct ACL_vector ZOO_OPEN_ACL_UNSAFE = { 1, _OPEN_ACL_UNSAFE_ACL};
  121. struct ACL_vector ZOO_READ_ACL_UNSAFE = { 1, _READ_ACL_UNSAFE_ACL};
  122. struct ACL_vector ZOO_CREATOR_ALL_ACL = { 1, _CREATOR_ALL_ACL_ACL};
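/*
 * Usage sketch (illustration only, not part of this file; the helper name is
 * hypothetical): the ACL vectors above are passed to zoo_create() to control
 * access to a newly created node.
 */
#if 0
static void example_create_with_open_acl(zhandle_t *zh)
{
    char created_path[128];
    /* ZOO_OPEN_ACL_UNSAFE grants ZOO_PERM_ALL to the "world:anyone" id */
    int rc = zoo_create(zh, "/example", "data", 4, &ZOO_OPEN_ACL_UNSAFE,
                        0 /* no ZOO_EPHEMERAL / ZOO_SEQUENCE */,
                        created_path, sizeof(created_path));
    if (rc != ZOK)
        fprintf(stderr, "zoo_create failed: %s\n", zerror(rc));
}
#endif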
  123. #define COMPLETION_WATCH -1
  124. #define COMPLETION_VOID 0
  125. #define COMPLETION_STAT 1
  126. #define COMPLETION_DATA 2
  127. #define COMPLETION_STRINGLIST 3
  128. #define COMPLETION_STRINGLIST_STAT 4
  129. #define COMPLETION_ACLLIST 5
  130. #define COMPLETION_STRING 6
  131. #define COMPLETION_MULTI 7
  132. #define COMPLETION_STRING_STAT 8
  133. typedef struct _auth_completion_list {
  134. void_completion_t completion;
  135. const char *auth_data;
  136. struct _auth_completion_list *next;
  137. } auth_completion_list_t;
  138. typedef struct completion {
  139. int type; /* one of COMPLETION_* values above */
  140. union {
  141. void_completion_t void_result;
  142. stat_completion_t stat_result;
  143. data_completion_t data_result;
  144. strings_completion_t strings_result;
  145. strings_stat_completion_t strings_stat_result;
  146. acl_completion_t acl_result;
  147. string_completion_t string_result;
  148. string_stat_completion_t string_stat_result;
  149. struct watcher_object_list *watcher_result;
  150. };
  151. completion_head_t clist; /* For multi-op */
  152. } completion_t;
  153. typedef struct _completion_list {
  154. int xid;
  155. completion_t c;
  156. const void *data;
  157. buffer_list_t *buffer;
  158. struct _completion_list *next;
  159. watcher_registration_t* watcher;
  160. } completion_list_t;
  161. const char*err2string(int err);
  162. static int queue_session_event(zhandle_t *zh, int state);
  163. static const char* format_endpoint_info(const struct sockaddr_storage* ep);
  164. /* deserialize forward declarations */
  165. static void deserialize_response(int type, int xid, int failed, int rc, completion_list_t *cptr, struct iarchive *ia);
  166. static int deserialize_multi(int xid, completion_list_t *cptr, struct iarchive *ia);
  167. /* completion routine forward declarations */
  168. static int add_completion(zhandle_t *zh, int xid, int completion_type,
  169. const void *dc, const void *data, int add_to_front,
  170. watcher_registration_t* wo, completion_head_t *clist);
  171. static completion_list_t* create_completion_entry(int xid, int completion_type,
  172. const void *dc, const void *data, watcher_registration_t* wo,
  173. completion_head_t *clist);
  174. static void destroy_completion_entry(completion_list_t* c);
  175. static void queue_completion_nolock(completion_head_t *list, completion_list_t *c,
  176. int add_to_front);
  177. static void queue_completion(completion_head_t *list, completion_list_t *c,
  178. int add_to_front);
  179. static int handle_socket_error_msg(zhandle_t *zh, int line, int rc,
  180. const char* format,...);
  181. static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc);
  182. static int disable_conn_permute=0; // permute enabled by default
  183. static __attribute__((unused)) void print_completion_queue(zhandle_t *zh);
  184. static void *SYNCHRONOUS_MARKER = (void*)&SYNCHRONOUS_MARKER;
  185. static int isValidPath(const char* path, const int flags);
  186. #ifdef _WINDOWS
  187. static int zookeeper_send(SOCKET s, const char* buf, int len)
  188. #else
  189. static ssize_t zookeeper_send(int s, const void* buf, size_t len)
  190. #endif
  191. {
  192. #ifdef __linux__
  193. return send(s, buf, len, MSG_NOSIGNAL);
  194. #else
  195. return send(s, buf, len, 0);
  196. #endif
  197. }
  198. const void *zoo_get_context(zhandle_t *zh)
  199. {
  200. return zh->context;
  201. }
  202. void zoo_set_context(zhandle_t *zh, void *context)
  203. {
  204. if (zh != NULL) {
  205. zh->context = context;
  206. }
  207. }
  208. int zoo_recv_timeout(zhandle_t *zh)
  209. {
  210. return zh->recv_timeout;
  211. }
212. /** these functions are not thread-safe, so make sure that
213. zoo_lock_auth is called before you access them **/
  214. static auth_info* get_last_auth(auth_list_head_t *auth_list) {
  215. auth_info *element;
  216. element = auth_list->auth;
  217. if (element == NULL) {
  218. return NULL;
  219. }
  220. while (element->next != NULL) {
  221. element = element->next;
  222. }
  223. return element;
  224. }
  225. static void free_auth_completion(auth_completion_list_t *a_list) {
  226. auth_completion_list_t *tmp, *ftmp;
  227. if (a_list == NULL) {
  228. return;
  229. }
  230. tmp = a_list->next;
  231. while (tmp != NULL) {
  232. ftmp = tmp;
  233. tmp = tmp->next;
  234. ftmp->completion = NULL;
  235. ftmp->auth_data = NULL;
  236. free(ftmp);
  237. }
  238. a_list->completion = NULL;
  239. a_list->auth_data = NULL;
  240. a_list->next = NULL;
  241. return;
  242. }
  243. static void add_auth_completion(auth_completion_list_t* a_list, void_completion_t* completion,
  244. const char *data) {
  245. auth_completion_list_t *element;
  246. auth_completion_list_t *n_element;
  247. element = a_list;
  248. if (a_list->completion == NULL) {
  249. //this is the first element
  250. a_list->completion = *completion;
  251. a_list->next = NULL;
  252. a_list->auth_data = data;
  253. return;
  254. }
  255. while (element->next != NULL) {
  256. element = element->next;
  257. }
  258. n_element = (auth_completion_list_t*) malloc(sizeof(auth_completion_list_t));
  259. n_element->next = NULL;
  260. n_element->completion = *completion;
  261. n_element->auth_data = data;
  262. element->next = n_element;
  263. return;
  264. }
  265. static void get_auth_completions(auth_list_head_t *auth_list, auth_completion_list_t *a_list) {
  266. auth_info *element;
  267. element = auth_list->auth;
  268. if (element == NULL) {
  269. return;
  270. }
  271. while (element) {
  272. if (element->completion) {
  273. add_auth_completion(a_list, &element->completion, element->data);
  274. }
  275. element->completion = NULL;
  276. element = element->next;
  277. }
  278. return;
  279. }
  280. static void add_last_auth(auth_list_head_t *auth_list, auth_info *add_el) {
  281. auth_info *element;
  282. element = auth_list->auth;
  283. if (element == NULL) {
  284. //first element in the list
  285. auth_list->auth = add_el;
  286. return;
  287. }
  288. while (element->next != NULL) {
  289. element = element->next;
  290. }
  291. element->next = add_el;
  292. return;
  293. }
  294. static void init_auth_info(auth_list_head_t *auth_list)
  295. {
  296. auth_list->auth = NULL;
  297. }
  298. static void mark_active_auth(zhandle_t *zh) {
  299. auth_list_head_t auth_h = zh->auth_h;
  300. auth_info *element;
  301. if (auth_h.auth == NULL) {
  302. return;
  303. }
  304. element = auth_h.auth;
  305. while (element != NULL) {
  306. element->state = 1;
  307. element = element->next;
  308. }
  309. }
  310. static void free_auth_info(auth_list_head_t *auth_list)
  311. {
  312. auth_info *auth = auth_list->auth;
  313. while (auth != NULL) {
  314. auth_info* old_auth = NULL;
  315. if(auth->scheme!=NULL)
  316. free(auth->scheme);
  317. deallocate_Buffer(&auth->auth);
  318. old_auth = auth;
  319. auth = auth->next;
  320. free(old_auth);
  321. }
  322. init_auth_info(auth_list);
  323. }
  324. int is_unrecoverable(zhandle_t *zh)
  325. {
  326. return (zh->state<0)? ZINVALIDSTATE: ZOK;
  327. }
  328. zk_hashtable *exists_result_checker(zhandle_t *zh, int rc)
  329. {
  330. if (rc == ZOK) {
  331. return zh->active_node_watchers;
  332. } else if (rc == ZNONODE) {
  333. return zh->active_exist_watchers;
  334. }
  335. return 0;
  336. }
  337. zk_hashtable *data_result_checker(zhandle_t *zh, int rc)
  338. {
  339. return rc==ZOK ? zh->active_node_watchers : 0;
  340. }
  341. zk_hashtable *child_result_checker(zhandle_t *zh, int rc)
  342. {
  343. return rc==ZOK ? zh->active_child_watchers : 0;
  344. }
  345. /**
  346. * Frees and closes everything associated with a handle,
  347. * including the handle itself.
  348. */
  349. static void destroy(zhandle_t *zh)
  350. {
  351. if (zh == NULL) {
  352. return;
  353. }
  354. /* call any outstanding completions with a special error code */
  355. cleanup_bufs(zh,1,ZCLOSING);
  356. if (zh->hostname != 0) {
  357. free(zh->hostname);
  358. zh->hostname = NULL;
  359. }
  360. if (zh->fd != -1) {
  361. close(zh->fd);
  362. zh->fd = -1;
  363. memset(&zh->addr_cur, 0, sizeof(zh->addr_cur));
  364. zh->state = 0;
  365. }
  366. addrvec_free(&zh->addrs);
  367. if (zh->chroot != NULL) {
  368. free(zh->chroot);
  369. zh->chroot = NULL;
  370. }
  371. free_auth_info(&zh->auth_h);
  372. destroy_zk_hashtable(zh->active_node_watchers);
  373. destroy_zk_hashtable(zh->active_exist_watchers);
  374. destroy_zk_hashtable(zh->active_child_watchers);
  375. }
  376. static void setup_random()
  377. {
  378. #ifndef WIN32 // TODO: better seed
  379. int seed;
  380. int fd = open("/dev/urandom", O_RDONLY);
  381. if (fd == -1) {
  382. seed = getpid();
  383. } else {
  384. int rc = read(fd, &seed, sizeof(seed));
  385. assert(rc == sizeof(seed));
  386. close(fd);
  387. }
  388. srandom(seed);
  389. srand48(seed);
  390. #endif
  391. }
  392. #ifndef __CYGWIN__
  393. /**
394. * Map the return code of getaddrinfo() to an errno value.
395. * getaddrinfo() does not set errno on failure, so we have to
396. * derive an appropriate errno from its return code before
397. * reporting the error.
  398. */
  399. static int getaddrinfo_errno(int rc) {
  400. switch(rc) {
  401. case EAI_NONAME:
  402. // ZOOKEEPER-1323 EAI_NODATA and EAI_ADDRFAMILY are deprecated in FreeBSD.
  403. #if defined EAI_NODATA && EAI_NODATA != EAI_NONAME
  404. case EAI_NODATA:
  405. #endif
  406. return ENOENT;
  407. case EAI_MEMORY:
  408. return ENOMEM;
  409. default:
  410. return EINVAL;
  411. }
  412. }
  413. #endif
  414. /**
  415. * Count the number of hosts in the connection host string. This assumes it's
  416. * a well-formed connection string whereby each host is separated by a comma.
  417. */
  418. static int count_hosts(char *hosts)
  419. {
  420. if (!hosts || strlen(hosts) == 0) {
  421. return 0;
  422. }
  423. uint32_t count = 0;
  424. char *loc = hosts;
  425. while ((loc = strchr(loc, ','))) {
  426. count++;
  427. loc+=1;
  428. }
  429. return count+1;
  430. }
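/*
 * For illustration (not part of this file): with a connection string such as
 * "zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181",
 * count_hosts() returns 3; resolve_hosts() below then splits on ',' and
 * resolves each "host:port" entry individually.
 */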
  431. /**
  432. * Resolve hosts and populate provided address vector with shuffled results.
  433. * The contents of the provided address vector will be initialized to an
  434. * empty state.
  435. */
  436. int resolve_hosts(const char *hosts_in, addrvec_t *avec)
  437. {
  438. int rc = ZOK;
  439. if (hosts_in == NULL || avec == NULL) {
  440. return ZBADARGUMENTS;
  441. }
  442. // initialize address vector
  443. addrvec_init(avec);
  444. char *hosts = strdup(hosts_in);
  445. if (hosts == NULL) {
  446. LOG_ERROR(("out of memory"));
  447. errno=ENOMEM;
  448. rc=ZSYSTEMERROR;
  449. goto fail;
  450. }
  451. int num_hosts = count_hosts(hosts);
  452. if (num_hosts == 0) {
  453. free(hosts);
  454. return ZOK;
  455. }
  456. // Allocate list inside avec
  457. rc = addrvec_alloc_capacity(avec, num_hosts);
  458. if (rc != 0) {
  459. LOG_ERROR(("out of memory"));
  460. errno=ENOMEM;
  461. rc=ZSYSTEMERROR;
  462. goto fail;
  463. }
  464. char *strtok_last;
  465. char * host = strtok_r(hosts, ",", &strtok_last);
  466. while(host) {
  467. char *port_spec = strrchr(host, ':');
  468. char *end_port_spec;
  469. int port;
  470. if (!port_spec) {
  471. LOG_ERROR(("no port in %s", host));
  472. errno=EINVAL;
  473. rc=ZBADARGUMENTS;
  474. goto fail;
  475. }
  476. *port_spec = '\0';
  477. port_spec++;
  478. port = strtol(port_spec, &end_port_spec, 0);
  479. if (!*port_spec || *end_port_spec || port == 0) {
  480. LOG_ERROR(("invalid port in %s", host));
  481. errno=EINVAL;
  482. rc=ZBADARGUMENTS;
  483. goto fail;
  484. }
  485. #if defined(__CYGWIN__)
  486. // sadly CYGWIN doesn't have getaddrinfo
  487. // but happily gethostbyname is threadsafe in windows
  488. {
  489. struct hostent *he;
  490. char **ptr;
491. struct sockaddr_in *addr4; struct sockaddr_storage *addr;
  492. he = gethostbyname(host);
  493. if (!he) {
  494. LOG_ERROR(("could not resolve %s", host));
  495. errno=ENOENT;
  496. rc=ZBADARGUMENTS;
  497. goto fail;
  498. }
  499. // Setup the address array
  500. for(ptr = he->h_addr_list;*ptr != 0; ptr++) {
501. if (avec->count == avec->capacity) {
502. rc = addrvec_grow_default(avec);
  503. if (rc != 0) {
  504. LOG_ERROR(("out of memory"));
  505. errno=ENOMEM;
  506. rc=ZSYSTEMERROR;
  507. goto fail;
  508. }
  509. }
510. addr = &avec->data[avec->count];
  511. addr4 = (struct sockaddr_in*)addr;
  512. addr->ss_family = he->h_addrtype;
  513. if (addr->ss_family == AF_INET) {
  514. addr4->sin_port = htons(port);
  515. memset(&addr4->sin_zero, 0, sizeof(addr4->sin_zero));
  516. memcpy(&addr4->sin_addr, *ptr, he->h_length);
517. avec->count++;
  518. }
  519. #if defined(AF_INET6)
  520. else if (addr->ss_family == AF_INET6) {
  521. struct sockaddr_in6 *addr6;
  522. addr6 = (struct sockaddr_in6*)addr;
  523. addr6->sin6_port = htons(port);
  524. addr6->sin6_scope_id = 0;
  525. addr6->sin6_flowinfo = 0;
  526. memcpy(&addr6->sin6_addr, *ptr, he->h_length);
527. avec->count++;
  528. }
  529. #endif
  530. else {
  531. LOG_WARN(("skipping unknown address family %x for %s",
  532. addr->ss_family, hosts_in));
  533. }
  534. }
  535. host = strtok_r(0, ",", &strtok_last);
  536. }
  537. #else
  538. {
  539. struct addrinfo hints, *res, *res0;
  540. memset(&hints, 0, sizeof(hints));
  541. #ifdef AI_ADDRCONFIG
  542. hints.ai_flags = AI_ADDRCONFIG;
  543. #else
  544. hints.ai_flags = 0;
  545. #endif
  546. hints.ai_family = AF_UNSPEC;
  547. hints.ai_socktype = SOCK_STREAM;
  548. hints.ai_protocol = IPPROTO_TCP;
  549. while(isspace(*host) && host != strtok_last)
  550. host++;
  551. if ((rc = getaddrinfo(host, port_spec, &hints, &res0)) != 0) {
  552. //bug in getaddrinfo implementation when it returns
  553. //EAI_BADFLAGS or EAI_ADDRFAMILY with AF_UNSPEC and
  554. // ai_flags as AI_ADDRCONFIG
  555. #ifdef AI_ADDRCONFIG
  556. if ((hints.ai_flags == AI_ADDRCONFIG) &&
  557. // ZOOKEEPER-1323 EAI_NODATA and EAI_ADDRFAMILY are deprecated in FreeBSD.
  558. #ifdef EAI_ADDRFAMILY
  559. ((rc ==EAI_BADFLAGS) || (rc == EAI_ADDRFAMILY))) {
  560. #else
  561. (rc == EAI_BADFLAGS)) {
  562. #endif
  563. //reset ai_flags to null
  564. hints.ai_flags = 0;
  565. //retry getaddrinfo
  566. rc = getaddrinfo(host, port_spec, &hints, &res0);
  567. }
  568. #endif
  569. if (rc != 0) {
  570. errno = getaddrinfo_errno(rc);
  571. #ifdef WIN32
  572. LOG_ERROR(("Win32 message: %s\n", gai_strerror(rc)));
  573. #elif __linux__ && __GNUC__
  574. LOG_ERROR(("getaddrinfo: %s\n", gai_strerror(rc)));
  575. #else
  576. LOG_ERROR(("getaddrinfo: %s\n", strerror(errno)));
  577. #endif
  578. rc=ZSYSTEMERROR;
  579. goto fail;
  580. }
  581. }
  582. for (res = res0; res; res = res->ai_next) {
  583. // Expand address list if needed
  584. if (avec->count == avec->capacity) {
  585. rc = addrvec_grow_default(avec);
  586. if (rc != 0) {
  587. LOG_ERROR(("out of memory"));
  588. errno=ENOMEM;
  589. rc=ZSYSTEMERROR;
  590. goto fail;
  591. }
  592. }
  593. // Copy addrinfo into address list
  594. switch (res->ai_family) {
  595. case AF_INET:
  596. #if defined(AF_INET6)
  597. case AF_INET6:
  598. #endif
  599. addrvec_append_addrinfo(avec, res);
  600. break;
  601. default:
  602. LOG_WARN(("skipping unknown address family %x for %s",
  603. res->ai_family, hosts_in));
  604. break;
  605. }
  606. }
  607. freeaddrinfo(res0);
  608. host = strtok_r(0, ",", &strtok_last);
  609. }
  610. #endif
  611. }
  612. free(hosts);
  613. if(!disable_conn_permute){
  614. setup_random();
  615. addrvec_shuffle(avec);
  616. }
  617. return ZOK;
  618. fail:
  619. addrvec_free(avec);
  620. if (hosts) {
  621. free(hosts);
  622. hosts = NULL;
  623. }
  624. return rc;
  625. }
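/*
 * Usage sketch (illustration only, not part of this file; the helper name is
 * hypothetical): resolve a connection string into a shuffled address vector
 * and release it when done.
 */
#if 0
static void example_resolve(void)
{
    addrvec_t avec;
    int rc = resolve_hosts("zk1.example.com:2181,zk2.example.com:2181", &avec);
    if (rc == ZOK) {
        /* avec.count entries of struct sockaddr_storage now live in avec.data */
        printf("resolved %d address(es)\n", (int) avec.count);
        addrvec_free(&avec);
    }
}
#endif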
  626. /**
  627. * Updates the list of servers and determine if changing connections is necessary.
  628. * Permutes server list for proper load balancing.
  629. *
  630. * Changing connections is necessary if one of the following holds:
631. * a) the server this client is currently connected to is not in the new address list.
  632. * Otherwise (if currentHost is in the new list):
  633. * b) the number of servers in the cluster is increasing - in this case the load
  634. * on currentHost should decrease, which means that SOME of the clients
  635. * connected to it will migrate to the new servers. The decision whether this
  636. * client migrates or not is probabilistic so that the expected number of
  637. * clients connected to each server is the same.
  638. *
  639. * If reconfig is set to true, the function sets pOld and pNew that correspond
640. * to the probability of migrating to one of the new servers or one of the old
  641. * servers (migrating to one of the old servers is done only if our client's
  642. * currentHost is not in new list).
  643. *
  644. * See zoo_cycle_next_server for the selection logic.
  645. *
  646. * See {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1355} for the
647. * protocol and its evaluation. A worked numeric example of these probabilities follows the function body below.
  648. */
  649. int update_addrs(zhandle_t *zh)
  650. {
  651. // Verify we have a valid handle
  652. if (zh == NULL) {
  653. return ZBADARGUMENTS;
  654. }
  655. // zh->hostname should always be set
  656. if (zh->hostname == NULL)
  657. {
  658. return ZSYSTEMERROR;
  659. }
  660. // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new}
  661. lock_reconfig(zh);
  662. int rc = ZOK;
  663. char *hosts = NULL;
  664. // Copy zh->hostname for local use
  665. hosts = strdup(zh->hostname);
  666. if (hosts == NULL) {
  667. rc = ZSYSTEMERROR;
  668. goto fail;
  669. }
  670. addrvec_t resolved = { 0 };
  671. rc = resolve_hosts(hosts, &resolved);
  672. if (rc != ZOK)
  673. {
  674. goto fail;
  675. }
  676. // If the addrvec list is identical to last time we ran don't do anything
  677. if (addrvec_eq(&zh->addrs, &resolved))
  678. {
  679. goto fail;
  680. }
  681. // Is the server we're connected to in the new resolved list?
  682. int found_current = addrvec_contains(&resolved, &zh->addr_cur);
  683. // Clear out old and new address lists
  684. zh->reconfig = 1;
  685. addrvec_free(&zh->addrs_old);
  686. addrvec_free(&zh->addrs_new);
  687. // Divide server list into addrs_old if in previous list and addrs_new if not
  688. int i = 0;
  689. for (i = 0; i < resolved.count; i++)
  690. {
  691. struct sockaddr_storage *resolved_address = &resolved.data[i];
  692. if (addrvec_contains(&zh->addrs, resolved_address))
  693. {
  694. rc = addrvec_append(&zh->addrs_old, resolved_address);
  695. if (rc != ZOK)
  696. {
  697. goto fail;
  698. }
  699. }
  700. else {
  701. rc = addrvec_append(&zh->addrs_new, resolved_address);
  702. if (rc != ZOK)
  703. {
  704. goto fail;
  705. }
  706. }
  707. }
  708. int num_old = zh->addrs_old.count;
  709. int num_new = zh->addrs_new.count;
  710. // Number of servers increased
  711. if (num_old + num_new > zh->addrs.count)
  712. {
  713. if (found_current) {
  714. // my server is in the new config, but load should be decreased.
  715. // Need to decide if the client is moving to one of the new servers
  716. if (drand48() <= (1 - ((double)zh->addrs.count) / (num_old + num_new))) {
  717. zh->pNew = 1;
  718. zh->pOld = 0;
  719. } else {
  720. // do nothing special -- stay with the current server
  721. zh->reconfig = 0;
  722. }
  723. } else {
  724. // my server is not in the new config, and load on old servers must
  725. // be decreased, so connect to one of the new servers
  726. zh->pNew = 1;
  727. zh->pOld = 0;
  728. }
  729. }
  730. // Number of servers stayed the same or decreased
  731. else {
  732. if (found_current) {
  733. // my server is in the new config, and load should be increased, so
  734. // stay with this server and do nothing special
  735. zh->reconfig = 0;
  736. } else {
  737. zh->pOld = ((double) (num_old * (zh->addrs.count - (num_old + num_new)))) / ((num_old + num_new) * (zh->addrs.count - num_old));
  738. zh->pNew = 1 - zh->pOld;
  739. }
  740. }
  741. addrvec_free(&zh->addrs);
  742. zh->addrs = resolved;
  743. // If we need to do a reconfig and we're currently connected to a server,
  744. // then force close that connection so on next interest() call we'll make a
  745. // new connection
  746. if (zh->reconfig == 1 && zh->fd != -1)
  747. {
  748. close(zh->fd);
  749. zh->fd = -1;
  750. zh->state = ZOO_NOTCONNECTED_STATE;
  751. }
  752. fail:
  753. unlock_reconfig(zh);
  754. // If we short-circuited out and never assigned resolved to zh->addrs then we
  755. // need to free resolved to avoid a memleak.
  756. if (zh->addrs.data != resolved.data)
  757. {
  758. addrvec_free(&resolved);
  759. }
  760. if (hosts) {
  761. free(hosts);
  762. hosts = NULL;
  763. }
  764. return rc;
  765. }
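/*
 * Worked example (illustration only, using the formulas above): suppose the
 * client previously knew 3 servers (zh->addrs.count == 3) and the re-resolved
 * list has 5, of which the 3 old ones remain (num_old == 3) and 2 are new
 * (num_new == 2).
 *
 *  - If the currently connected server is still in the list, the client moves
 *    to a new server with probability 1 - 3/5 = 0.4, so on average 40% of the
 *    clients migrate and each of the 5 servers ends up with ~1/5 of the load.
 *  - If the currently connected server is not in the list, pNew = 1 and the
 *    client always reconnects to one of the new servers.
 *
 * If instead the previously known list had 5 servers (zh->addrs.count == 5)
 * and only 3 of them remain (num_old == 3, num_new == 0), and the current
 * server was dropped, then pOld = (3 * (5 - 3)) / (3 * (5 - 3)) = 1, i.e. the
 * client reconnects to one of the remaining (old) servers.
 */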
  766. const clientid_t *zoo_client_id(zhandle_t *zh)
  767. {
  768. return &zh->client_id;
  769. }
  770. static void null_watcher_fn(zhandle_t* p1, int p2, int p3,const char* p4,void*p5){}
  771. watcher_fn zoo_set_watcher(zhandle_t *zh,watcher_fn newFn)
  772. {
  773. watcher_fn oldWatcher=zh->watcher;
  774. if (newFn) {
  775. zh->watcher = newFn;
  776. } else {
  777. zh->watcher = null_watcher_fn;
  778. }
  779. return oldWatcher;
  780. }
  781. struct sockaddr* zookeeper_get_connected_host(zhandle_t *zh,
  782. struct sockaddr *addr, socklen_t *addr_len)
  783. {
  784. if (zh->state!=ZOO_CONNECTED_STATE) {
  785. return NULL;
  786. }
  787. if (getpeername(zh->fd, addr, addr_len)==-1) {
  788. return NULL;
  789. }
  790. return addr;
  791. }
  792. static void log_env() {
  793. char buf[2048];
  794. #ifdef HAVE_SYS_UTSNAME_H
  795. struct utsname utsname;
  796. #endif
  797. #if defined(HAVE_GETUID) && defined(HAVE_GETPWUID_R)
  798. struct passwd pw;
  799. struct passwd *pwp = NULL;
  800. uid_t uid = 0;
  801. #endif
  802. LOG_INFO(("Client environment:zookeeper.version=%s", PACKAGE_STRING));
  803. #ifdef HAVE_GETHOSTNAME
  804. gethostname(buf, sizeof(buf));
  805. LOG_INFO(("Client environment:host.name=%s", buf));
  806. #else
  807. LOG_INFO(("Client environment:host.name=<not implemented>"));
  808. #endif
  809. #ifdef HAVE_SYS_UTSNAME_H
  810. uname(&utsname);
  811. LOG_INFO(("Client environment:os.name=%s", utsname.sysname));
  812. LOG_INFO(("Client environment:os.arch=%s", utsname.release));
  813. LOG_INFO(("Client environment:os.version=%s", utsname.version));
  814. #else
  815. LOG_INFO(("Client environment:os.name=<not implemented>"));
  816. LOG_INFO(("Client environment:os.arch=<not implemented>"));
  817. LOG_INFO(("Client environment:os.version=<not implemented>"));
  818. #endif
  819. #ifdef HAVE_GETLOGIN
  820. LOG_INFO(("Client environment:user.name=%s", getlogin()));
  821. #else
  822. LOG_INFO(("Client environment:user.name=<not implemented>"));
  823. #endif
  824. #if defined(HAVE_GETUID) && defined(HAVE_GETPWUID_R)
  825. uid = getuid();
  826. if (!getpwuid_r(uid, &pw, buf, sizeof(buf), &pwp) && pwp) {
  827. LOG_INFO(("Client environment:user.home=%s", pw.pw_dir));
  828. } else {
  829. LOG_INFO(("Client environment:user.home=<NA>"));
  830. }
  831. #else
  832. LOG_INFO(("Client environment:user.home=<not implemented>"));
  833. #endif
  834. #ifdef HAVE_GETCWD
  835. if (!getcwd(buf, sizeof(buf))) {
  836. LOG_INFO(("Client environment:user.dir=<toolong>"));
  837. } else {
  838. LOG_INFO(("Client environment:user.dir=%s", buf));
  839. }
  840. #else
  841. LOG_INFO(("Client environment:user.dir=<not implemented>"));
  842. #endif
  843. }
  844. /**
  845. * Create a zookeeper handle associated with the given host and port.
  846. */
  847. zhandle_t *zookeeper_init(const char *host, watcher_fn watcher,
  848. int recv_timeout, const clientid_t *clientid, void *context, int flags)
  849. {
  850. int errnosave = 0;
  851. zhandle_t *zh = NULL;
  852. char *index_chroot = NULL;
  853. log_env();
  854. #ifdef WIN32
  855. if (Win32WSAStartup()){
  856. LOG_ERROR(("Error initializing ws2_32.dll"));
  857. return 0;
  858. }
  859. #endif
  860. LOG_INFO(("Initiating client connection, host=%s sessionTimeout=%d watcher=%p"
  861. " sessionId=%#llx sessionPasswd=%s context=%p flags=%d",
  862. host,
  863. recv_timeout,
  864. watcher,
  865. (clientid == 0 ? 0 : clientid->client_id),
  866. ((clientid == 0) || (clientid->passwd[0] == 0) ?
  867. "<null>" : "<hidden>"),
  868. context,
  869. flags));
  870. zh = calloc(1, sizeof(*zh));
  871. if (!zh) {
  872. return 0;
  873. }
  874. zh->hostname = NULL;
  875. zh->fd = -1;
  876. zh->state = ZOO_NOTCONNECTED_STATE;
  877. zh->context = context;
  878. zh->recv_timeout = recv_timeout;
  879. init_auth_info(&zh->auth_h);
  880. if (watcher) {
  881. zh->watcher = watcher;
  882. } else {
  883. zh->watcher = null_watcher_fn;
  884. }
  885. if (host == 0 || *host == 0) { // what we shouldn't dup
  886. errno=EINVAL;
  887. goto abort;
  888. }
  889. //parse the host to get the chroot if available
  890. index_chroot = strchr(host, '/');
  891. if (index_chroot) {
  892. zh->chroot = strdup(index_chroot);
  893. if (zh->chroot == NULL) {
  894. goto abort;
  895. }
  896. // if chroot is just / set it to null
  897. if (strlen(zh->chroot) == 1) {
  898. free(zh->chroot);
  899. zh->chroot = NULL;
  900. }
  901. // cannot use strndup so allocate and strcpy
  902. zh->hostname = (char *) malloc(index_chroot - host + 1);
  903. zh->hostname = strncpy(zh->hostname, host, (index_chroot - host));
  904. //strncpy does not null terminate
  905. *(zh->hostname + (index_chroot - host)) = '\0';
  906. } else {
  907. zh->chroot = NULL;
  908. zh->hostname = strdup(host);
  909. }
  910. if (zh->chroot && !isValidPath(zh->chroot, 0)) {
  911. errno = EINVAL;
  912. goto abort;
  913. }
  914. if (zh->hostname == 0) {
  915. goto abort;
  916. }
  917. if(update_addrs(zh) != 0) {
  918. goto abort;
  919. }
  920. if (clientid) {
  921. memcpy(&zh->client_id, clientid, sizeof(zh->client_id));
  922. } else {
  923. memset(&zh->client_id, 0, sizeof(zh->client_id));
  924. }
  925. zh->primer_buffer.buffer = zh->primer_storage_buffer;
  926. zh->primer_buffer.curr_offset = 0;
  927. zh->primer_buffer.len = sizeof(zh->primer_storage_buffer);
  928. zh->primer_buffer.next = 0;
  929. zh->last_zxid = 0;
  930. zh->next_deadline.tv_sec=zh->next_deadline.tv_usec=0;
  931. zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
  932. zh->active_node_watchers=create_zk_hashtable();
  933. zh->active_exist_watchers=create_zk_hashtable();
  934. zh->active_child_watchers=create_zk_hashtable();
  935. if (adaptor_init(zh) == -1) {
  936. goto abort;
  937. }
  938. return zh;
  939. abort:
  940. errnosave=errno;
  941. destroy(zh);
  942. free(zh);
  943. errno=errnosave;
  944. return 0;
  945. }
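/*
 * Usage sketch (illustration only, not part of this file; the helper and
 * watcher names are hypothetical, and 30000 ms is just an example session
 * timeout): create a handle with an optional chroot suffix in the connection
 * string, then close it.
 */
#if 0
static void example_watcher(zhandle_t *zzh, int type, int state,
                            const char *path, void *ctx)
{
    if (type == ZOO_SESSION_EVENT && state == ZOO_CONNECTED_STATE)
        fprintf(stderr, "connected\n");
}

static void example_init(void)
{
    /* everything from the first '/' on ("/app") becomes the chroot */
    zhandle_t *zh = zookeeper_init(
        "zk1.example.com:2181,zk2.example.com:2181/app",
        example_watcher, 30000 /* session timeout in ms */, NULL, NULL, 0);
    if (zh != NULL) {
        /* ... zoo_* calls here operate relative to the "/app" chroot ... */
        zookeeper_close(zh);
    }
}
#endif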
  946. /**
  947. * Set a new list of zk servers to connect to. Disconnect will occur if
  948. * current connection endpoint is not in the list.
  949. */
  950. int zoo_set_servers(zhandle_t *zh, const char *hosts)
  951. {
  952. if (hosts == NULL)
  953. {
  954. LOG_ERROR(("New server list cannot be empty"));
  955. return ZBADARGUMENTS;
  956. }
  957. // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new}
  958. lock_reconfig(zh);
  959. // Reset hostname to new set of hosts to connect to
  960. if (zh->hostname) {
  961. free(zh->hostname);
  962. }
  963. zh->hostname = strdup(hosts);
  964. unlock_reconfig(zh);
  965. return update_addrs(zh);
  966. }
  967. /**
  968. * Get the next server to connect to, when in 'reconfig' mode, which means that
  969. * we've updated the server list to connect to, and are now trying to find some
  970. * server to connect to. Once we get successfully connected, 'reconfig' mode is
  971. * set to false. Similarly, if we tried to connect to all servers in new config
  972. * and failed, 'reconfig' mode is set to false.
  973. *
  974. * While in 'reconfig' mode, we should connect to a server in the new set of
  975. * servers (addrs_new) with probability pNew and to servers in the old set of
  976. * servers (addrs_old) with probability pOld (which is just 1-pNew). If we tried
  977. * out all servers in either, we continue to try servers from the other set,
  978. * regardless of pNew or pOld. If we tried all servers we give up and go back to
  979. * the normal round robin mode
  980. *
  981. * When called, must be protected by lock_reconfig(zh).
  982. */
  983. static int get_next_server_in_reconfig(zhandle_t *zh)
  984. {
  985. int take_new = drand48() <= zh->pNew;
  986. LOG_DEBUG(("[OLD] count=%d capacity=%d next=%d hasnext=%d",
  987. zh->addrs_old.count, zh->addrs_old.capacity, zh->addrs_old.next,
  988. addrvec_hasnext(&zh->addrs_old)));
  989. LOG_DEBUG(("[NEW] count=%d capacity=%d next=%d hasnext=%d",
  990. zh->addrs_new.count, zh->addrs_new.capacity, zh->addrs_new.next,
  991. addrvec_hasnext(&zh->addrs_new)));
  992. // Take one of the new servers if we haven't tried them all yet
  993. // and either the probability tells us to connect to one of the new servers
  994. // or if we already tried them all then use one of the old servers
  995. if (addrvec_hasnext(&zh->addrs_new)
  996. && (take_new || !addrvec_hasnext(&zh->addrs_old)))
  997. {
  998. addrvec_next(&zh->addrs_new, &zh->addr_cur);
  999. LOG_DEBUG(("Using next from NEW=%s", format_endpoint_info(&zh->addr_cur)));
  1000. return 0;
  1001. }
  1002. // start taking old servers
  1003. if (addrvec_hasnext(&zh->addrs_old)) {
  1004. addrvec_next(&zh->addrs_old, &zh->addr_cur);
  1005. LOG_DEBUG(("Using next from OLD=%s", format_endpoint_info(&zh->addr_cur)));
  1006. return 0;
  1007. }
  1008. LOG_DEBUG(("Failed to find either new or old"));
  1009. memset(&zh->addr_cur, 0, sizeof(zh->addr_cur));
  1010. return 1;
  1011. }
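/*
 * Illustration (not part of this file): with pNew == 0.4 the client draws
 * drand48(); a value <= 0.4 selects the next untried server from addrs_new,
 * a larger value selects from addrs_old. Once one of the two sets is
 * exhausted, the remaining set is used regardless of the draw, and when both
 * are exhausted the caller leaves reconfig mode and falls back to plain
 * round robin over zh->addrs.
 */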
  1012. /**
  1013. * Cycle through our server list to the correct 'next' server. The 'next' server
  1014. * to connect to depends upon whether we're in a 'reconfig' mode or not. Reconfig
1015. * mode means we've updated the server list and are now trying to find a server
  1016. * to connect to. Once we get connected, we are no longer in the reconfig mode.
  1017. * Similarly, if we try to connect to all the servers in the new configuration
  1018. * and failed, reconfig mode is set to false.
  1019. *
  1020. * For more algorithm details, see get_next_server_in_reconfig.
  1021. */
  1022. void zoo_cycle_next_server(zhandle_t *zh)
  1023. {
  1024. // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new}
  1025. lock_reconfig(zh);
  1026. memset(&zh->addr_cur, 0, sizeof(zh->addr_cur));
  1027. if (zh->reconfig)
  1028. {
  1029. if (get_next_server_in_reconfig(zh) == 0) {
  1030. unlock_reconfig(zh);
  1031. return;
  1032. }
  1033. // tried all new and old servers and couldn't connect
  1034. zh->reconfig = 0;
  1035. }
  1036. addrvec_next(&zh->addrs, &zh->addr_cur);
  1037. unlock_reconfig(zh);
  1038. return;
  1039. }
  1040. /**
  1041. * Get the host:port for the server we are currently connecting to or connected
  1042. * to. This is largely for testing purposes but is also generally useful for
  1043. * other client software built on top of this client.
  1044. */
  1045. const char* zoo_get_current_server(zhandle_t* zh)
  1046. {
  1047. // NOTE: guard access to {hostname, addr_cur, addrs, addrs_old, addrs_new}
  1048. // Need the lock here as it is changed in update_addrs()
  1049. lock_reconfig(zh);
  1050. const char * endpoint_info = format_endpoint_info(&zh->addr_cur);
  1051. unlock_reconfig(zh);
  1052. return endpoint_info;
  1053. }
  1054. /**
1055. * Deallocate free_path only if it has been allocated separately,
1056. * i.e. it is not equal to path.
  1057. */
  1058. void free_duplicate_path(const char *free_path, const char* path) {
  1059. if (free_path != path) {
  1060. free((void*)free_path);
  1061. }
  1062. }
  1063. /**
1064. Prepend the chroot path if one is set; otherwise return the path unchanged.
  1065. */
  1066. static char* prepend_string(zhandle_t *zh, const char* client_path) {
  1067. char *ret_str;
  1068. if (zh == NULL || zh->chroot == NULL)
  1069. return (char *) client_path;
  1070. // handle the chroot itself, client_path = "/"
  1071. if (strlen(client_path) == 1) {
  1072. return strdup(zh->chroot);
  1073. }
  1074. ret_str = (char *) malloc(strlen(zh->chroot) + strlen(client_path) + 1);
  1075. strcpy(ret_str, zh->chroot);
  1076. return strcat(ret_str, client_path);
  1077. }
  1078. /**
1079. Strip the chroot prefix from the server path if one is set;
1080. otherwise return the path unchanged.
  1081. */
  1082. char* sub_string(zhandle_t *zh, const char* server_path) {
  1083. char *ret_str;
  1084. if (zh->chroot == NULL)
  1085. return (char *) server_path;
  1086. //ZOOKEEPER-1027
  1087. if (strncmp(server_path, zh->chroot, strlen(zh->chroot)) != 0) {
  1088. LOG_ERROR(("server path %s does not include chroot path %s",
  1089. server_path, zh->chroot));
  1090. return (char *) server_path;
  1091. }
  1092. if (strlen(server_path) == strlen(zh->chroot)) {
  1093. //return "/"
  1094. ret_str = strdup("/");
  1095. return ret_str;
  1096. }
  1097. ret_str = strdup(server_path + strlen(zh->chroot));
  1098. return ret_str;
  1099. }
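/*
 * For illustration (not part of this file): with zh->chroot == "/app",
 * prepend_string() maps the client path "/foo" to the server path "/app/foo"
 * (and "/" to "/app"), while sub_string() maps "/app/foo" back to "/foo"
 * (and "/app" back to "/").
 */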
  1100. static buffer_list_t *allocate_buffer(char *buff, int len)
  1101. {
  1102. buffer_list_t *buffer = calloc(1, sizeof(*buffer));
  1103. if (buffer == 0)
  1104. return 0;
  1105. buffer->len = len==0?sizeof(*buffer):len;
  1106. buffer->curr_offset = 0;
  1107. buffer->buffer = buff;
  1108. buffer->next = 0;
  1109. return buffer;
  1110. }
  1111. static void free_buffer(buffer_list_t *b)
  1112. {
  1113. if (!b) {
  1114. return;
  1115. }
  1116. if (b->buffer) {
  1117. free(b->buffer);
  1118. }
  1119. free(b);
  1120. }
  1121. static buffer_list_t *dequeue_buffer(buffer_head_t *list)
  1122. {
  1123. buffer_list_t *b;
  1124. lock_buffer_list(list);
  1125. b = list->head;
  1126. if (b) {
  1127. list->head = b->next;
  1128. if (!list->head) {
  1129. assert(b == list->last);
  1130. list->last = 0;
  1131. }
  1132. }
  1133. unlock_buffer_list(list);
  1134. return b;
  1135. }
  1136. static int remove_buffer(buffer_head_t *list)
  1137. {
  1138. buffer_list_t *b = dequeue_buffer(list);
  1139. if (!b) {
  1140. return 0;
  1141. }
  1142. free_buffer(b);
  1143. return 1;
  1144. }
  1145. static void queue_buffer(buffer_head_t *list, buffer_list_t *b, int add_to_front)
  1146. {
  1147. b->next = 0;
  1148. lock_buffer_list(list);
  1149. if (list->head) {
  1150. assert(list->last);
  1151. // The list is not empty
  1152. if (add_to_front) {
  1153. b->next = list->head;
  1154. list->head = b;
  1155. } else {
  1156. list->last->next = b;
  1157. list->last = b;
  1158. }
  1159. }else{
  1160. // The list is empty
  1161. assert(!list->head);
  1162. list->head = b;
  1163. list->last = b;
  1164. }
  1165. unlock_buffer_list(list);
  1166. }
  1167. static int queue_buffer_bytes(buffer_head_t *list, char *buff, int len)
  1168. {
  1169. buffer_list_t *b = allocate_buffer(buff,len);
  1170. if (!b)
  1171. return ZSYSTEMERROR;
  1172. queue_buffer(list, b, 0);
  1173. return ZOK;
  1174. }
  1175. static int queue_front_buffer_bytes(buffer_head_t *list, char *buff, int len)
  1176. {
  1177. buffer_list_t *b = allocate_buffer(buff,len);
  1178. if (!b)
  1179. return ZSYSTEMERROR;
  1180. queue_buffer(list, b, 1);
  1181. return ZOK;
  1182. }
  1183. static __attribute__ ((unused)) int get_queue_len(buffer_head_t *list)
  1184. {
  1185. int i;
  1186. buffer_list_t *ptr;
  1187. lock_buffer_list(list);
  1188. ptr = list->head;
  1189. for (i=0; ptr!=0; ptr=ptr->next, i++)
  1190. ;
  1191. unlock_buffer_list(list);
  1192. return i;
  1193. }
  1194. /* returns:
  1195. * -1 if send failed,
  1196. * 0 if send would block while sending the buffer (or a send was incomplete),
  1197. * 1 if success
  1198. */
  1199. #ifdef WIN32
  1200. static int send_buffer(SOCKET fd, buffer_list_t *buff)
  1201. #else
  1202. static int send_buffer(int fd, buffer_list_t *buff)
  1203. #endif
  1204. {
  1205. int len = buff->len;
  1206. int off = buff->curr_offset;
  1207. int rc = -1;
  1208. if (off < 4) {
  1209. /* we need to send the length at the beginning */
  1210. int nlen = htonl(len);
  1211. char *b = (char*)&nlen;
  1212. rc = zookeeper_send(fd, b + off, sizeof(nlen) - off);
  1213. if (rc == -1) {
  1214. #ifndef _WINDOWS
  1215. if (errno != EAGAIN) {
  1216. #else
  1217. if (WSAGetLastError() != WSAEWOULDBLOCK) {
  1218. #endif
  1219. return -1;
  1220. } else {
  1221. return 0;
  1222. }
  1223. } else {
  1224. buff->curr_offset += rc;
  1225. }
  1226. off = buff->curr_offset;
  1227. }
  1228. if (off >= 4) {
  1229. /* want off to now represent the offset into the buffer */
  1230. off -= sizeof(buff->len);
  1231. rc = zookeeper_send(fd, buff->buffer + off, len - off);
  1232. if (rc == -1) {
  1233. #ifndef _WINDOWS
  1234. if (errno != EAGAIN) {
  1235. #else
  1236. if (WSAGetLastError() != WSAEWOULDBLOCK) {
  1237. #endif
  1238. return -1;
  1239. }
  1240. } else {
  1241. buff->curr_offset += rc;
  1242. }
  1243. }
  1244. return buff->curr_offset == len + sizeof(buff->len);
  1245. }
  1246. /* returns:
  1247. * -1 if recv call failed,
  1248. * 0 if recv would block,
  1249. * 1 if success
  1250. */
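/* Reads the 4-byte length prefix first, then calloc()s a body of that size
* and fills it across calls; returns 1 only once curr_offset covers both the
* prefix and the full body. */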
  1251. #ifdef WIN32
  1252. static int recv_buffer(SOCKET fd, buffer_list_t *buff)
  1253. #else
  1254. static int recv_buffer(int fd, buffer_list_t *buff)
  1255. #endif
  1256. {
  1257. int off = buff->curr_offset;
  1258. int rc = 0;
  1259. //fprintf(LOGSTREAM, "rc = %d, off = %d, line %d\n", rc, off, __LINE__);
  1260. /* if buffer is less than 4, we are reading in the length */
  1261. if (off < 4) {
  1262. char *buffer = (char*)&(buff->len);
  1263. rc = recv(fd, buffer+off, sizeof(int)-off, 0);
  1264. //fprintf(LOGSTREAM, "rc = %d, off = %d, line %d\n", rc, off, __LINE__);
  1265. switch(rc) {
  1266. case 0:
  1267. errno = EHOSTDOWN;
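/* fall through: a read of 0 means the peer closed the connection, so treat
* it like an error with errno set to EHOSTDOWN */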
  1268. case -1:
  1269. #ifndef _WINDOWS
  1270. if (errno == EAGAIN) {
  1271. #else
  1272. if (WSAGetLastError() == WSAEWOULDBLOCK) {
  1273. #endif
  1274. return 0;
  1275. }
  1276. return -1;
  1277. default:
  1278. buff->curr_offset += rc;
  1279. }
  1280. off = buff->curr_offset;
  1281. if (buff->curr_offset == sizeof(buff->len)) {
  1282. buff->len = ntohl(buff->len);
  1283. buff->buffer = calloc(1, buff->len);
  1284. }
  1285. }
  1286. if (buff->buffer) {
  1287. /* want off to now represent the offset into the buffer */
  1288. off -= sizeof(buff->len);
  1289. rc = recv(fd, buff->buffer+off, buff->len-off, 0);
  1290. switch(rc) {
  1291. case 0:
  1292. errno = EHOSTDOWN;
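/* fall through: as above, a read of 0 means the peer closed the connection */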
  1293. case -1:
  1294. #ifndef _WINDOWS
  1295. if (errno == EAGAIN) {
  1296. #else
  1297. if (WSAGetLastError() == WSAEWOULDBLOCK) {
  1298. #endif
  1299. break;
  1300. }
  1301. return -1;
  1302. default:
  1303. buff->curr_offset += rc;
  1304. }
  1305. }
  1306. return buff->curr_offset == buff->len + sizeof(buff->len);
  1307. }
  1308. void free_buffers(buffer_head_t *list)
  1309. {
  1310. while (remove_buffer(list))
  1311. ;
  1312. }
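/* Fails every outstanding request: synchronous callers are woken with rc set
* to `reason`, and (when callCompletion is set) asynchronous requests get a
* fabricated ReplyHeader carrying `reason` queued on completions_to_process;
* pending auth completions are invoked with `reason` as well. */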
  1313. void free_completions(zhandle_t *zh,int callCompletion,int reason)
  1314. {
  1315. completion_head_t tmp_list;
  1316. struct oarchive *oa;
  1317. struct ReplyHeader h;
  1318. void_completion_t auth_completion = NULL;
  1319. auth_completion_list_t a_list, *a_tmp;
  1320. lock_completion_list(&zh->sent_requests);
  1321. tmp_list = zh->sent_requests;
  1322. zh->sent_requests.head = 0;
  1323. zh->sent_requests.last = 0;
  1324. unlock_completion_list(&zh->sent_requests);
  1325. while (tmp_list.head) {
  1326. completion_list_t *cptr = tmp_list.head;
  1327. tmp_list.head = cptr->next;
  1328. if (cptr->c.data_result == SYNCHRONOUS_MARKER) {
  1329. struct sync_completion
  1330. *sc = (struct sync_completion*)cptr->data;
  1331. sc->rc = reason;
  1332. notify_sync_completion(sc);
  1333. zh->outstanding_sync--;
  1334. destroy_completion_entry(cptr);
  1335. } else if (callCompletion) {
  1336. if(cptr->xid == PING_XID){
  1337. // Nothing to do with a ping response
  1338. destroy_completion_entry(cptr);
  1339. } else {
  1340. // Fake the response
  1341. buffer_list_t *bptr;
  1342. h.xid = cptr->xid;
  1343. h.zxid = -1;
  1344. h.err = reason;
  1345. oa = create_buffer_oarchive();
  1346. serialize_ReplyHeader(oa, "header", &h);
  1347. bptr = calloc(sizeof(*bptr), 1);
  1348. assert(bptr);
  1349. bptr->len = get_buffer_len(oa);
  1350. bptr->buffer = get_buffer(oa);
  1351. close_buffer_oarchive(&oa, 0);
  1352. cptr->buffer = bptr;
  1353. queue_completion(&zh->completions_to_process, cptr, 0);
  1354. }
  1355. }
  1356. }
  1357. a_list.completion = NULL;
  1358. a_list.next = NULL;
  1359. zoo_lock_auth(zh);
  1360. get_auth_completions(&zh->auth_h, &a_list);
  1361. zoo_unlock_auth(zh);
  1362. a_tmp = &a_list;
  1363. // chain call user's completion function
  1364. while (a_tmp->completion != NULL) {
  1365. auth_completion = a_tmp->completion;
  1366. auth_completion(reason, a_tmp->auth_data);
  1367. a_tmp = a_tmp->next;
  1368. if (a_tmp == NULL)
  1369. break;
  1370. }
  1371. free_auth_completion(&a_list);
  1372. }
  1373. static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc)
  1374. {
  1375. enter_critical(zh);
  1376. free_buffers(&zh->to_send);
  1377. free_buffers(&zh->to_process);
  1378. free_completions(zh,callCompletion,rc);
  1379. leave_critical(zh);
  1380. if (zh->input_buffer && zh->input_buffer != &zh->primer_buffer) {
  1381. free_buffer(zh->input_buffer);
  1382. zh->input_buffer = 0;
  1383. }
  1384. }
  1385. static void handle_error(zhandle_t *zh,int rc)
  1386. {
  1387. close(zh->fd);
  1388. if (is_unrecoverable(zh)) {
  1389. LOG_DEBUG(("Calling a watcher for a ZOO_SESSION_EVENT and the state=%s",
  1390. state2String(zh->state)));
  1391. PROCESS_SESSION_EVENT(zh, zh->state);
  1392. } else if (zh->state == ZOO_CONNECTED_STATE) {
  1393. LOG_DEBUG(("Calling a watcher for a ZOO_SESSION_EVENT and the state=CONNECTING_STATE"));
  1394. PROCESS_SESSION_EVENT(zh, ZOO_CONNECTING_STATE);
  1395. }
  1396. cleanup_bufs(zh,1,rc);
  1397. zh->fd = -1;
  1398. LOG_DEBUG(("Previous connection=[%s] delay=%d", zoo_get_current_server(zh), zh->delay));
// NOTE: If we're at the end of the list of addresses to connect to, then
// we want to delay the next connection attempt to avoid spinning.
// Then advance to the next host, since we failed to connect to the current one.
  1402. zh->delay = addrvec_atend(&zh->addrs);
  1403. addrvec_next(&zh->addrs, &zh->addr_cur);
  1404. if (!is_unrecoverable(zh)) {
  1405. zh->state = 0;
  1406. }
  1407. if (process_async(zh->outstanding_sync)) {
  1408. process_completions(zh);
  1409. }
  1410. }
  1411. static int handle_socket_error_msg(zhandle_t *zh, int line, int rc,
  1412. const char* format, ...)
  1413. {
  1414. if(logLevel>=ZOO_LOG_LEVEL_ERROR){
  1415. va_list va;
  1416. char buf[1024];
  1417. va_start(va,format);
  1418. vsnprintf(buf, sizeof(buf)-1,format,va);
  1419. log_message(ZOO_LOG_LEVEL_ERROR,line,__func__,
  1420. format_log_message("Socket [%s] zk retcode=%d, errno=%d(%s): %s",
  1421. zoo_get_current_server(zh),rc,errno,strerror(errno),buf));
  1422. va_end(va);
  1423. }
  1424. handle_error(zh,rc);
  1425. return rc;
  1426. }
  1427. static void auth_completion_func(int rc, zhandle_t* zh)
  1428. {
  1429. void_completion_t auth_completion = NULL;
  1430. auth_completion_list_t a_list;
  1431. auth_completion_list_t *a_tmp;
  1432. if(zh==NULL)
  1433. return;
  1434. zoo_lock_auth(zh);
  1435. if(rc!=0){
  1436. zh->state=ZOO_AUTH_FAILED_STATE;
  1437. }else{
  1438. //change state for all auths
  1439. mark_active_auth(zh);
  1440. }
  1441. a_list.completion = NULL;
  1442. a_list.next = NULL;
  1443. get_auth_completions(&zh->auth_h, &a_list);
  1444. zoo_unlock_auth(zh);
  1445. if (rc) {
  1446. LOG_ERROR(("Authentication scheme %s failed. Connection closed.",
  1447. zh->auth_h.auth->scheme));
  1448. }
  1449. else {
  1450. LOG_INFO(("Authentication scheme %s succeeded", zh->auth_h.auth->scheme));
  1451. }
  1452. a_tmp = &a_list;
  1453. // chain call user's completion function
  1454. while (a_tmp->completion != NULL) {
  1455. auth_completion = a_tmp->completion;
  1456. auth_completion(rc, a_tmp->auth_data);
  1457. a_tmp = a_tmp->next;
  1458. if (a_tmp == NULL)
  1459. break;
  1460. }
  1461. free_auth_completion(&a_list);
  1462. }
  1463. static int send_info_packet(zhandle_t *zh, auth_info* auth) {
  1464. struct oarchive *oa;
  1465. struct RequestHeader h = {AUTH_XID, ZOO_SETAUTH_OP};
  1466. struct AuthPacket req;
  1467. int rc;
  1468. oa = create_buffer_oarchive();
  1469. rc = serialize_RequestHeader(oa, "header", &h);
  1470. req.type=0; // ignored by the server
  1471. req.scheme = auth->scheme;
  1472. req.auth = auth->auth;
  1473. rc = rc < 0 ? rc : serialize_AuthPacket(oa, "req", &req);
  1474. /* add this buffer to the head of the send queue */
  1475. rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa),
  1476. get_buffer_len(oa));
  1477. /* We queued the buffer, so don't free it */
  1478. close_buffer_oarchive(&oa, 0);
  1479. return rc;
  1480. }
  1481. /** send all auths, not just the last one **/
  1482. static int send_auth_info(zhandle_t *zh) {
  1483. int rc = 0;
  1484. auth_info *auth = NULL;
  1485. zoo_lock_auth(zh);
  1486. auth = zh->auth_h.auth;
  1487. if (auth == NULL) {
  1488. zoo_unlock_auth(zh);
  1489. return ZOK;
  1490. }
  1491. while (auth != NULL) {
  1492. rc = send_info_packet(zh, auth);
  1493. auth = auth->next;
  1494. }
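/* note: rc only reflects the last send_info_packet() call made in the loop */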
  1495. zoo_unlock_auth(zh);
  1496. LOG_DEBUG(("Sending all auth info request to %s", zoo_get_current_server(zh)));
  1497. return (rc <0) ? ZMARSHALLINGERROR:ZOK;
  1498. }
  1499. static int send_last_auth_info(zhandle_t *zh)
  1500. {
  1501. int rc = 0;
  1502. auth_info *auth = NULL;
  1503. zoo_lock_auth(zh);
  1504. auth = get_last_auth(&zh->auth_h);
  1505. if(auth==NULL) {
  1506. zoo_unlock_auth(zh);
  1507. return ZOK; // there is nothing to send
  1508. }
  1509. rc = send_info_packet(zh, auth);
  1510. zoo_unlock_auth(zh);
  1511. LOG_DEBUG(("Sending auth info request to %s",zoo_get_current_server(zh)));
  1512. return (rc < 0)?ZMARSHALLINGERROR:ZOK;
  1513. }
  1514. static void free_key_list(char **list, int count)
  1515. {
  1516. int i;
  1517. for(i = 0; i < count; i++) {
  1518. free(list[i]);
  1519. }
  1520. free(list);
  1521. }
  1522. static int send_set_watches(zhandle_t *zh)
  1523. {
  1524. struct oarchive *oa;
  1525. struct RequestHeader h = {SET_WATCHES_XID, ZOO_SETWATCHES_OP};
  1526. struct SetWatches req;
  1527. int rc;
  1528. req.relativeZxid = zh->last_zxid;
  1529. req.dataWatches.data = collect_keys(zh->active_node_watchers, (int*)&req.dataWatches.count);
  1530. req.existWatches.data = collect_keys(zh->active_exist_watchers, (int*)&req.existWatches.count);
  1531. req.childWatches.data = collect_keys(zh->active_child_watchers, (int*)&req.childWatches.count);
  1532. // return if there are no pending watches
  1533. if (!req.dataWatches.count && !req.existWatches.count &&
  1534. !req.childWatches.count) {
  1535. free_key_list(req.dataWatches.data, req.dataWatches.count);
  1536. free_key_list(req.existWatches.data, req.existWatches.count);
  1537. free_key_list(req.childWatches.data, req.childWatches.count);
  1538. return ZOK;
  1539. }
  1540. oa = create_buffer_oarchive();
  1541. rc = serialize_RequestHeader(oa, "header", &h);
  1542. rc = rc < 0 ? rc : serialize_SetWatches(oa, "req", &req);
  1543. /* add this buffer to the head of the send queue */
  1544. rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa),
  1545. get_buffer_len(oa));
  1546. /* We queued the buffer, so don't free it */
  1547. close_buffer_oarchive(&oa, 0);
  1548. free_key_list(req.dataWatches.data, req.dataWatches.count);
  1549. free_key_list(req.existWatches.data, req.existWatches.count);
  1550. free_key_list(req.childWatches.data, req.childWatches.count);
  1551. LOG_DEBUG(("Sending set watches request to %s",zoo_get_current_server(zh)));
  1552. return (rc < 0)?ZMARSHALLINGERROR:ZOK;
  1553. }
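/* Handshake (connect request) wire layout, as produced by
* serialize_prime_connect() below:
*   protocolVersion | lastZxidSeen | timeOut | sessionId | passwd_len | passwd
* 32-bit fields go through htonl() and 64-bit fields through htonll(); the
* password is copied verbatim for sizeof(req->passwd) bytes. The server's
* reply is read back field-by-field in deserialize_prime_response(). */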
  1554. static int serialize_prime_connect(struct connect_req *req, char* buffer){
  1555. //this should be the order of serialization
  1556. int offset = 0;
  1557. req->protocolVersion = htonl(req->protocolVersion);
  1558. memcpy(buffer + offset, &req->protocolVersion, sizeof(req->protocolVersion));
  1559. offset = offset + sizeof(req->protocolVersion);
  1560. req->lastZxidSeen = htonll(req->lastZxidSeen);
  1561. memcpy(buffer + offset, &req->lastZxidSeen, sizeof(req->lastZxidSeen));
  1562. offset = offset + sizeof(req->lastZxidSeen);
  1563. req->timeOut = htonl(req->timeOut);
  1564. memcpy(buffer + offset, &req->timeOut, sizeof(req->timeOut));
  1565. offset = offset + sizeof(req->timeOut);
  1566. req->sessionId = htonll(req->sessionId);
  1567. memcpy(buffer + offset, &req->sessionId, sizeof(req->sessionId));
  1568. offset = offset + sizeof(req->sessionId);
  1569. req->passwd_len = htonl(req->passwd_len);
  1570. memcpy(buffer + offset, &req->passwd_len, sizeof(req->passwd_len));
  1571. offset = offset + sizeof(req->passwd_len);
  1572. memcpy(buffer + offset, req->passwd, sizeof(req->passwd));
  1573. return 0;
  1574. }
  1575. static int deserialize_prime_response(struct prime_struct *req, char* buffer){
  1576. //this should be the order of deserialization
  1577. int offset = 0;
  1578. memcpy(&req->len, buffer + offset, sizeof(req->len));
  1579. offset = offset + sizeof(req->len);
  1580. req->len = ntohl(req->len);
  1581. memcpy(&req->protocolVersion, buffer + offset, sizeof(req->protocolVersion));
  1582. offset = offset + sizeof(req->protocolVersion);
  1583. req->protocolVersion = ntohl(req->protocolVersion);
  1584. memcpy(&req->timeOut, buffer + offset, sizeof(req->timeOut));
  1585. offset = offset + sizeof(req->timeOut);
  1586. req->timeOut = ntohl(req->timeOut);
  1587. memcpy(&req->sessionId, buffer + offset, sizeof(req->sessionId));
  1588. offset = offset + sizeof(req->sessionId);
  1589. req->sessionId = htonll(req->sessionId);
  1590. memcpy(&req->passwd_len, buffer + offset, sizeof(req->passwd_len));
  1591. offset = offset + sizeof(req->passwd_len);
  1592. req->passwd_len = ntohl(req->passwd_len);
  1593. memcpy(req->passwd, buffer + offset, sizeof(req->passwd));
  1594. return 0;
  1595. }
  1596. static int prime_connection(zhandle_t *zh)
  1597. {
  1598. int rc;
  1599. /*this is the size of buffer to serialize req into*/
  1600. char buffer_req[HANDSHAKE_REQ_SIZE];
  1601. int len = sizeof(buffer_req);
  1602. int hlen = 0;
  1603. struct connect_req req;
  1604. req.protocolVersion = 0;
  1605. req.sessionId = zh->client_id.client_id;
  1606. req.passwd_len = sizeof(req.passwd);
  1607. memcpy(req.passwd, zh->client_id.passwd, sizeof(zh->client_id.passwd));
  1608. req.timeOut = zh->recv_timeout;
  1609. req.lastZxidSeen = zh->last_zxid;
  1610. hlen = htonl(len);
  1611. /* We are running fast and loose here, but this string should fit in the initial buffer! */
  1612. rc=zookeeper_send(zh->fd, &hlen, sizeof(len));
  1613. serialize_prime_connect(&req, buffer_req);
  1614. rc=rc<0 ? rc : zookeeper_send(zh->fd, buffer_req, len);
  1615. if (rc<0) {
  1616. return handle_socket_error_msg(zh, __LINE__, ZCONNECTIONLOSS,
  1617. "failed to send a handshake packet: %s", strerror(errno));
  1618. }
  1619. zh->state = ZOO_ASSOCIATING_STATE;
  1620. zh->input_buffer = &zh->primer_buffer;
/* It may seem a bit weird to set the offset to 4, but we already have a
* length, so we skip reading the length (and allocating the buffer) by
* saying that we are already at offset 4 */
  1624. zh->input_buffer->curr_offset = 4;
  1625. return ZOK;
  1626. }
  1627. static inline int calculate_interval(const struct timeval *start,
  1628. const struct timeval *end)
  1629. {
  1630. int interval;
  1631. struct timeval i = *end;
  1632. i.tv_sec -= start->tv_sec;
  1633. i.tv_usec -= start->tv_usec;
  1634. interval = i.tv_sec * 1000 + (i.tv_usec/1000);
  1635. return interval;
  1636. }
  1637. static struct timeval get_timeval(int interval)
  1638. {
  1639. struct timeval tv;
  1640. if (interval < 0) {
  1641. interval = 0;
  1642. }
  1643. tv.tv_sec = interval/1000;
  1644. tv.tv_usec = (interval%1000)*1000;
  1645. return tv;
  1646. }
  1647. static int add_void_completion(zhandle_t *zh, int xid, void_completion_t dc,
  1648. const void *data);
  1649. static int add_string_completion(zhandle_t *zh, int xid,
  1650. string_completion_t dc, const void *data);
  1651. static int add_string_stat_completion(zhandle_t *zh, int xid,
  1652. string_stat_completion_t dc, const void *data);
  1653. int send_ping(zhandle_t* zh)
  1654. {
  1655. int rc;
  1656. struct oarchive *oa = create_buffer_oarchive();
  1657. struct RequestHeader h = {PING_XID, ZOO_PING_OP};
  1658. rc = serialize_RequestHeader(oa, "header", &h);
  1659. enter_critical(zh);
  1660. gettimeofday(&zh->last_ping, 0);
  1661. rc = rc < 0 ? rc : add_void_completion(zh, h.xid, 0, 0);
  1662. rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
  1663. get_buffer_len(oa));
  1664. leave_critical(zh);
  1665. close_buffer_oarchive(&oa, 0);
  1666. return rc<0 ? rc : adaptor_send_queue(zh, 0);
  1667. }
  1668. #ifdef WIN32
  1669. int zookeeper_interest(zhandle_t *zh, SOCKET *fd, int *interest,
  1670. struct timeval *tv)
  1671. {
  1672. ULONG nonblocking_flag = 1;
  1673. #else
  1674. int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
  1675. struct timeval *tv)
  1676. {
  1677. #endif
  1678. struct timeval now;
  1679. if(zh==0 || fd==0 ||interest==0 || tv==0)
  1680. return ZBADARGUMENTS;
  1681. if (is_unrecoverable(zh))
  1682. return ZINVALIDSTATE;
  1683. gettimeofday(&now, 0);
  1684. if(zh->next_deadline.tv_sec!=0 || zh->next_deadline.tv_usec!=0){
  1685. int time_left = calculate_interval(&zh->next_deadline, &now);
  1686. int max_exceed = zh->recv_timeout / 10 > 200 ? 200 :
  1687. (zh->recv_timeout / 10);
  1688. if (time_left > max_exceed)
  1689. LOG_WARN(("Exceeded deadline by %dms", time_left));
  1690. }
  1691. api_prolog(zh);
  1692. int rc = update_addrs(zh);
  1693. if (rc != ZOK) {
  1694. return api_epilog(zh, rc);
  1695. }
  1696. *fd = zh->fd;
  1697. *interest = 0;
  1698. tv->tv_sec = 0;
  1699. tv->tv_usec = 0;
  1700. if (*fd == -1) {
/*
* If we previously failed to connect to the server pool (zh->delay == 1),
* then we need to delay the connection attempt on this iteration by 1/60 of
* the recv timeout before trying again, so we don't spin.
*
* We always clear the delay setting. If we fail again, we'll set delay
* again and on the next iteration we'll do the same.
*/
  1709. if (zh->delay == 1) {
  1710. *tv = get_timeval(zh->recv_timeout/60);
  1711. zh->delay = 0;
  1712. LOG_WARN(("Delaying connection after exhaustively trying all servers [%s]",
  1713. zh->hostname));
  1714. }
  1715. // No need to delay -- grab the next server and attempt connection
  1716. else {
  1717. zoo_cycle_next_server(zh);
  1718. #ifdef WIN32
  1719. char enable_tcp_nodelay = 1;
  1720. #else
  1721. int enable_tcp_nodelay = 1;
  1722. #endif
  1723. int ssoresult;
  1724. zh->fd = socket(zh->addr_cur.ss_family, SOCK_STREAM, 0);
  1725. if (zh->fd < 0) {
  1726. return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
  1727. ZSYSTEMERROR, "socket() call failed"));
  1728. }
  1729. ssoresult = setsockopt(zh->fd, IPPROTO_TCP, TCP_NODELAY, &enable_tcp_nodelay, sizeof(enable_tcp_nodelay));
  1730. if (ssoresult != 0) {
LOG_WARN(("Unable to set TCP_NODELAY, operation latency may be affected"));
  1732. }
  1733. #ifdef WIN32
  1734. ioctlsocket(zh->fd, FIONBIO, &nonblocking_flag);
  1735. #else
  1736. fcntl(zh->fd, F_SETFL, O_NONBLOCK|fcntl(zh->fd, F_GETFL, 0));
  1737. #endif
  1738. #if defined(AF_INET6)
  1739. if (zh->addr_cur.ss_family == AF_INET6) {
  1740. rc = connect(zh->fd, (struct sockaddr*)&zh->addr_cur, sizeof(struct sockaddr_in6));
  1741. } else {
  1742. #else
  1743. LOG_DEBUG(("[zk] connect()\n"));
  1744. {
  1745. #endif
  1746. rc = connect(zh->fd, (struct sockaddr*)&zh->addr_cur, sizeof(struct sockaddr_in));
  1747. #ifdef WIN32
  1748. get_errno();
  1749. #endif
  1750. }
  1751. if (rc == -1) {
  1752. /* we are handling the non-blocking connect according to
  1753. * the description in section 16.3 "Non-blocking connect"
  1754. * in UNIX Network Programming vol 1, 3rd edition */
  1755. if (errno == EWOULDBLOCK || errno == EINPROGRESS)
  1756. zh->state = ZOO_CONNECTING_STATE;
  1757. else
  1758. {
  1759. return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
  1760. ZCONNECTIONLOSS,"connect() call failed"));
  1761. }
  1762. } else {
  1763. if((rc=prime_connection(zh))!=0)
  1764. return api_epilog(zh,rc);
  1765. LOG_INFO(("Initiated connection to server [%s]", format_endpoint_info(&zh->addr_cur)));
  1766. }
  1767. *tv = get_timeval(zh->recv_timeout/3);
  1768. }
  1769. *fd = zh->fd;
  1770. zh->last_recv = now;
  1771. zh->last_send = now;
  1772. zh->last_ping = now;
  1773. }
  1774. if (zh->fd != -1) {
  1775. int idle_recv = calculate_interval(&zh->last_recv, &now);
  1776. int idle_send = calculate_interval(&zh->last_send, &now);
  1777. int recv_to = zh->recv_timeout*2/3 - idle_recv;
  1778. int send_to = zh->recv_timeout/3;
  1779. // have we exceeded the receive timeout threshold?
  1780. if (recv_to <= 0) {
  1781. // We gotta cut our losses and connect to someone else
  1782. #ifdef WIN32
  1783. errno = WSAETIMEDOUT;
  1784. #else
  1785. errno = ETIMEDOUT;
  1786. #endif
  1787. *interest=0;
  1788. *tv = get_timeval(0);
  1789. return api_epilog(zh,handle_socket_error_msg(zh,
  1790. __LINE__,ZOPERATIONTIMEOUT,
  1791. "connection to %s timed out (exceeded timeout by %dms)",
  1792. format_endpoint_info(&zh->addr_cur),
  1793. -recv_to));
  1794. }
  1795. // We only allow 1/3 of our timeout time to expire before sending
  1796. // a PING
  1797. if (zh->state==ZOO_CONNECTED_STATE) {
  1798. send_to = zh->recv_timeout/3 - idle_send;
  1799. if (send_to <= 0 && zh->sent_requests.head==0) {
  1800. // LOG_DEBUG(("Sending PING to %s (exceeded idle by %dms)",
  1801. // zoo_get_current_server(zh),-send_to));
  1802. rc = send_ping(zh);
  1803. if (rc < 0){
  1804. LOG_ERROR(("failed to send PING request (zk retcode=%d)",rc));
  1805. return api_epilog(zh,rc);
  1806. }
  1807. send_to = zh->recv_timeout/3;
  1808. }
  1809. }
  1810. // choose the lesser value as the timeout
  1811. *tv = get_timeval(recv_to < send_to? recv_to:send_to);
  1812. zh->next_deadline.tv_sec = now.tv_sec + tv->tv_sec;
  1813. zh->next_deadline.tv_usec = now.tv_usec + tv->tv_usec;
  1814. if (zh->next_deadline.tv_usec > 1000000) {
  1815. zh->next_deadline.tv_sec += zh->next_deadline.tv_usec / 1000000;
  1816. zh->next_deadline.tv_usec = zh->next_deadline.tv_usec % 1000000;
  1817. }
  1818. *interest = ZOOKEEPER_READ;
  1819. /* we are interested in a write if we are connected and have something
  1820. * to send, or we are waiting for a connect to finish. */
  1821. if ((zh->to_send.head && (zh->state == ZOO_CONNECTED_STATE))
  1822. || zh->state == ZOO_CONNECTING_STATE) {
  1823. *interest |= ZOOKEEPER_WRITE;
  1824. }
  1825. }
  1826. return api_epilog(zh,ZOK);
  1827. }
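/* Handles the socket events reported for zh->fd: completes a pending
* non-blocking connect (checking SO_ERROR), flushes the send queue when the
* socket is writable, and on a readable socket receives either a regular
* response buffer (queued on to_process) or the handshake primer buffer,
* which finishes session establishment. Returns ZNOTHING when a read event
* yielded no complete buffer. */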
  1828. static int check_events(zhandle_t *zh, int events)
  1829. {
  1830. if (zh->fd == -1)
  1831. return ZINVALIDSTATE;
  1832. if ((events&ZOOKEEPER_WRITE)&&(zh->state == ZOO_CONNECTING_STATE)) {
  1833. int rc, error;
  1834. socklen_t len = sizeof(error);
  1835. rc = getsockopt(zh->fd, SOL_SOCKET, SO_ERROR, &error, &len);
  1836. /* the description in section 16.4 "Non-blocking connect"
  1837. * in UNIX Network Programming vol 1, 3rd edition, points out
  1838. * that sometimes the error is in errno and sometimes in error */
  1839. if (rc < 0 || error) {
  1840. if (rc == 0)
  1841. errno = error;
  1842. return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
  1843. "server refused to accept the client");
  1844. }
  1845. if((rc=prime_connection(zh))!=0)
  1846. return rc;
  1847. LOG_INFO(("initiated connection to server [%s]", format_endpoint_info(&zh->addr_cur)));
  1848. return ZOK;
  1849. }
  1850. if (zh->to_send.head && (events&ZOOKEEPER_WRITE)) {
  1851. /* make the flush call non-blocking by specifying a 0 timeout */
  1852. int rc=flush_send_queue(zh,0);
  1853. if (rc < 0)
  1854. return handle_socket_error_msg(zh,__LINE__,ZCONNECTIONLOSS,
  1855. "failed while flushing send queue");
  1856. }
  1857. if (events&ZOOKEEPER_READ) {
  1858. int rc;
  1859. if (zh->input_buffer == 0) {
  1860. zh->input_buffer = allocate_buffer(0,0);
  1861. }
  1862. rc = recv_buffer(zh->fd, zh->input_buffer);
  1863. if (rc < 0) {
  1864. return handle_socket_error_msg(zh, __LINE__,ZCONNECTIONLOSS,
  1865. "failed while receiving a server response");
  1866. }
  1867. if (rc > 0) {
  1868. gettimeofday(&zh->last_recv, 0);
  1869. if (zh->input_buffer != &zh->primer_buffer) {
  1870. queue_buffer(&zh->to_process, zh->input_buffer, 0);
  1871. } else {
  1872. int64_t oldid,newid;
  1873. //deserialize
  1874. deserialize_prime_response(&zh->primer_storage, zh->primer_buffer.buffer);
  1875. /* We are processing the primer_buffer, so we need to finish
  1876. * the connection handshake */
  1877. oldid = zh->client_id.client_id;
  1878. newid = zh->primer_storage.sessionId;
  1879. if (oldid != 0 && oldid != newid) {
  1880. zh->state = ZOO_EXPIRED_SESSION_STATE;
  1881. errno = ESTALE;
  1882. return handle_socket_error_msg(zh,__LINE__,ZSESSIONEXPIRED,
  1883. "sessionId=%#llx has expired.",oldid);
  1884. } else {
  1885. zh->recv_timeout = zh->primer_storage.timeOut;
  1886. zh->client_id.client_id = newid;
  1887. memcpy(zh->client_id.passwd, &zh->primer_storage.passwd,
  1888. sizeof(zh->client_id.passwd));
  1889. zh->state = ZOO_CONNECTED_STATE;
  1890. zh->reconfig = 0;
  1891. LOG_INFO(("session establishment complete on server [%s], sessionId=%#llx, negotiated timeout=%d",
  1892. format_endpoint_info(&zh->addr_cur),
  1893. newid, zh->recv_timeout));
/* we want the auth info to be sent too, but since both calls push to the
front of the send queue, we need to call send_set_watches first */
  1896. send_set_watches(zh);
  1897. /* send the authentication packet now */
  1898. send_auth_info(zh);
  1899. LOG_DEBUG(("Calling a watcher for a ZOO_SESSION_EVENT and the state=ZOO_CONNECTED_STATE"));
  1900. zh->input_buffer = 0; // just in case the watcher calls zookeeper_process() again
  1901. PROCESS_SESSION_EVENT(zh, ZOO_CONNECTED_STATE);
  1902. }
  1903. }
  1904. zh->input_buffer = 0;
  1905. } else {
  1906. // zookeeper_process was called but there was nothing to read
  1907. // from the socket
  1908. return ZNOTHING;
  1909. }
  1910. }
  1911. return ZOK;
  1912. }
  1913. void api_prolog(zhandle_t* zh)
  1914. {
  1915. inc_ref_counter(zh,1);
  1916. }
  1917. int api_epilog(zhandle_t *zh,int rc)
  1918. {
  1919. if(inc_ref_counter(zh,-1)==0 && zh->close_requested!=0)
  1920. zookeeper_close(zh);
  1921. return rc;
  1922. }
  1923. static __attribute__((unused)) void print_completion_queue(zhandle_t *zh)
  1924. {
  1925. completion_list_t* cptr;
  1926. if(logLevel<ZOO_LOG_LEVEL_DEBUG) return;
  1927. fprintf(LOGSTREAM,"Completion queue: ");
  1928. if (zh->sent_requests.head==0) {
  1929. fprintf(LOGSTREAM,"empty\n");
  1930. return;
  1931. }
  1932. cptr=zh->sent_requests.head;
  1933. while(cptr){
  1934. fprintf(LOGSTREAM,"%d,",cptr->xid);
  1935. cptr=cptr->next;
  1936. }
  1937. fprintf(LOGSTREAM,"end\n");
  1938. }
  1939. //#ifdef THREADED
  1940. // IO thread queues session events to be processed by the completion thread
  1941. static int queue_session_event(zhandle_t *zh, int state)
  1942. {
  1943. int rc;
  1944. struct WatcherEvent evt = { ZOO_SESSION_EVENT, state, "" };
  1945. struct ReplyHeader hdr = { WATCHER_EVENT_XID, 0, 0 };
  1946. struct oarchive *oa;
  1947. completion_list_t *cptr;
  1948. if ((oa=create_buffer_oarchive())==NULL) {
  1949. LOG_ERROR(("out of memory"));
  1950. goto error;
  1951. }
  1952. rc = serialize_ReplyHeader(oa, "hdr", &hdr);
  1953. rc = rc<0?rc: serialize_WatcherEvent(oa, "event", &evt);
  1954. if(rc<0){
  1955. close_buffer_oarchive(&oa, 1);
  1956. goto error;
  1957. }
  1958. cptr = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0,0);
cptr->buffer = allocate_buffer(get_buffer(oa), get_buffer_len(oa));
if (!cptr->buffer) {
free(cptr);
close_buffer_oarchive(&oa, 1);
goto error;
}
cptr->buffer->curr_offset = get_buffer_len(oa);
  1966. /* We queued the buffer, so don't free it */
  1967. close_buffer_oarchive(&oa, 0);
  1968. cptr->c.watcher_result = collectWatchers(zh, ZOO_SESSION_EVENT, "");
  1969. queue_completion(&zh->completions_to_process, cptr, 0);
  1970. if (process_async(zh->outstanding_sync)) {
  1971. process_completions(zh);
  1972. }
  1973. return ZOK;
  1974. error:
  1975. errno=ENOMEM;
  1976. return ZSYSTEMERROR;
  1977. }
  1978. //#endif
  1979. completion_list_t *dequeue_completion(completion_head_t *list)
  1980. {
  1981. completion_list_t *cptr;
  1982. lock_completion_list(list);
  1983. cptr = list->head;
  1984. if (cptr) {
  1985. list->head = cptr->next;
  1986. if (!list->head) {
  1987. assert(list->last == cptr);
  1988. list->last = 0;
  1989. }
  1990. }
  1991. unlock_completion_list(list);
  1992. return cptr;
  1993. }
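/* Copies the deserialized reply of a synchronous request into the
* caller-supplied sync_completion, truncating data/path results to the
* buffer lengths the caller provided. */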
  1994. static void process_sync_completion(
  1995. completion_list_t *cptr,
  1996. struct sync_completion *sc,
  1997. struct iarchive *ia,
  1998. zhandle_t *zh)
  1999. {
  2000. LOG_DEBUG(("Processing sync_completion with type=%d xid=%#x rc=%d",
  2001. cptr->c.type, cptr->xid, sc->rc));
  2002. switch(cptr->c.type) {
  2003. case COMPLETION_DATA:
  2004. if (sc->rc==0) {
  2005. struct GetDataResponse res;
  2006. int len;
  2007. deserialize_GetDataResponse(ia, "reply", &res);
  2008. if (res.data.len <= sc->u.data.buff_len) {
  2009. len = res.data.len;
  2010. } else {
  2011. len = sc->u.data.buff_len;
  2012. }
  2013. sc->u.data.buff_len = len;
// check if len is negative: a length of -1 means the znode's data is NULL
  2016. if (len == -1) {
  2017. sc->u.data.buffer = NULL;
  2018. } else {
  2019. memcpy(sc->u.data.buffer, res.data.buff, len);
  2020. }
  2021. sc->u.data.stat = res.stat;
  2022. deallocate_GetDataResponse(&res);
  2023. }
  2024. break;
  2025. case COMPLETION_STAT:
  2026. if (sc->rc==0) {
  2027. struct SetDataResponse res;
  2028. deserialize_SetDataResponse(ia, "reply", &res);
  2029. sc->u.stat = res.stat;
  2030. deallocate_SetDataResponse(&res);
  2031. }
  2032. break;
  2033. case COMPLETION_STRINGLIST:
  2034. if (sc->rc==0) {
  2035. struct GetChildrenResponse res;
  2036. deserialize_GetChildrenResponse(ia, "reply", &res);
  2037. sc->u.strs2 = res.children;
  2038. /* We don't deallocate since we are passing it back */
  2039. // deallocate_GetChildrenResponse(&res);
  2040. }
  2041. break;
  2042. case COMPLETION_STRINGLIST_STAT:
  2043. if (sc->rc==0) {
  2044. struct GetChildren2Response res;
  2045. deserialize_GetChildren2Response(ia, "reply", &res);
  2046. sc->u.strs_stat.strs2 = res.children;
  2047. sc->u.strs_stat.stat2 = res.stat;
  2048. /* We don't deallocate since we are passing it back */
  2049. // deallocate_GetChildren2Response(&res);
  2050. }
  2051. break;
  2052. case COMPLETION_STRING:
  2053. if (sc->rc==0) {
  2054. struct CreateResponse res;
  2055. int len;
  2056. const char * client_path;
  2057. deserialize_CreateResponse(ia, "reply", &res);
  2058. //ZOOKEEPER-1027
  2059. client_path = sub_string(zh, res.path);
len = strlen(client_path) + 1;
if (len > sc->u.str.str_len) {
  2061. len = sc->u.str.str_len;
  2062. }
  2063. if (len > 0) {
  2064. memcpy(sc->u.str.str, client_path, len - 1);
  2065. sc->u.str.str[len - 1] = '\0';
  2066. }
  2067. free_duplicate_path(client_path, res.path);
  2068. deallocate_CreateResponse(&res);
  2069. }
  2070. break;
  2071. case COMPLETION_STRING_STAT:
  2072. if (sc->rc==0) {
  2073. struct Create2Response res;
  2074. int len;
  2075. const char * client_path;
  2076. deserialize_Create2Response(ia, "reply", &res);
  2077. client_path = sub_string(zh, res.path);
  2078. len = strlen(client_path) + 1;
  2079. if (len > sc->u.str.str_len) {
  2080. len = sc->u.str.str_len;
  2081. }
  2082. if (len > 0) {
  2083. memcpy(sc->u.str.str, client_path, len - 1);
  2084. sc->u.str.str[len - 1] = '\0';
  2085. }
  2086. free_duplicate_path(client_path, res.path);
  2087. sc->u.stat = res.stat;
  2088. deallocate_Create2Response(&res);
  2089. }
  2090. break;
  2091. case COMPLETION_ACLLIST:
  2092. if (sc->rc==0) {
  2093. struct GetACLResponse res;
  2094. deserialize_GetACLResponse(ia, "reply", &res);
  2095. sc->u.acl.acl = res.acl;
  2096. sc->u.acl.stat = res.stat;
  2097. /* We don't deallocate since we are passing it back */
  2098. //deallocate_GetACLResponse(&res);
  2099. }
  2100. break;
  2101. case COMPLETION_VOID:
  2102. break;
  2103. case COMPLETION_MULTI:
  2104. sc->rc = deserialize_multi(cptr->xid, cptr, ia);
  2105. break;
  2106. default:
  2107. LOG_DEBUG(("Unsupported completion type=%d", cptr->c.type));
  2108. break;
  2109. }
  2110. }
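/* Walks the MultiHeader-delimited sub-responses of a multi op, pairing each
* one with the corresponding queued sub-completion; the first error other
* than ZRUNTIMEINCONSISTENCY becomes the overall return code. */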
  2111. static int deserialize_multi(int xid, completion_list_t *cptr, struct iarchive *ia)
  2112. {
  2113. int rc = 0;
  2114. completion_head_t *clist = &cptr->c.clist;
  2115. struct MultiHeader mhdr = {0, 0, 0};
  2116. assert(clist);
  2117. deserialize_MultiHeader(ia, "multiheader", &mhdr);
  2118. while (!mhdr.done) {
  2119. completion_list_t *entry = dequeue_completion(clist);
  2120. assert(entry);
  2121. if (mhdr.type == -1) {
  2122. struct ErrorResponse er;
  2123. deserialize_ErrorResponse(ia, "error", &er);
  2124. mhdr.err = er.err ;
  2125. if (rc == 0 && er.err != 0 && er.err != ZRUNTIMEINCONSISTENCY) {
  2126. rc = er.err;
  2127. }
  2128. }
  2129. deserialize_response(entry->c.type, xid, mhdr.type == -1, mhdr.err, entry, ia);
  2130. deserialize_MultiHeader(ia, "multiheader", &mhdr);
  2131. }
  2132. return rc;
  2133. }
  2134. static void deserialize_response(int type, int xid, int failed, int rc, completion_list_t *cptr, struct iarchive *ia)
  2135. {
  2136. switch (type) {
  2137. case COMPLETION_DATA:
  2138. LOG_DEBUG(("Calling COMPLETION_DATA for xid=%#x failed=%d rc=%d",
  2139. cptr->xid, failed, rc));
  2140. if (failed) {
  2141. cptr->c.data_result(rc, 0, 0, 0, cptr->data);
  2142. } else {
  2143. struct GetDataResponse res;
  2144. deserialize_GetDataResponse(ia, "reply", &res);
  2145. cptr->c.data_result(rc, res.data.buff, res.data.len,
  2146. &res.stat, cptr->data);
  2147. deallocate_GetDataResponse(&res);
  2148. }
  2149. break;
  2150. case COMPLETION_STAT:
  2151. LOG_DEBUG(("Calling COMPLETION_STAT for xid=%#x failed=%d rc=%d",
  2152. cptr->xid, failed, rc));
  2153. if (failed) {
  2154. cptr->c.stat_result(rc, 0, cptr->data);
  2155. } else {
  2156. struct SetDataResponse res;
  2157. deserialize_SetDataResponse(ia, "reply", &res);
  2158. cptr->c.stat_result(rc, &res.stat, cptr->data);
  2159. deallocate_SetDataResponse(&res);
  2160. }
  2161. break;
  2162. case COMPLETION_STRINGLIST:
  2163. LOG_DEBUG(("Calling COMPLETION_STRINGLIST for xid=%#x failed=%d rc=%d",
  2164. cptr->xid, failed, rc));
  2165. if (failed) {
  2166. cptr->c.strings_result(rc, 0, cptr->data);
  2167. } else {
  2168. struct GetChildrenResponse res;
  2169. deserialize_GetChildrenResponse(ia, "reply", &res);
  2170. cptr->c.strings_result(rc, &res.children, cptr->data);
  2171. deallocate_GetChildrenResponse(&res);
  2172. }
  2173. break;
  2174. case COMPLETION_STRINGLIST_STAT:
  2175. LOG_DEBUG(("Calling COMPLETION_STRINGLIST_STAT for xid=%#x failed=%d rc=%d",
  2176. cptr->xid, failed, rc));
  2177. if (failed) {
  2178. cptr->c.strings_stat_result(rc, 0, 0, cptr->data);
  2179. } else {
  2180. struct GetChildren2Response res;
  2181. deserialize_GetChildren2Response(ia, "reply", &res);
  2182. cptr->c.strings_stat_result(rc, &res.children, &res.stat, cptr->data);
  2183. deallocate_GetChildren2Response(&res);
  2184. }
  2185. break;
  2186. case COMPLETION_STRING:
  2187. LOG_DEBUG(("Calling COMPLETION_STRING for xid=%#x failed=%d, rc=%d",
  2188. cptr->xid, failed, rc));
  2189. if (failed) {
  2190. cptr->c.string_result(rc, 0, cptr->data);
  2191. } else {
  2192. struct CreateResponse res;
  2193. deserialize_CreateResponse(ia, "reply", &res);
  2194. cptr->c.string_result(rc, res.path, cptr->data);
  2195. deallocate_CreateResponse(&res);
  2196. }
  2197. break;
  2198. case COMPLETION_STRING_STAT:
  2199. LOG_DEBUG(("Calling COMPLETION_STRING_STAT for xid=%#x failed=%d, rc=%d",
  2200. cptr->xid, failed, rc));
  2201. if (failed) {
  2202. cptr->c.string_stat_result(rc, 0, 0, cptr->data);
  2203. } else {
  2204. struct Create2Response res;
  2205. deserialize_Create2Response(ia, "reply", &res);
  2206. cptr->c.string_stat_result(rc, res.path, &res.stat, cptr->data);
  2207. deallocate_Create2Response(&res);
  2208. }
  2209. break;
  2210. case COMPLETION_ACLLIST:
  2211. LOG_DEBUG(("Calling COMPLETION_ACLLIST for xid=%#x failed=%d rc=%d",
  2212. cptr->xid, failed, rc));
  2213. if (failed) {
  2214. cptr->c.acl_result(rc, 0, 0, cptr->data);
  2215. } else {
  2216. struct GetACLResponse res;
  2217. deserialize_GetACLResponse(ia, "reply", &res);
  2218. cptr->c.acl_result(rc, &res.acl, &res.stat, cptr->data);
  2219. deallocate_GetACLResponse(&res);
  2220. }
  2221. break;
  2222. case COMPLETION_VOID:
  2223. LOG_DEBUG(("Calling COMPLETION_VOID for xid=%#x failed=%d rc=%d",
  2224. cptr->xid, failed, rc));
  2225. if (xid == PING_XID) {
  2226. // We want to skip the ping
  2227. } else {
  2228. assert(cptr->c.void_result);
  2229. cptr->c.void_result(rc, cptr->data);
  2230. }
  2231. break;
  2232. case COMPLETION_MULTI:
  2233. LOG_DEBUG(("Calling COMPLETION_MULTI for xid=%#x failed=%d rc=%d",
  2234. cptr->xid, failed, rc));
  2235. rc = deserialize_multi(xid, cptr, ia);
  2236. assert(cptr->c.void_result);
  2237. cptr->c.void_result(rc, cptr->data);
  2238. break;
  2239. default:
  2240. LOG_DEBUG(("Unsupported completion type=%d", cptr->c.type));
  2241. }
  2242. }
  2243. /* handles async completion (both single- and multithreaded) */
  2244. void process_completions(zhandle_t *zh)
  2245. {
  2246. completion_list_t *cptr;
  2247. while ((cptr = dequeue_completion(&zh->completions_to_process)) != 0) {
  2248. struct ReplyHeader hdr;
  2249. buffer_list_t *bptr = cptr->buffer;
  2250. struct iarchive *ia = create_buffer_iarchive(bptr->buffer,
  2251. bptr->len);
  2252. deserialize_ReplyHeader(ia, "hdr", &hdr);
  2253. if (hdr.xid == WATCHER_EVENT_XID) {
  2254. int type, state;
  2255. struct WatcherEvent evt;
  2256. deserialize_WatcherEvent(ia, "event", &evt);
  2257. /* We are doing a notification, so there is no pending request */
  2258. type = evt.type;
  2259. state = evt.state;
  2260. /* This is a notification so there aren't any pending requests */
  2261. LOG_DEBUG(("Calling a watcher for node [%s], type = %d event=%s",
  2262. (evt.path==NULL?"NULL":evt.path), cptr->c.type,
  2263. watcherEvent2String(type)));
  2264. deliverWatchers(zh,type,state,evt.path, &cptr->c.watcher_result);
  2265. deallocate_WatcherEvent(&evt);
  2266. } else {
  2267. deserialize_response(cptr->c.type, hdr.xid, hdr.err != 0, hdr.err, cptr, ia);
  2268. }
  2269. destroy_completion_entry(cptr);
  2270. close_buffer_iarchive(&ia);
  2271. }
  2272. }
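/* The next two helpers are debug-only (invoked under IF_DEBUG in
* zookeeper_process below): they record when the socket became readable and
* log how long a response sat in the socket's receive buffer before being
* processed. */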
  2273. static void isSocketReadable(zhandle_t* zh)
  2274. {
  2275. #ifndef WIN32
  2276. struct pollfd fds;
  2277. fds.fd = zh->fd;
  2278. fds.events = POLLIN;
  2279. if (poll(&fds,1,0)<=0) {
  2280. // socket not readable -- no more responses to process
  2281. zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
  2282. }
  2283. #else
  2284. fd_set rfds;
  2285. struct timeval waittime = {0, 0};
  2286. FD_ZERO(&rfds);
  2287. FD_SET( zh->fd , &rfds);
  2288. if (select(0, &rfds, NULL, NULL, &waittime) <= 0){
  2289. // socket not readable -- no more responses to process
  2290. zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
  2291. }
  2292. #endif
  2293. else{
  2294. gettimeofday(&zh->socket_readable,0);
  2295. }
  2296. }
  2297. static void checkResponseLatency(zhandle_t* zh)
  2298. {
  2299. int delay;
  2300. struct timeval now;
  2301. if(zh->socket_readable.tv_sec==0)
  2302. return;
  2303. gettimeofday(&now,0);
  2304. delay=calculate_interval(&zh->socket_readable, &now);
  2305. if(delay>20)
  2306. LOG_DEBUG(("The following server response has spent at least %dms sitting in the client socket recv buffer",delay));
  2307. zh->socket_readable.tv_sec=zh->socket_readable.tv_usec=0;
  2308. }
  2309. int zookeeper_process(zhandle_t *zh, int events)
  2310. {
  2311. buffer_list_t *bptr;
  2312. int rc;
  2313. if (zh==NULL)
  2314. return ZBADARGUMENTS;
  2315. if (is_unrecoverable(zh))
  2316. return ZINVALIDSTATE;
  2317. api_prolog(zh);
  2318. IF_DEBUG(checkResponseLatency(zh));
  2319. rc = check_events(zh, events);
  2320. if (rc!=ZOK)
  2321. return api_epilog(zh, rc);
  2322. IF_DEBUG(isSocketReadable(zh));
  2323. while (rc >= 0 && (bptr=dequeue_buffer(&zh->to_process))) {
  2324. struct ReplyHeader hdr;
  2325. struct iarchive *ia = create_buffer_iarchive(
  2326. bptr->buffer, bptr->curr_offset);
  2327. deserialize_ReplyHeader(ia, "hdr", &hdr);
  2328. if (hdr.zxid > 0) {
  2329. zh->last_zxid = hdr.zxid;
  2330. } else {
  2331. // fprintf(stderr, "Got %#x for %#x\n", hdr.zxid, hdr.xid);
  2332. }
  2333. if (hdr.xid == WATCHER_EVENT_XID) {
  2334. struct WatcherEvent evt;
  2335. int type = 0;
  2336. char *path = NULL;
  2337. completion_list_t *c = NULL;
  2338. LOG_DEBUG(("Processing WATCHER_EVENT"));
  2339. deserialize_WatcherEvent(ia, "event", &evt);
  2340. type = evt.type;
  2341. path = evt.path;
  2342. /* We are doing a notification, so there is no pending request */
  2343. c = create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0,0);
  2344. c->buffer = bptr;
  2345. c->c.watcher_result = collectWatchers(zh, type, path);
  2346. // We cannot free until now, otherwise path will become invalid
  2347. deallocate_WatcherEvent(&evt);
  2348. queue_completion(&zh->completions_to_process, c, 0);
  2349. } else if (hdr.xid == SET_WATCHES_XID) {
  2350. LOG_DEBUG(("Processing SET_WATCHES"));
  2351. free_buffer(bptr);
  2352. } else if (hdr.xid == AUTH_XID){
  2353. LOG_DEBUG(("Processing AUTH_XID"));
  2354. /* special handling for the AUTH response as it may come back
  2355. * out-of-band */
  2356. auth_completion_func(hdr.err,zh);
  2357. free_buffer(bptr);
  2358. /* authentication completion may change the connection state to
  2359. * unrecoverable */
  2360. if(is_unrecoverable(zh)){
  2361. handle_error(zh, ZAUTHFAILED);
  2362. close_buffer_iarchive(&ia);
  2363. return api_epilog(zh, ZAUTHFAILED);
  2364. }
  2365. } else {
  2366. int rc = hdr.err;
  2367. /* Find the request corresponding to the response */
  2368. completion_list_t *cptr = dequeue_completion(&zh->sent_requests);
  2369. /* [ZOOKEEPER-804] Don't assert if zookeeper_close has been called. */
  2370. if (zh->close_requested == 1 && cptr == NULL) {
  2371. LOG_DEBUG(("Completion queue has been cleared by zookeeper_close()"));
  2372. close_buffer_iarchive(&ia);
  2373. return api_epilog(zh,ZINVALIDSTATE);
  2374. }
  2375. assert(cptr);
  2376. /* The requests are going to come back in order */
  2377. if (cptr->xid != hdr.xid) {
  2378. LOG_DEBUG(("Processing unexpected or out-of-order response!"));
  2379. // received unexpected (or out-of-order) response
  2380. close_buffer_iarchive(&ia);
  2381. free_buffer(bptr);
  2382. // put the completion back on the queue (so it gets properly
  2383. // signaled and deallocated) and disconnect from the server
  2384. queue_completion(&zh->sent_requests,cptr,1);
return handle_socket_error_msg(zh, __LINE__,ZRUNTIMEINCONSISTENCY,
"unexpected server response: expected %#x, but received %#x",
cptr->xid,hdr.xid);
  2388. }
  2389. activateWatcher(zh, cptr->watcher, rc);
  2390. if (cptr->c.void_result != SYNCHRONOUS_MARKER) {
  2391. if(hdr.xid == PING_XID){
  2392. int elapsed = 0;
  2393. struct timeval now;
  2394. gettimeofday(&now, 0);
  2395. elapsed = calculate_interval(&zh->last_ping, &now);
  2396. LOG_DEBUG(("Got ping response in %d ms", elapsed));
  2397. // Nothing to do with a ping response
  2398. free_buffer(bptr);
  2399. destroy_completion_entry(cptr);
  2400. } else {
  2401. LOG_DEBUG(("Queueing asynchronous response"));
  2402. cptr->buffer = bptr;
  2403. queue_completion(&zh->completions_to_process, cptr, 0);
  2404. }
  2405. } else {
  2406. struct sync_completion
  2407. *sc = (struct sync_completion*)cptr->data;
  2408. sc->rc = rc;
  2409. process_sync_completion(cptr, sc, ia, zh);
  2410. notify_sync_completion(sc);
  2411. free_buffer(bptr);
  2412. zh->outstanding_sync--;
  2413. destroy_completion_entry(cptr);
  2414. }
  2415. }
  2416. close_buffer_iarchive(&ia);
  2417. }
  2418. if (process_async(zh->outstanding_sync)) {
  2419. process_completions(zh);
  2420. }
return api_epilog(zh, ZOK);
}
  2422. int zoo_state(zhandle_t *zh)
  2423. {
  2424. if(zh!=0)
  2425. return zh->state;
  2426. return 0;
  2427. }
  2428. static watcher_registration_t* create_watcher_registration(const char* path,
  2429. result_checker_fn checker,watcher_fn watcher,void* ctx){
  2430. watcher_registration_t* wo;
  2431. if(watcher==0)
  2432. return 0;
  2433. wo=calloc(1,sizeof(watcher_registration_t));
  2434. wo->path=strdup(path);
  2435. wo->watcher=watcher;
  2436. wo->context=ctx;
  2437. wo->checker=checker;
  2438. return wo;
  2439. }
  2440. static void destroy_watcher_registration(watcher_registration_t* wo){
  2441. if(wo!=0){
  2442. free((void*)wo->path);
  2443. free(wo);
  2444. }
  2445. }
  2446. static completion_list_t* create_completion_entry(int xid, int completion_type,
  2447. const void *dc, const void *data,watcher_registration_t* wo, completion_head_t *clist)
  2448. {
  2449. completion_list_t *c = calloc(1,sizeof(completion_list_t));
  2450. if (!c) {
  2451. LOG_ERROR(("out of memory"));
  2452. return 0;
  2453. }
  2454. c->c.type = completion_type;
  2455. c->data = data;
  2456. switch(c->c.type) {
  2457. case COMPLETION_VOID:
  2458. c->c.void_result = (void_completion_t)dc;
  2459. break;
  2460. case COMPLETION_STRING:
  2461. c->c.string_result = (string_completion_t)dc;
  2462. break;
  2463. case COMPLETION_DATA:
  2464. c->c.data_result = (data_completion_t)dc;
  2465. break;
  2466. case COMPLETION_STAT:
  2467. c->c.stat_result = (stat_completion_t)dc;
  2468. break;
  2469. case COMPLETION_STRINGLIST:
  2470. c->c.strings_result = (strings_completion_t)dc;
  2471. break;
  2472. case COMPLETION_STRINGLIST_STAT:
  2473. c->c.strings_stat_result = (strings_stat_completion_t)dc;
  2474. break;
case COMPLETION_STRING_STAT:
c->c.string_stat_result = (string_stat_completion_t)dc;
break;
case COMPLETION_ACLLIST:
  2478. c->c.acl_result = (acl_completion_t)dc;
  2479. break;
  2480. case COMPLETION_MULTI:
  2481. assert(clist);
  2482. c->c.void_result = (void_completion_t)dc;
  2483. c->c.clist = *clist;
  2484. break;
  2485. }
  2486. c->xid = xid;
  2487. c->watcher = wo;
  2488. return c;
  2489. }
  2490. static void destroy_completion_entry(completion_list_t* c){
  2491. if(c!=0){
  2492. destroy_watcher_registration(c->watcher);
  2493. if(c->buffer!=0)
  2494. free_buffer(c->buffer);
  2495. free(c);
  2496. }
  2497. }
  2498. static void queue_completion_nolock(completion_head_t *list,
  2499. completion_list_t *c,
  2500. int add_to_front)
  2501. {
  2502. c->next = 0;
  2503. /* appending a new entry to the back of the list */
  2504. if (list->last) {
  2505. assert(list->head);
  2506. // List is not empty
  2507. if (!add_to_front) {
  2508. list->last->next = c;
  2509. list->last = c;
  2510. } else {
  2511. c->next = list->head;
  2512. list->head = c;
  2513. }
  2514. } else {
  2515. // List is empty
  2516. assert(!list->head);
  2517. list->head = c;
  2518. list->last = c;
  2519. }
  2520. }
  2521. static void queue_completion(completion_head_t *list, completion_list_t *c,
  2522. int add_to_front)
  2523. {
  2524. lock_completion_list(list);
  2525. queue_completion_nolock(list, c, add_to_front);
  2526. unlock_completion_list(list);
  2527. }
  2528. static int add_completion(zhandle_t *zh, int xid, int completion_type,
  2529. const void *dc, const void *data, int add_to_front,
  2530. watcher_registration_t* wo, completion_head_t *clist)
  2531. {
  2532. completion_list_t *c =create_completion_entry(xid, completion_type, dc,
  2533. data, wo, clist);
  2534. int rc = 0;
  2535. if (!c)
  2536. return ZSYSTEMERROR;
  2537. lock_completion_list(&zh->sent_requests);
  2538. if (zh->close_requested != 1) {
  2539. queue_completion_nolock(&zh->sent_requests, c, add_to_front);
  2540. if (dc == SYNCHRONOUS_MARKER) {
  2541. zh->outstanding_sync++;
  2542. }
  2543. rc = ZOK;
  2544. } else {
  2545. free(c);
  2546. rc = ZINVALIDSTATE;
  2547. }
  2548. unlock_completion_list(&zh->sent_requests);
  2549. return rc;
  2550. }
  2551. static int add_data_completion(zhandle_t *zh, int xid, data_completion_t dc,
  2552. const void *data,watcher_registration_t* wo)
  2553. {
  2554. return add_completion(zh, xid, COMPLETION_DATA, dc, data, 0, wo, 0);
  2555. }
  2556. static int add_stat_completion(zhandle_t *zh, int xid, stat_completion_t dc,
  2557. const void *data,watcher_registration_t* wo)
  2558. {
  2559. return add_completion(zh, xid, COMPLETION_STAT, dc, data, 0, wo, 0);
  2560. }
  2561. static int add_strings_completion(zhandle_t *zh, int xid,
  2562. strings_completion_t dc, const void *data,watcher_registration_t* wo)
  2563. {
  2564. return add_completion(zh, xid, COMPLETION_STRINGLIST, dc, data, 0, wo, 0);
  2565. }
  2566. static int add_strings_stat_completion(zhandle_t *zh, int xid,
  2567. strings_stat_completion_t dc, const void *data,watcher_registration_t* wo)
  2568. {
  2569. return add_completion(zh, xid, COMPLETION_STRINGLIST_STAT, dc, data, 0, wo, 0);
  2570. }
  2571. static int add_acl_completion(zhandle_t *zh, int xid, acl_completion_t dc,
  2572. const void *data)
  2573. {
  2574. return add_completion(zh, xid, COMPLETION_ACLLIST, dc, data, 0, 0, 0);
  2575. }
  2576. static int add_void_completion(zhandle_t *zh, int xid, void_completion_t dc,
  2577. const void *data)
  2578. {
  2579. return add_completion(zh, xid, COMPLETION_VOID, dc, data, 0, 0, 0);
  2580. }
  2581. static int add_string_completion(zhandle_t *zh, int xid,
  2582. string_completion_t dc, const void *data)
  2583. {
  2584. return add_completion(zh, xid, COMPLETION_STRING, dc, data, 0, 0, 0);
  2585. }
  2586. static int add_string_stat_completion(zhandle_t *zh, int xid,
  2587. string_stat_completion_t dc, const void *data)
  2588. {
  2589. return add_completion(zh, xid, COMPLETION_STRING_STAT, dc, data, 0, 0, 0);
  2590. }
  2591. static int add_multi_completion(zhandle_t *zh, int xid, void_completion_t dc,
  2592. const void *data, completion_head_t *clist)
  2593. {
  2594. return add_completion(zh, xid, COMPLETION_MULTI, dc, data, 0,0, clist);
  2595. }
  2596. int zookeeper_close(zhandle_t *zh)
  2597. {
  2598. int rc=ZOK;
  2599. if (zh==0)
  2600. return ZBADARGUMENTS;
  2601. zh->close_requested=1;
  2602. if (inc_ref_counter(zh,1)>1) {
  2603. /* We have incremented the ref counter to prevent the
  2604. * completions from calling zookeeper_close before we have
  2605. * completed the adaptor_finish call below. */
/* Signal any synchronous completions before joining the threads */
  2607. enter_critical(zh);
  2608. free_completions(zh,1,ZCLOSING);
  2609. leave_critical(zh);
  2610. adaptor_finish(zh);
  2611. /* Now we can allow the handle to be cleaned up, if the completion
  2612. * threads finished during the adaptor_finish call. */
  2613. api_epilog(zh, 0);
  2614. return ZOK;
  2615. }
  2616. /* No need to decrement the counter since we're just going to
  2617. * destroy the handle later. */
  2618. if(zh->state==ZOO_CONNECTED_STATE){
  2619. struct oarchive *oa;
  2620. struct RequestHeader h = {get_xid(), ZOO_CLOSE_OP};
  2621. LOG_INFO(("Closing zookeeper sessionId=%#llx to [%s]\n",
  2622. zh->client_id.client_id,zoo_get_current_server(zh)));
  2623. oa = create_buffer_oarchive();
  2624. rc = serialize_RequestHeader(oa, "header", &h);
  2625. rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
  2626. get_buffer_len(oa));
  2627. /* We queued the buffer, so don't free it */
  2628. close_buffer_oarchive(&oa, 0);
  2629. if (rc < 0) {
  2630. rc = ZMARSHALLINGERROR;
  2631. goto finish;
  2632. }
  2633. /* make sure the close request is sent; we set timeout to an arbitrary
  2634. * (but reasonable) number of milliseconds since we want the call to block*/
  2635. rc=adaptor_send_queue(zh, 3000);
  2636. }else{
  2637. LOG_INFO(("Freeing zookeeper resources for sessionId=%#llx\n",
  2638. zh->client_id.client_id));
  2639. rc = ZOK;
  2640. }
  2641. finish:
  2642. destroy(zh);
  2643. adaptor_destroy(zh);
  2644. free(zh);
  2645. #ifdef WIN32
  2646. Win32WSACleanup();
  2647. #endif
  2648. return rc;
  2649. }
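/* Validates a (chroot-prepended) path: it must start with '/', contain no
* empty ("//"), "." or ".." segments and no control characters; a trailing
* '/' is only allowed when ZOO_SEQUENCE is set. */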
  2650. static int isValidPath(const char* path, const int flags) {
  2651. int len = 0;
  2652. char lastc = '/';
  2653. char c;
  2654. int i = 0;
  2655. if (path == 0)
  2656. return 0;
  2657. len = strlen(path);
  2658. if (len == 0)
  2659. return 0;
  2660. if (path[0] != '/')
  2661. return 0;
  2662. if (len == 1) // done checking - it's the root
  2663. return 1;
  2664. if (path[len - 1] == '/' && !(flags & ZOO_SEQUENCE))
  2665. return 0;
  2666. i = 1;
  2667. for (; i < len; lastc = path[i], i++) {
  2668. c = path[i];
  2669. if (c == 0) {
  2670. return 0;
  2671. } else if (c == '/' && lastc == '/') {
  2672. return 0;
  2673. } else if (c == '.' && lastc == '.') {
  2674. if (path[i-2] == '/' && (((i + 1 == len) && !(flags & ZOO_SEQUENCE))
  2675. || path[i+1] == '/')) {
  2676. return 0;
  2677. }
  2678. } else if (c == '.') {
  2679. if ((path[i-1] == '/') && (((i + 1 == len) && !(flags & ZOO_SEQUENCE))
  2680. || path[i+1] == '/')) {
  2681. return 0;
  2682. }
  2683. } else if (c > 0x00 && c < 0x1f) {
  2684. return 0;
  2685. }
  2686. }
  2687. return 1;
  2688. }
  2689. /*---------------------------------------------------------------------------*
  2690. * REQUEST INIT HELPERS
  2691. *---------------------------------------------------------------------------*/
  2692. /* Common Request init helper functions to reduce code duplication */
  2693. static int Request_path_init(zhandle_t *zh, int flags,
  2694. char **path_out, const char *path)
  2695. {
  2696. assert(path_out);
  2697. *path_out = prepend_string(zh, path);
  2698. if (zh == NULL || !isValidPath(*path_out, flags)) {
  2699. free_duplicate_path(*path_out, path);
  2700. return ZBADARGUMENTS;
  2701. }
  2702. if (is_unrecoverable(zh)) {
  2703. free_duplicate_path(*path_out, path);
  2704. return ZINVALIDSTATE;
  2705. }
  2706. return ZOK;
  2707. }
  2708. static int Request_path_watch_init(zhandle_t *zh, int flags,
  2709. char **path_out, const char *path,
  2710. int32_t *watch_out, uint32_t watch)
  2711. {
  2712. int rc = Request_path_init(zh, flags, path_out, path);
  2713. if (rc != ZOK) {
  2714. return rc;
  2715. }
  2716. *watch_out = watch;
  2717. return ZOK;
  2718. }
/*---------------------------------------------------------------------------*
 * ASYNC API
 *---------------------------------------------------------------------------*/
int zoo_aget(zhandle_t *zh, const char *path, int watch, data_completion_t dc,
        const void *data)
{
    return zoo_awget(zh,path,watch?zh->watcher:0,zh->context,dc,data);
}

int zoo_awget(zhandle_t *zh, const char *path,
        watcher_fn watcher, void* watcherCtx,
        data_completion_t dc, const void *data)
{
    struct oarchive *oa;
    char *server_path = prepend_string(zh, path);
    struct RequestHeader h = {get_xid(), ZOO_GETDATA_OP};
    struct GetDataRequest req = { (char*)server_path, watcher!=0 };
    int rc;

    if (zh==0 || !isValidPath(server_path, 0)) {
        free_duplicate_path(server_path, path);
        return ZBADARGUMENTS;
    }
    if (is_unrecoverable(zh)) {
        free_duplicate_path(server_path, path);
        return ZINVALIDSTATE;
    }
    oa=create_buffer_oarchive();
    rc = serialize_RequestHeader(oa, "header", &h);
    rc = rc < 0 ? rc : serialize_GetDataRequest(oa, "req", &req);
    enter_critical(zh);
    rc = rc < 0 ? rc : add_data_completion(zh, h.xid, dc, data,
            create_watcher_registration(server_path,data_result_checker,watcher,watcherCtx));
    rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
            get_buffer_len(oa));
    leave_critical(zh);
    free_duplicate_path(server_path, path);
    /* We queued the buffer, so don't free it */
    close_buffer_oarchive(&oa, 0);

    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
            zoo_get_current_server(zh)));
    /* make a best (non-blocking) effort to send the requests asap */
    adaptor_send_queue(zh, 0);
    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
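/*
 * A minimal usage sketch for the asynchronous get above. The completion
 * name, the "/config" path, and the assumption that zh is a connected
 * handle from zookeeper_init() are illustrative only:
 *
 *   static void my_data_completion(int rc, const char *value, int value_len,
 *           const struct Stat *stat, const void *data)
 *   {
 *       if (rc == ZOK) {
 *           // value/value_len hold the znode data, stat its metadata
 *       }
 *   }
 *
 *   int rc = zoo_awget(zh, "/config", NULL, NULL, my_data_completion, NULL);
 *   if (rc != ZOK) {
 *       fprintf(stderr, "zoo_awget failed: %s\n", zerror(rc));
 *   }
 */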
static int SetDataRequest_init(zhandle_t *zh, struct SetDataRequest *req,
        const char *path, const char *buffer, int buflen, int version)
{
    int rc;
    assert(req);
    rc = Request_path_init(zh, 0, &req->path, path);
    if (rc != ZOK) {
        return rc;
    }

    req->data.buff = (char*)buffer;
    req->data.len = buflen;
    req->version = version;

    return ZOK;
}

int zoo_aset(zhandle_t *zh, const char *path, const char *buffer, int buflen,
        int version, stat_completion_t dc, const void *data)
{
    struct oarchive *oa;
    struct RequestHeader h = {get_xid(), ZOO_SETDATA_OP};
    struct SetDataRequest req;
    int rc = SetDataRequest_init(zh, &req, path, buffer, buflen, version);
    if (rc != ZOK) {
        return rc;
    }
    oa = create_buffer_oarchive();
    rc = serialize_RequestHeader(oa, "header", &h);
    rc = rc < 0 ? rc : serialize_SetDataRequest(oa, "req", &req);
    enter_critical(zh);
    rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, dc, data,0);
    rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
            get_buffer_len(oa));
    leave_critical(zh);
    free_duplicate_path(req.path, path);
    /* We queued the buffer, so don't free it */
    close_buffer_oarchive(&oa, 0);

    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
            zoo_get_current_server(zh)));
    /* make a best (non-blocking) effort to send the requests asap */
    adaptor_send_queue(zh, 0);
    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
static int CreateRequest_init(zhandle_t *zh, struct CreateRequest *req,
        const char *path, const char *value,
        int valuelen, const struct ACL_vector *acl_entries, int flags)
{
    int rc;
    assert(req);
    rc = Request_path_init(zh, flags, &req->path, path);
    if (rc != ZOK) {
        return rc;
    }

    req->flags = flags;
    req->data.buff = (char*)value;
    req->data.len = valuelen;
    if (acl_entries == 0) {
        req->acl.count = 0;
        req->acl.data = 0;
    } else {
        req->acl = *acl_entries;
    }

    return ZOK;
}

static int Create2Request_init(zhandle_t *zh, struct Create2Request *req,
        const char *path, const char *value,
        int valuelen, const struct ACL_vector *acl_entries, int flags)
{
    int rc;
    assert(req);
    rc = Request_path_init(zh, flags, &req->path, path);
    if (rc != ZOK) {
        return rc;
    }

    req->flags = flags;
    req->data.buff = (char*)value;
    req->data.len = valuelen;
    if (acl_entries == 0) {
        req->acl.count = 0;
        req->acl.data = 0;
    } else {
        req->acl = *acl_entries;
    }

    return ZOK;
}
int zoo_acreate(zhandle_t *zh, const char *path, const char *value,
        int valuelen, const struct ACL_vector *acl_entries, int flags,
        string_completion_t completion, const void *data)
{
    struct oarchive *oa;
    struct RequestHeader h = {get_xid(), ZOO_CREATE_OP};
    struct CreateRequest req;
    int rc = CreateRequest_init(zh, &req,
            path, value, valuelen, acl_entries, flags);
    if (rc != ZOK) {
        return rc;
    }
    oa = create_buffer_oarchive();
    rc = serialize_RequestHeader(oa, "header", &h);
    rc = rc < 0 ? rc : serialize_CreateRequest(oa, "req", &req);
    enter_critical(zh);
    rc = rc < 0 ? rc : add_string_completion(zh, h.xid, completion, data);
    rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
            get_buffer_len(oa));
    leave_critical(zh);
    free_duplicate_path(req.path, path);
    /* We queued the buffer, so don't free it */
    close_buffer_oarchive(&oa, 0);

    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
            zoo_get_current_server(zh)));
    /* make a best (non-blocking) effort to send the requests asap */
    adaptor_send_queue(zh, 0);
    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
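/*
 * A minimal usage sketch for the asynchronous create above. The completion
 * name, the "/workers/w-" path, and the literal data are illustrative;
 * ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL and ZOO_SEQUENCE come from zookeeper.h:
 *
 *   static void my_string_completion(int rc, const char *name, const void *data)
 *   {
 *       if (rc == ZOK) {
 *           // name is the path the server actually created
 *       }
 *   }
 *
 *   int rc = zoo_acreate(zh, "/workers/w-", "host:port", 9,
 *                        &ZOO_OPEN_ACL_UNSAFE,
 *                        ZOO_EPHEMERAL | ZOO_SEQUENCE,
 *                        my_string_completion, NULL);
 */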
int zoo_acreate2(zhandle_t *zh, const char *path, const char *value,
        int valuelen, const struct ACL_vector *acl_entries, int flags,
        string_stat_completion_t completion, const void *data)
{
    struct oarchive *oa;
    struct RequestHeader h = { get_xid(), ZOO_CREATE2_OP };
    struct Create2Request req;
    int rc = Create2Request_init(zh, &req, path, value, valuelen, acl_entries, flags);
    if (rc != ZOK) {
        return rc;
    }
    oa = create_buffer_oarchive();
    rc = serialize_RequestHeader(oa, "header", &h);
    rc = rc < 0 ? rc : serialize_Create2Request(oa, "req", &req);
    enter_critical(zh);
    rc = rc < 0 ? rc : add_string_stat_completion(zh, h.xid, completion, data);
    rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
            get_buffer_len(oa));
    leave_critical(zh);
    free_duplicate_path(req.path, path);
    /* We queued the buffer, so don't free it */
    close_buffer_oarchive(&oa, 0);

    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
            zoo_get_current_server(zh)));
    /* make a best (non-blocking) effort to send the requests asap */
    adaptor_send_queue(zh, 0);
    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
int DeleteRequest_init(zhandle_t *zh, struct DeleteRequest *req,
        const char *path, int version)
{
    int rc = Request_path_init(zh, 0, &req->path, path);
    if (rc != ZOK) {
        return rc;
    }

    req->version = version;

    return ZOK;
}

int zoo_adelete(zhandle_t *zh, const char *path, int version,
        void_completion_t completion, const void *data)
{
    struct oarchive *oa;
    struct RequestHeader h = {get_xid(), ZOO_DELETE_OP};
    struct DeleteRequest req;
    int rc = DeleteRequest_init(zh, &req, path, version);
    if (rc != ZOK) {
        return rc;
    }
    oa = create_buffer_oarchive();
    rc = serialize_RequestHeader(oa, "header", &h);
    rc = rc < 0 ? rc : serialize_DeleteRequest(oa, "req", &req);
    enter_critical(zh);
    rc = rc < 0 ? rc : add_void_completion(zh, h.xid, completion, data);
    rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
            get_buffer_len(oa));
    leave_critical(zh);
    free_duplicate_path(req.path, path);
    /* We queued the buffer, so don't free it */
    close_buffer_oarchive(&oa, 0);

    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
            zoo_get_current_server(zh)));
    /* make a best (non-blocking) effort to send the requests asap */
    adaptor_send_queue(zh, 0);
    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
int zoo_aexists(zhandle_t *zh, const char *path, int watch,
        stat_completion_t sc, const void *data)
{
    return zoo_awexists(zh,path,watch?zh->watcher:0,zh->context,sc,data);
}

int zoo_awexists(zhandle_t *zh, const char *path,
        watcher_fn watcher, void* watcherCtx,
        stat_completion_t completion, const void *data)
{
    struct oarchive *oa;
    struct RequestHeader h = {get_xid(), ZOO_EXISTS_OP};
    struct ExistsRequest req;
    int rc = Request_path_watch_init(zh, 0, &req.path, path,
            &req.watch, watcher != NULL);
    if (rc != ZOK) {
        return rc;
    }
    oa = create_buffer_oarchive();
    rc = serialize_RequestHeader(oa, "header", &h);
    rc = rc < 0 ? rc : serialize_ExistsRequest(oa, "req", &req);
    enter_critical(zh);
    rc = rc < 0 ? rc : add_stat_completion(zh, h.xid, completion, data,
            create_watcher_registration(req.path,exists_result_checker,
                watcher,watcherCtx));
    rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
            get_buffer_len(oa));
    leave_critical(zh);
    free_duplicate_path(req.path, path);
    /* We queued the buffer, so don't free it */
    close_buffer_oarchive(&oa, 0);

    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
            zoo_get_current_server(zh)));
    /* make a best (non-blocking) effort to send the requests asap */
    adaptor_send_queue(zh, 0);
    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
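/*
 * A minimal usage sketch for an exists call with a per-call watcher. The
 * watcher, completion and "/node" path are illustrative names only:
 *
 *   static void node_watcher(zhandle_t *zh, int type, int state,
 *           const char *path, void *ctx)
 *   {
 *       // fired once when /node is created, deleted or its data changes
 *   }
 *
 *   static void my_stat_completion(int rc, const struct Stat *stat,
 *           const void *data)
 *   {
 *       // rc == ZNONODE simply means the node does not exist yet
 *   }
 *
 *   int rc = zoo_awexists(zh, "/node", node_watcher, NULL,
 *                         my_stat_completion, NULL);
 */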
static int zoo_awget_children_(zhandle_t *zh, const char *path,
        watcher_fn watcher, void* watcherCtx,
        strings_completion_t sc,
        const void *data)
{
    struct oarchive *oa;
    struct RequestHeader h = {get_xid(), ZOO_GETCHILDREN_OP};
    struct GetChildrenRequest req;
    int rc = Request_path_watch_init(zh, 0, &req.path, path,
            &req.watch, watcher != NULL);
    if (rc != ZOK) {
        return rc;
    }
    oa = create_buffer_oarchive();
    rc = serialize_RequestHeader(oa, "header", &h);
    rc = rc < 0 ? rc : serialize_GetChildrenRequest(oa, "req", &req);
    enter_critical(zh);
    rc = rc < 0 ? rc : add_strings_completion(zh, h.xid, sc, data,
            create_watcher_registration(req.path,child_result_checker,watcher,watcherCtx));
    rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
            get_buffer_len(oa));
    leave_critical(zh);
    free_duplicate_path(req.path, path);
    /* We queued the buffer, so don't free it */
    close_buffer_oarchive(&oa, 0);

    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
            zoo_get_current_server(zh)));
    /* make a best (non-blocking) effort to send the requests asap */
    adaptor_send_queue(zh, 0);
    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}

int zoo_aget_children(zhandle_t *zh, const char *path, int watch,
        strings_completion_t dc, const void *data)
{
    return zoo_awget_children_(zh,path,watch?zh->watcher:0,zh->context,dc,data);
}

int zoo_awget_children(zhandle_t *zh, const char *path,
        watcher_fn watcher, void* watcherCtx,
        strings_completion_t dc,
        const void *data)
{
    return zoo_awget_children_(zh,path,watcher,watcherCtx,dc,data);
}
static int zoo_awget_children2_(zhandle_t *zh, const char *path,
        watcher_fn watcher, void* watcherCtx,
        strings_stat_completion_t ssc,
        const void *data)
{
    struct oarchive *oa;
    struct RequestHeader h = {get_xid(), ZOO_GETCHILDREN2_OP};
    struct GetChildren2Request req;
    int rc = Request_path_watch_init(zh, 0, &req.path, path,
            &req.watch, watcher != NULL);
    if (rc != ZOK) {
        return rc;
    }
    oa = create_buffer_oarchive();
    rc = serialize_RequestHeader(oa, "header", &h);
    rc = rc < 0 ? rc : serialize_GetChildren2Request(oa, "req", &req);
    enter_critical(zh);
    rc = rc < 0 ? rc : add_strings_stat_completion(zh, h.xid, ssc, data,
            create_watcher_registration(req.path,child_result_checker,watcher,watcherCtx));
    rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
            get_buffer_len(oa));
    leave_critical(zh);
    free_duplicate_path(req.path, path);
    /* We queued the buffer, so don't free it */
    close_buffer_oarchive(&oa, 0);

    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
            zoo_get_current_server(zh)));
    /* make a best (non-blocking) effort to send the requests asap */
    adaptor_send_queue(zh, 0);
    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
int zoo_aget_children2(zhandle_t *zh, const char *path, int watch,
        strings_stat_completion_t dc, const void *data)
{
    return zoo_awget_children2_(zh,path,watch?zh->watcher:0,zh->context,dc,data);
}

int zoo_awget_children2(zhandle_t *zh, const char *path,
        watcher_fn watcher, void* watcherCtx,
        strings_stat_completion_t dc,
        const void *data)
{
    return zoo_awget_children2_(zh,path,watcher,watcherCtx,dc,data);
}
int zoo_async(zhandle_t *zh, const char *path,
        string_completion_t completion, const void *data)
{
    struct oarchive *oa;
    struct RequestHeader h = {get_xid(), ZOO_SYNC_OP};
    struct SyncRequest req;
    int rc = Request_path_init(zh, 0, &req.path, path);
    if (rc != ZOK) {
        return rc;
    }
    oa = create_buffer_oarchive();
    rc = serialize_RequestHeader(oa, "header", &h);
    rc = rc < 0 ? rc : serialize_SyncRequest(oa, "req", &req);
    enter_critical(zh);
    rc = rc < 0 ? rc : add_string_completion(zh, h.xid, completion, data);
    rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
            get_buffer_len(oa));
    leave_critical(zh);
    free_duplicate_path(req.path, path);
    /* We queued the buffer, so don't free it */
    close_buffer_oarchive(&oa, 0);

    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
            zoo_get_current_server(zh)));
    /* make a best (non-blocking) effort to send the requests asap */
    adaptor_send_queue(zh, 0);
    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
int zoo_aget_acl(zhandle_t *zh, const char *path, acl_completion_t completion,
        const void *data)
{
    struct oarchive *oa;
    struct RequestHeader h = {get_xid(), ZOO_GETACL_OP};
    struct GetACLRequest req;
    int rc = Request_path_init(zh, 0, &req.path, path);
    if (rc != ZOK) {
        return rc;
    }
    oa = create_buffer_oarchive();
    rc = serialize_RequestHeader(oa, "header", &h);
    rc = rc < 0 ? rc : serialize_GetACLRequest(oa, "req", &req);
    enter_critical(zh);
    rc = rc < 0 ? rc : add_acl_completion(zh, h.xid, completion, data);
    rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
            get_buffer_len(oa));
    leave_critical(zh);
    free_duplicate_path(req.path, path);
    /* We queued the buffer, so don't free it */
    close_buffer_oarchive(&oa, 0);

    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
            zoo_get_current_server(zh)));
    /* make a best (non-blocking) effort to send the requests asap */
    adaptor_send_queue(zh, 0);
    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}

int zoo_aset_acl(zhandle_t *zh, const char *path, int version,
        struct ACL_vector *acl, void_completion_t completion, const void *data)
{
    struct oarchive *oa;
    struct RequestHeader h = {get_xid(), ZOO_SETACL_OP};
    struct SetACLRequest req;
    int rc = Request_path_init(zh, 0, &req.path, path);
    if (rc != ZOK) {
        return rc;
    }
    oa = create_buffer_oarchive();
    req.acl = *acl;
    req.version = version;
    rc = serialize_RequestHeader(oa, "header", &h);
    rc = rc < 0 ? rc : serialize_SetACLRequest(oa, "req", &req);
    enter_critical(zh);
    rc = rc < 0 ? rc : add_void_completion(zh, h.xid, completion, data);
    rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
            get_buffer_len(oa));
    leave_critical(zh);
    free_duplicate_path(req.path, path);
    /* We queued the buffer, so don't free it */
    close_buffer_oarchive(&oa, 0);

    LOG_DEBUG(("Sending request xid=%#x for path [%s] to %s",h.xid,path,
            zoo_get_current_server(zh)));
    /* make a best (non-blocking) effort to send the requests asap */
    adaptor_send_queue(zh, 0);
    return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
/* Completions for multi-op results */
static void op_result_string_completion(int err, const char *value, const void *data)
{
    struct zoo_op_result *result = (struct zoo_op_result *)data;
    assert(result);
    result->err = err;

    if (result->value && value) {
        int len = strlen(value) + 1;
        if (len > result->valuelen) {
            len = result->valuelen;
        }
        if (len > 0) {
            memcpy(result->value, value, len - 1);
            result->value[len - 1] = '\0';
        }
    } else {
        result->value = NULL;
    }
}

static void op_result_void_completion(int err, const void *data)
{
    struct zoo_op_result *result = (struct zoo_op_result *)data;
    assert(result);
    result->err = err;
}

static void op_result_stat_completion(int err, const struct Stat *stat, const void *data)
{
    struct zoo_op_result *result = (struct zoo_op_result *)data;
    assert(result);
    result->err = err;

    if (result->stat && err == 0 && stat) {
        *result->stat = *stat;
    } else {
        result->stat = NULL;
    }
}

static int CheckVersionRequest_init(zhandle_t *zh, struct CheckVersionRequest *req,
        const char *path, int version)
{
    int rc;
    assert(req);
    rc = Request_path_init(zh, 0, &req->path, path);
    if (rc != ZOK) {
        return rc;
    }

    req->version = version;

    return ZOK;
}
int zoo_amulti(zhandle_t *zh, int count, const zoo_op_t *ops,
        zoo_op_result_t *results, void_completion_t completion, const void *data)
{
    struct RequestHeader h = {get_xid(), ZOO_MULTI_OP};
    struct MultiHeader mh = {-1, 1, -1};
    struct oarchive *oa = create_buffer_oarchive();
    completion_head_t clist = { 0 };

    int rc = serialize_RequestHeader(oa, "header", &h);

    int index = 0;
    for (index=0; index < count; index++) {
        const zoo_op_t *op = ops+index;
        zoo_op_result_t *result = results+index;
        completion_list_t *entry = NULL;

        struct MultiHeader mh = {op->type, 0, -1};
        rc = rc < 0 ? rc : serialize_MultiHeader(oa, "multiheader", &mh);

        switch(op->type) {
            case ZOO_CREATE_OP: {
                struct CreateRequest req;

                rc = rc < 0 ? rc : CreateRequest_init(zh, &req,
                        op->create_op.path, op->create_op.data,
                        op->create_op.datalen, op->create_op.acl,
                        op->create_op.flags);
                rc = rc < 0 ? rc : serialize_CreateRequest(oa, "req", &req);
                result->value = op->create_op.buf;
                result->valuelen = op->create_op.buflen;

                enter_critical(zh);
                entry = create_completion_entry(h.xid, COMPLETION_STRING, op_result_string_completion, result, 0, 0);
                leave_critical(zh);
                free_duplicate_path(req.path, op->create_op.path);
                break;
            }

            case ZOO_DELETE_OP: {
                struct DeleteRequest req;
                rc = rc < 0 ? rc : DeleteRequest_init(zh, &req, op->delete_op.path, op->delete_op.version);
                rc = rc < 0 ? rc : serialize_DeleteRequest(oa, "req", &req);

                enter_critical(zh);
                entry = create_completion_entry(h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
                leave_critical(zh);
                free_duplicate_path(req.path, op->delete_op.path);
                break;
            }

            case ZOO_SETDATA_OP: {
                struct SetDataRequest req;
                rc = rc < 0 ? rc : SetDataRequest_init(zh, &req,
                        op->set_op.path, op->set_op.data,
                        op->set_op.datalen, op->set_op.version);
                rc = rc < 0 ? rc : serialize_SetDataRequest(oa, "req", &req);
                result->stat = op->set_op.stat;

                enter_critical(zh);
                entry = create_completion_entry(h.xid, COMPLETION_STAT, op_result_stat_completion, result, 0, 0);
                leave_critical(zh);
                free_duplicate_path(req.path, op->set_op.path);
                break;
            }

            case ZOO_CHECK_OP: {
                struct CheckVersionRequest req;
                rc = rc < 0 ? rc : CheckVersionRequest_init(zh, &req,
                        op->check_op.path, op->check_op.version);
                rc = rc < 0 ? rc : serialize_CheckVersionRequest(oa, "req", &req);

                enter_critical(zh);
                entry = create_completion_entry(h.xid, COMPLETION_VOID, op_result_void_completion, result, 0, 0);
                leave_critical(zh);
                free_duplicate_path(req.path, op->check_op.path);
                break;
            }

            default:
                LOG_ERROR(("Unimplemented sub-op type=%d in multi-op", op->type));
                return ZUNIMPLEMENTED;
        }

        queue_completion(&clist, entry, 0);
    }

    rc = rc < 0 ? rc : serialize_MultiHeader(oa, "multiheader", &mh);
    /* BEGIN: CRITICAL SECTION */
    enter_critical(zh);
    rc = rc < 0 ? rc : add_multi_completion(zh, h.xid, completion, data, &clist);
    rc = rc < 0 ? rc : queue_buffer_bytes(&zh->to_send, get_buffer(oa),
            get_buffer_len(oa));
    leave_critical(zh);

    /* We queued the buffer, so don't free it */
    close_buffer_oarchive(&oa, 0);

    LOG_DEBUG(("Sending multi request xid=%#x with %d subrequests to %s",
            h.xid, index, zoo_get_current_server(zh)));
    /* make a best (non-blocking) effort to send the requests asap */
    adaptor_send_queue(zh, 0);

    return (rc < 0) ? ZMARSHALLINGERROR : ZOK;
}
void zoo_create_op_init(zoo_op_t *op, const char *path, const char *value,
        int valuelen, const struct ACL_vector *acl, int flags,
        char *path_buffer, int path_buffer_len)
{
    assert(op);
    op->type = ZOO_CREATE_OP;
    op->create_op.path = path;
    op->create_op.data = value;
    op->create_op.datalen = valuelen;
    op->create_op.acl = acl;
    op->create_op.flags = flags;
    op->create_op.buf = path_buffer;
    op->create_op.buflen = path_buffer_len;
}

void zoo_create2_op_init(zoo_op_t *op, const char *path, const char *value,
        int valuelen, const struct ACL_vector *acl, int flags,
        char *path_buffer, int path_buffer_len)
{
    assert(op);
    op->type = ZOO_CREATE2_OP;
    op->create_op.path = path;
    op->create_op.data = value;
    op->create_op.datalen = valuelen;
    op->create_op.acl = acl;
    op->create_op.flags = flags;
    op->create_op.buf = path_buffer;
    op->create_op.buflen = path_buffer_len;
}

void zoo_delete_op_init(zoo_op_t *op, const char *path, int version)
{
    assert(op);
    op->type = ZOO_DELETE_OP;
    op->delete_op.path = path;
    op->delete_op.version = version;
}

void zoo_set_op_init(zoo_op_t *op, const char *path, const char *buffer,
        int buflen, int version, struct Stat *stat)
{
    assert(op);
    op->type = ZOO_SETDATA_OP;
    op->set_op.path = path;
    op->set_op.data = buffer;
    op->set_op.datalen = buflen;
    op->set_op.version = version;
    op->set_op.stat = stat;
}

void zoo_check_op_init(zoo_op_t *op, const char *path, int version)
{
    assert(op);
    op->type = ZOO_CHECK_OP;
    op->check_op.path = path;
    op->check_op.version = version;
}
int zoo_multi(zhandle_t *zh, int count, const zoo_op_t *ops, zoo_op_result_t *results)
{
    int rc;

    struct sync_completion *sc = alloc_sync_completion();
    if (!sc) {
        return ZSYSTEMERROR;
    }

    rc = zoo_amulti(zh, count, ops, results, SYNCHRONOUS_MARKER, sc);
    if (rc == ZOK) {
        wait_sync_completion(sc);
        rc = sc->rc;
    }
    free_sync_completion(sc);

    return rc;
}
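/*
 * A minimal usage sketch for the multi API above; the paths, buffer size
 * and operation mix are illustrative only:
 *
 *   zoo_op_t ops[2];
 *   zoo_op_result_t results[2];
 *   char created[128];
 *
 *   zoo_create_op_init(&ops[0], "/app/lock", "", 0, &ZOO_OPEN_ACL_UNSAFE,
 *                      ZOO_EPHEMERAL, created, sizeof(created));
 *   zoo_delete_op_init(&ops[1], "/app/stale", -1);
 *
 *   int rc = zoo_multi(zh, 2, ops, results);
 *   if (rc == ZOK) {
 *       // all sub-operations committed atomically; per-op codes are in
 *       // results[i].err
 *   }
 */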
/* specify timeout of 0 to make the function non-blocking */
/* timeout is in milliseconds */
int flush_send_queue(zhandle_t*zh, int timeout)
{
    int rc= ZOK;
    struct timeval started;
#ifdef WIN32
    fd_set pollSet;
    struct timeval wait;
#endif
    gettimeofday(&started,0);
    // we can't use dequeue_buffer() here because if (non-blocking) send_buffer()
    // returns EWOULDBLOCK we'd have to put the buffer back on the queue.
    // we use a recursive lock instead and only dequeue the buffer if a send was
    // successful
    lock_buffer_list(&zh->to_send);
    while (zh->to_send.head != 0 && zh->state == ZOO_CONNECTED_STATE) {
        if(timeout!=0){
            int elapsed;
            struct timeval now;
            gettimeofday(&now,0);
            elapsed=calculate_interval(&started,&now);
            if (elapsed>timeout) {
                rc = ZOPERATIONTIMEOUT;
                break;
            }

#ifdef WIN32
            wait = get_timeval(timeout-elapsed);
            FD_ZERO(&pollSet);
            FD_SET(zh->fd, &pollSet);
            // Poll the socket
            rc = select((int)(zh->fd)+1, NULL, &pollSet, NULL, &wait);
#else
            struct pollfd fds;
            fds.fd = zh->fd;
            fds.events = POLLOUT;
            fds.revents = 0;
            rc = poll(&fds, 1, timeout-elapsed);
#endif
            if (rc<=0) {
                /* timed out or an error or POLLERR */
                rc = rc==0 ? ZOPERATIONTIMEOUT : ZSYSTEMERROR;
                break;
            }
        }

        rc = send_buffer(zh->fd, zh->to_send.head);
        if(rc==0 && timeout==0){
            /* send_buffer would block while sending this buffer */
            rc = ZOK;
            break;
        }
        if (rc < 0) {
            rc = ZCONNECTIONLOSS;
            break;
        }
        // if the buffer has been sent successfully, remove it from the queue
        if (rc > 0)
            remove_buffer(&zh->to_send);
        gettimeofday(&zh->last_send, 0);
        rc = ZOK;
    }
    unlock_buffer_list(&zh->to_send);
    return rc;
}
const char* zerror(int c)
{
    switch (c){
    case ZOK:
        return "ok";
    case ZSYSTEMERROR:
        return "system error";
    case ZRUNTIMEINCONSISTENCY:
        return "run time inconsistency";
    case ZDATAINCONSISTENCY:
        return "data inconsistency";
    case ZCONNECTIONLOSS:
        return "connection loss";
    case ZMARSHALLINGERROR:
        return "marshalling error";
    case ZUNIMPLEMENTED:
        return "unimplemented";
    case ZOPERATIONTIMEOUT:
        return "operation timeout";
    case ZBADARGUMENTS:
        return "bad arguments";
    case ZINVALIDSTATE:
        return "invalid zhandle state";
    case ZAPIERROR:
        return "api error";
    case ZNONODE:
        return "no node";
    case ZNOAUTH:
        return "not authenticated";
    case ZBADVERSION:
        return "bad version";
    case ZNOCHILDRENFOREPHEMERALS:
        return "no children for ephemerals";
    case ZNODEEXISTS:
        return "node exists";
    case ZNOTEMPTY:
        return "not empty";
    case ZSESSIONEXPIRED:
        return "session expired";
    case ZINVALIDCALLBACK:
        return "invalid callback";
    case ZINVALIDACL:
        return "invalid acl";
    case ZAUTHFAILED:
        return "authentication failed";
    case ZCLOSING:
        return "zookeeper is closing";
    case ZNOTHING:
        return "(not error) no server responses to process";
    case ZSESSIONMOVED:
        return "session moved to another server, so operation is ignored";
    }
    if (c > 0) {
        return strerror(c);
    }
    return "unknown error";
}
int zoo_add_auth(zhandle_t *zh,const char* scheme,const char* cert,
        int certLen,void_completion_t completion, const void *data)
{
    struct buffer auth;
    auth_info *authinfo;

    if(scheme==NULL || zh==NULL)
        return ZBADARGUMENTS;

    if (is_unrecoverable(zh))
        return ZINVALIDSTATE;

    // [ZOOKEEPER-800] zoo_add_auth should return ZINVALIDSTATE if
    // the connection is closed.
    if (zoo_state(zh) == 0) {
        return ZINVALIDSTATE;
    }

    if(cert!=NULL && certLen!=0){
        auth.buff=calloc(1,certLen);
        if(auth.buff==0) {
            return ZSYSTEMERROR;
        }
        memcpy(auth.buff,cert,certLen);
        auth.len=certLen;
    } else {
        auth.buff = 0;
        auth.len = 0;
    }

    zoo_lock_auth(zh);
    authinfo = (auth_info*) malloc(sizeof(auth_info));
    authinfo->scheme=strdup(scheme);
    authinfo->auth=auth;
    authinfo->completion=completion;
    authinfo->data=data;
    authinfo->next = NULL;
    add_last_auth(&zh->auth_h, authinfo);
    zoo_unlock_auth(zh);

    if(zh->state == ZOO_CONNECTED_STATE || zh->state == ZOO_ASSOCIATING_STATE)
        return send_last_auth_info(zh);

    return ZOK;
}
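/*
 * A minimal usage sketch for adding credentials to a session; the literal
 * credential is illustrative, and "digest" is one of the standard
 * ZooKeeper authentication schemes:
 *
 *   const char cred[] = "bob:secret";
 *   int rc = zoo_add_auth(zh, "digest", cred, (int)strlen(cred), NULL, NULL);
 *   if (rc != ZOK) {
 *       fprintf(stderr, "zoo_add_auth failed: %s\n", zerror(rc));
 *   }
 */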
static const char* format_endpoint_info(const struct sockaddr_storage* ep)
{
    static char buf[128] = { 0 };
    char addrstr[128] = { 0 };
    void *inaddr;
#ifdef WIN32
    char * addrstring;
#endif
    int port;
    if(ep==0)
        return "null";

#if defined(AF_INET6)
    if(ep->ss_family==AF_INET6){
        inaddr=&((struct sockaddr_in6*)ep)->sin6_addr;
        port=((struct sockaddr_in6*)ep)->sin6_port;
    } else {
#endif
        inaddr=&((struct sockaddr_in*)ep)->sin_addr;
        port=((struct sockaddr_in*)ep)->sin_port;
#if defined(AF_INET6)
    }
#endif
#ifdef WIN32
    addrstring = inet_ntoa (*(struct in_addr*)inaddr);
    sprintf(buf,"%s:%d",addrstring,ntohs(port));
#else
    inet_ntop(ep->ss_family,inaddr,addrstr,sizeof(addrstr)-1);
    sprintf(buf,"%s:%d",addrstr,ntohs(port));
#endif
    return buf;
}

void zoo_deterministic_conn_order(int yesOrNo)
{
    disable_conn_permute=yesOrNo;
}
/*---------------------------------------------------------------------------*
 * SYNC API
 *---------------------------------------------------------------------------*/
int zoo_create(zhandle_t *zh, const char *path, const char *value,
        int valuelen, const struct ACL_vector *acl, int flags,
        char *path_buffer, int path_buffer_len)
{
    struct sync_completion *sc = alloc_sync_completion();
    int rc;
    if (!sc) {
        return ZSYSTEMERROR;
    }
    sc->u.str.str = path_buffer;
    sc->u.str.str_len = path_buffer_len;
    rc=zoo_acreate(zh, path, value, valuelen, acl, flags, SYNCHRONOUS_MARKER, sc);
    if(rc==ZOK){
        wait_sync_completion(sc);
        rc = sc->rc;
    }
    free_sync_completion(sc);
    return rc;
}

int zoo_create2(zhandle_t *zh, const char *path, const char *value,
        int valuelen, const struct ACL_vector *acl, int flags,
        char *path_buffer, int path_buffer_len, struct Stat *stat)
{
    struct sync_completion *sc = alloc_sync_completion();
    int rc;
    if (!sc) {
        return ZSYSTEMERROR;
    }
    sc->u.str.str = path_buffer;
    sc->u.str.str_len = path_buffer_len;
    rc=zoo_acreate2(zh, path, value, valuelen, acl, flags, SYNCHRONOUS_MARKER, sc);
    if(rc==ZOK){
        wait_sync_completion(sc);
        rc = sc->rc;
        if (rc == 0 && stat) {
            *stat = sc->u.stat;
        }
    }
    free_sync_completion(sc);
    return rc;
}
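/*
 * A minimal usage sketch for the synchronous create; the "/election/n-"
 * path and buffer size are illustrative:
 *
 *   char real_path[128];
 *   int rc = zoo_create(zh, "/election/n-", "", 0, &ZOO_OPEN_ACL_UNSAFE,
 *                       ZOO_EPHEMERAL | ZOO_SEQUENCE,
 *                       real_path, sizeof(real_path));
 *   if (rc == ZOK) {
 *       // real_path now holds the sequential name assigned by the server
 *   }
 */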
int zoo_delete(zhandle_t *zh, const char *path, int version)
{
    struct sync_completion *sc = alloc_sync_completion();
    int rc;
    if (!sc) {
        return ZSYSTEMERROR;
    }
    rc=zoo_adelete(zh, path, version, SYNCHRONOUS_MARKER, sc);
    if(rc==ZOK){
        wait_sync_completion(sc);
        rc = sc->rc;
    }
    free_sync_completion(sc);
    return rc;
}

int zoo_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat)
{
    return zoo_wexists(zh,path,watch?zh->watcher:0,zh->context,stat);
}

int zoo_wexists(zhandle_t *zh, const char *path,
        watcher_fn watcher, void* watcherCtx, struct Stat *stat)
{
    struct sync_completion *sc = alloc_sync_completion();
    int rc;
    if (!sc) {
        return ZSYSTEMERROR;
    }
    rc=zoo_awexists(zh,path,watcher,watcherCtx,SYNCHRONOUS_MARKER, sc);
    if(rc==ZOK){
        wait_sync_completion(sc);
        rc = sc->rc;
        if (rc == 0 && stat) {
            *stat = sc->u.stat;
        }
    }
    free_sync_completion(sc);
    return rc;
}
int zoo_get(zhandle_t *zh, const char *path, int watch, char *buffer,
        int* buffer_len, struct Stat *stat)
{
    return zoo_wget(zh,path,watch?zh->watcher:0,zh->context,
            buffer,buffer_len,stat);
}

int zoo_wget(zhandle_t *zh, const char *path,
        watcher_fn watcher, void* watcherCtx,
        char *buffer, int* buffer_len, struct Stat *stat)
{
    struct sync_completion *sc;
    int rc=0;

    if(buffer_len==NULL)
        return ZBADARGUMENTS;
    if((sc=alloc_sync_completion())==NULL)
        return ZSYSTEMERROR;

    sc->u.data.buffer = buffer;
    sc->u.data.buff_len = *buffer_len;
    rc=zoo_awget(zh, path, watcher, watcherCtx, SYNCHRONOUS_MARKER, sc);
    if(rc==ZOK){
        wait_sync_completion(sc);
        rc = sc->rc;
        if (rc == 0) {
            if(stat)
                *stat = sc->u.data.stat;
            *buffer_len = sc->u.data.buff_len;
        }
    }
    free_sync_completion(sc);
    return rc;
}
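/*
 * A minimal usage sketch for the synchronous get; the path and buffer size
 * are illustrative. Note that buffer_len is both an input (capacity) and an
 * output (bytes actually copied):
 *
 *   char buf[1024];
 *   int buf_len = sizeof(buf);
 *   struct Stat stat;
 *   int rc = zoo_get(zh, "/config", 0, buf, &buf_len, &stat);
 *   if (rc == ZOK) {
 *       // buf holds buf_len bytes of znode data
 *   }
 */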
int zoo_set(zhandle_t *zh, const char *path, const char *buffer, int buflen,
        int version)
{
    return zoo_set2(zh, path, buffer, buflen, version, 0);
}

int zoo_set2(zhandle_t *zh, const char *path, const char *buffer, int buflen,
        int version, struct Stat *stat)
{
    struct sync_completion *sc = alloc_sync_completion();
    int rc;
    if (!sc) {
        return ZSYSTEMERROR;
    }
    rc=zoo_aset(zh, path, buffer, buflen, version, SYNCHRONOUS_MARKER, sc);
    if(rc==ZOK){
        wait_sync_completion(sc);
        rc = sc->rc;
        if (rc == 0 && stat) {
            *stat = sc->u.stat;
        }
    }
    free_sync_completion(sc);
    return rc;
}
static int zoo_wget_children_(zhandle_t *zh, const char *path,
        watcher_fn watcher, void* watcherCtx,
        struct String_vector *strings)
{
    struct sync_completion *sc = alloc_sync_completion();
    int rc;
    if (!sc) {
        return ZSYSTEMERROR;
    }
    rc = zoo_awget_children(zh, path, watcher, watcherCtx, SYNCHRONOUS_MARKER, sc);
    if(rc==ZOK){
        wait_sync_completion(sc);
        rc = sc->rc;
        if (rc == 0) {
            if (strings) {
                *strings = sc->u.strs2;
            } else {
                deallocate_String_vector(&sc->u.strs2);
            }
        }
    }
    free_sync_completion(sc);
    return rc;
}

static int zoo_wget_children2_(zhandle_t *zh, const char *path,
        watcher_fn watcher, void* watcherCtx,
        struct String_vector *strings, struct Stat *stat)
{
    struct sync_completion *sc = alloc_sync_completion();
    int rc;
    if (!sc) {
        return ZSYSTEMERROR;
    }
    rc = zoo_awget_children2(zh, path, watcher, watcherCtx, SYNCHRONOUS_MARKER, sc);
    if(rc==ZOK){
        wait_sync_completion(sc);
        rc = sc->rc;
        if (rc == 0) {
            *stat = sc->u.strs_stat.stat2;
            if (strings) {
                *strings = sc->u.strs_stat.strs2;
            } else {
                deallocate_String_vector(&sc->u.strs_stat.strs2);
            }
        }
    }
    free_sync_completion(sc);
    return rc;
}

int zoo_get_children(zhandle_t *zh, const char *path, int watch,
        struct String_vector *strings)
{
    return zoo_wget_children_(zh,path,watch?zh->watcher:0,zh->context,strings);
}

int zoo_wget_children(zhandle_t *zh, const char *path,
        watcher_fn watcher, void* watcherCtx,
        struct String_vector *strings)
{
    return zoo_wget_children_(zh,path,watcher,watcherCtx,strings);
}

int zoo_get_children2(zhandle_t *zh, const char *path, int watch,
        struct String_vector *strings, struct Stat *stat)
{
    return zoo_wget_children2_(zh,path,watch?zh->watcher:0,zh->context,strings,stat);
}

int zoo_wget_children2(zhandle_t *zh, const char *path,
        watcher_fn watcher, void* watcherCtx,
        struct String_vector *strings, struct Stat *stat)
{
    return zoo_wget_children2_(zh,path,watcher,watcherCtx,strings,stat);
}
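/*
 * A minimal usage sketch for listing children; the "/workers" path is
 * illustrative. The caller owns the returned vector and releases it with
 * deallocate_String_vector():
 *
 *   struct String_vector children;
 *   int rc = zoo_get_children(zh, "/workers", 0, &children);
 *   if (rc == ZOK) {
 *       int i;
 *       for (i = 0; i < children.count; i++) {
 *           // children.data[i] is the name of one child znode
 *       }
 *       deallocate_String_vector(&children);
 *   }
 */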
int zoo_get_acl(zhandle_t *zh, const char *path, struct ACL_vector *acl,
        struct Stat *stat)
{
    struct sync_completion *sc = alloc_sync_completion();
    int rc;
    if (!sc) {
        return ZSYSTEMERROR;
    }
    rc=zoo_aget_acl(zh, path, SYNCHRONOUS_MARKER, sc);
    if(rc==ZOK){
        wait_sync_completion(sc);
        rc = sc->rc;
        if (rc == 0 && stat) {
            *stat = sc->u.acl.stat;
        }
        if (rc == 0) {
            if (acl) {
                *acl = sc->u.acl.acl;
            } else {
                deallocate_ACL_vector(&sc->u.acl.acl);
            }
        }
    }
    free_sync_completion(sc);
    return rc;
}

int zoo_set_acl(zhandle_t *zh, const char *path, int version,
        const struct ACL_vector *acl)
{
    struct sync_completion *sc = alloc_sync_completion();
    int rc;
    if (!sc) {
        return ZSYSTEMERROR;
    }
    rc=zoo_aset_acl(zh, path, version, (struct ACL_vector*)acl,
            SYNCHRONOUS_MARKER, sc);
    if(rc==ZOK){
        wait_sync_completion(sc);
        rc = sc->rc;
    }
    free_sync_completion(sc);
    return rc;
}
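/*
 * A minimal usage sketch for reading and updating an ACL with the usual
 * read-modify-write pattern; the "/secure" path is illustrative, and the
 * ACL version from the returned Stat guards against concurrent changes:
 *
 *   struct ACL_vector acl;
 *   struct Stat stat;
 *   int rc = zoo_get_acl(zh, "/secure", &acl, &stat);
 *   if (rc == ZOK) {
 *       rc = zoo_set_acl(zh, "/secure", stat.aversion, &acl);
 *       deallocate_ACL_vector(&acl);
 *   }
 */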