TestOperations.cc 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include <cppunit/extensions/HelperMacros.h>
  19. #include "CppAssertHelper.h"
  20. #include "ZKMocks.h"
  21. #include <proto.h>
  22. using namespace std;
  23. class Zookeeper_operations : public CPPUNIT_NS::TestFixture
  24. {
  25. CPPUNIT_TEST_SUITE(Zookeeper_operations);
  26. #ifndef THREADED
  27. CPPUNIT_TEST(testPing);
  28. CPPUNIT_TEST(testUnsolicitedPing);
  29. CPPUNIT_TEST(testTimeoutCausedByWatches1);
  30. CPPUNIT_TEST(testTimeoutCausedByWatches2);
  31. CPPUNIT_TEST(testCloseWhileInProgressFromMain);
  32. CPPUNIT_TEST(testCloseWhileInProgressFromCompletion);
  33. CPPUNIT_TEST(testCloseWhileMultiInProgressFromMain);
  34. CPPUNIT_TEST(testCloseWhileMultiInProgressFromCompletion);
  35. CPPUNIT_TEST(testConnectResponseFull);
  36. CPPUNIT_TEST(testConnectResponseNoReadOnlyFlag);
  37. CPPUNIT_TEST(testConnectResponseSplitAtReadOnlyFlag);
  38. CPPUNIT_TEST(testConnectResponseNoReadOnlyFlagSplit);
  39. #else
  40. CPPUNIT_TEST(testAsyncWatcher1);
  41. CPPUNIT_TEST(testAsyncGetOperation);
  42. #endif
  43. CPPUNIT_TEST(testOperationsAndDisconnectConcurrently1);
  44. CPPUNIT_TEST(testOperationsAndDisconnectConcurrently2);
  45. CPPUNIT_TEST(testConcurrentOperations1);
  46. CPPUNIT_TEST_SUITE_END();
  47. zhandle_t *zh;
  48. FILE *logfile;
  49. static void watcher(zhandle_t *, int, int, const char *,void*){}
  50. public:
  51. Zookeeper_operations() {
  52. logfile = openlogfile("Zookeeper_operations");
  53. }
  54. ~Zookeeper_operations() {
  55. if (logfile) {
  56. fflush(logfile);
  57. fclose(logfile);
  58. logfile = 0;
  59. }
  60. }
  61. void setUp()
  62. {
  63. zoo_set_log_stream(logfile);
  64. zoo_deterministic_conn_order(0);
  65. zh=0;
  66. }
  67. void tearDown()
  68. {
  69. zookeeper_close(zh);
  70. }
  71. class AsyncGetOperationCompletion: public AsyncCompletion{
  72. public:
  73. AsyncGetOperationCompletion():called_(false),rc_(ZAPIERROR){}
  74. virtual void dataCompl(int rc, const char *value, int len, const Stat *stat){
  75. synchronized(mx_);
  76. called_=true;
  77. rc_=rc;
  78. value_.erase();
  79. if(rc!=ZOK) return;
  80. value_.assign(value,len);
  81. if(stat)
  82. stat_=*stat;
  83. }
  84. bool operator()()const{
  85. synchronized(mx_);
  86. return called_;
  87. }
  88. mutable Mutex mx_;
  89. bool called_;
  90. int rc_;
  91. string value_;
  92. NodeStat stat_;
  93. };
  94. class AsyncVoidOperationCompletion: public AsyncCompletion{
  95. public:
  96. AsyncVoidOperationCompletion():called_(false),rc_(ZAPIERROR){}
  97. virtual void voidCompl(int rc){
  98. synchronized(mx_);
  99. called_=true;
  100. rc_=rc;
  101. }
  102. bool operator()()const{
  103. synchronized(mx_);
  104. return called_;
  105. }
  106. mutable Mutex mx_;
  107. bool called_;
  108. int rc_;
  109. };
  110. #ifndef THREADED
  111. // send two get data requests; verify that the corresponding completions called
  112. void testConcurrentOperations1()
  113. {
  114. Mock_gettimeofday timeMock;
  115. ZookeeperServer zkServer;
  116. // must call zookeeper_close() while all the mocks are in scope
  117. CloseFinally guard(&zh);
  118. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  119. CPPUNIT_ASSERT(zh!=0);
  120. // simulate connected state
  121. forceConnected(zh, &timeMock.tv);
  122. int fd=0;
  123. int interest=0;
  124. timeval tv;
  125. // first operation
  126. AsyncGetOperationCompletion res1;
  127. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  128. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  129. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  130. // second operation
  131. AsyncGetOperationCompletion res2;
  132. zkServer.addOperationResponse(new ZooGetResponse("2",1));
  133. rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
  134. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  135. // process the send queue
  136. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  137. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  138. while((rc=zookeeper_process(zh,interest))==ZOK) {
  139. millisleep(100);
  140. //printf("%d\n", rc);
  141. }
  142. //printf("RC = %d", rc);
  143. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  144. CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
  145. CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);
  146. CPPUNIT_ASSERT_EQUAL((int)ZOK,res2.rc_);
  147. CPPUNIT_ASSERT_EQUAL(string("2"),res2.value_);
  148. }
  149. // send two getData requests and disconnect while the second request is
  150. // outstanding;
  151. // verify the completions are called
  152. void testOperationsAndDisconnectConcurrently1()
  153. {
  154. Mock_gettimeofday timeMock;
  155. ZookeeperServer zkServer;
  156. // must call zookeeper_close() while all the mocks are in scope
  157. CloseFinally guard(&zh);
  158. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  159. CPPUNIT_ASSERT(zh!=0);
  160. // simulate connected state
  161. forceConnected(zh, &timeMock.tv);
  162. int fd=0;
  163. int interest=0;
  164. timeval tv;
  165. // first operation
  166. AsyncGetOperationCompletion res1;
  167. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  168. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  169. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  170. // second operation
  171. AsyncGetOperationCompletion res2;
  172. zkServer.addOperationResponse(new ZooGetResponse("2",1));
  173. rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
  174. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  175. // process the send queue
  176. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  177. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  178. rc=zookeeper_process(zh,interest);
  179. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  180. // simulate a disconnect
  181. zkServer.setConnectionLost();
  182. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  183. rc=zookeeper_process(zh,interest);
  184. CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,rc);
  185. CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
  186. CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);
  187. CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,res2.rc_);
  188. CPPUNIT_ASSERT_EQUAL(string(""),res2.value_);
  189. }
  190. // send two getData requests and simulate timeout while the both request
  191. // are pending;
  192. // verify the completions are called
  193. void testOperationsAndDisconnectConcurrently2()
  194. {
  195. Mock_gettimeofday timeMock;
  196. ZookeeperServer zkServer;
  197. // must call zookeeper_close() while all the mocks are in scope
  198. CloseFinally guard(&zh);
  199. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  200. CPPUNIT_ASSERT(zh!=0);
  201. // simulate connected state
  202. forceConnected(zh, &timeMock.tv);
  203. int fd=0;
  204. int interest=0;
  205. timeval tv;
  206. // first operation
  207. AsyncGetOperationCompletion res1;
  208. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  209. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  210. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  211. // second operation
  212. AsyncGetOperationCompletion res2;
  213. zkServer.addOperationResponse(new ZooGetResponse("2",1));
  214. rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
  215. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  216. // simulate timeout
  217. timeMock.tick(+10); // advance system time by 10 secs
  218. // the next call to zookeeper_interest should return ZOPERATIONTIMEOUT
  219. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  220. CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,rc);
  221. // make sure the completions have been called
  222. CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,res1.rc_);
  223. CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,res2.rc_);
  224. }
  225. class PingCountingServer: public ZookeeperServer{
  226. public:
  227. PingCountingServer():pingCount_(0){}
  228. // called when a client request is received
  229. virtual void onMessageReceived(const RequestHeader& rh, iarchive* ia){
  230. if(rh.type==ZOO_PING_OP){
  231. pingCount_++;
  232. }
  233. }
  234. int pingCount_;
  235. };
  236. // establish a connection; idle for a while
  237. // verify ping was sent at least once
  238. void testPing()
  239. {
  240. const int TIMEOUT=9; // timeout in secs
  241. Mock_gettimeofday timeMock;
  242. PingCountingServer zkServer;
  243. // must call zookeeper_close() while all the mocks are in scope
  244. CloseFinally guard(&zh);
  245. // receive timeout is in milliseconds
  246. zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
  247. CPPUNIT_ASSERT(zh!=0);
  248. // simulate connected state
  249. forceConnected(zh, &timeMock.tv);
  250. int fd=0;
  251. int interest=0;
  252. timeval tv;
  253. // Round 1.
  254. int rc=zookeeper_interest(zh,&fd,&interest,&tv);
  255. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  256. // simulate waiting for the select() call to timeout;
  257. // advance the system clock accordingly
  258. timeMock.tick(tv);
  259. rc=zookeeper_process(zh,interest);
  260. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  261. // verify no ping sent
  262. CPPUNIT_ASSERT(zkServer.pingCount_==0);
  263. // Round 2.
  264. // the client should have the idle threshold exceeded, by now
  265. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  266. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  267. // assume the socket is writable, so no idling here; move on to
  268. // zookeeper_process immediately
  269. rc=zookeeper_process(zh,interest);
  270. // ZNOTHING means the client hasn't received a ping response yet
  271. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  272. // verify a ping is sent
  273. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  274. // Round 3.
  275. // we're going to receive a server PING response and make sure
  276. // that the client has updated its last_recv timestamp
  277. zkServer.addRecvResponse(new PingResponse);
  278. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  279. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  280. // pseudo-sleep for a short while (10 ms)
  281. timeMock.millitick(10);
  282. rc=zookeeper_process(zh,interest);
  283. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  284. // only one ping so far?
  285. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  286. CPPUNIT_ASSERT(timeMock==zh->last_recv);
  287. // Round 4
  288. // make sure that a ping is not sent if something is outstanding
  289. AsyncGetOperationCompletion res1;
  290. rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  291. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  292. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  293. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  294. timeMock.tick(tv);
  295. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  296. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  297. rc=zookeeper_process(zh,interest);
  298. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  299. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  300. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  301. // pseudo-sleep for a short while (10 ms)
  302. timeMock.millitick(10);
  303. rc=zookeeper_process(zh,interest);
  304. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  305. // only one ping so far?
  306. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  307. }
  308. // ZOOKEEPER-2253: Permit unsolicited pings
  309. void testUnsolicitedPing()
  310. {
  311. const int TIMEOUT=9; // timeout in secs
  312. Mock_gettimeofday timeMock;
  313. PingCountingServer zkServer;
  314. // must call zookeeper_close() while all the mocks are in scope
  315. CloseFinally guard(&zh);
  316. // receive timeout is in milliseconds
  317. zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
  318. CPPUNIT_ASSERT(zh!=0);
  319. // simulate connected state
  320. forceConnected(zh, &timeMock.tv);
  321. int fd=0;
  322. int interest=0;
  323. timeval tv;
  324. int rc=zookeeper_interest(zh,&fd,&interest,&tv);
  325. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  326. // verify no ping sent
  327. CPPUNIT_ASSERT(zkServer.pingCount_==0);
  328. // we're going to receive a unsolicited PING response; ensure
  329. // that the client has updated its last_recv timestamp
  330. timeMock.tick(tv);
  331. zkServer.addRecvResponse(new PingResponse);
  332. rc=zookeeper_process(zh,interest);
  333. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  334. CPPUNIT_ASSERT(timeMock==zh->last_recv);
  335. }
  336. // simulate a watch arriving right before a ping is due
  337. // assert the ping is sent nevertheless
  338. void testTimeoutCausedByWatches1()
  339. {
  340. const int TIMEOUT=9; // timeout in secs
  341. Mock_gettimeofday timeMock;
  342. PingCountingServer zkServer;
  343. // must call zookeeper_close() while all the mocks are in scope
  344. CloseFinally guard(&zh);
  345. // receive timeout is in milliseconds
  346. zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
  347. CPPUNIT_ASSERT(zh!=0);
  348. // simulate connected state
  349. forceConnected(zh, &timeMock.tv);
  350. int fd=0;
  351. int interest=0;
  352. timeval tv;
  353. // Round 1.
  354. int rc=zookeeper_interest(zh,&fd,&interest,&tv);
  355. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  356. // simulate waiting for the select() call to timeout;
  357. // advance the system clock accordingly
  358. timeMock.tick(tv);
  359. timeMock.tick(-1); // set the clock to a millisecond before a ping is due
  360. // trigger a watch now
  361. zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
  362. rc=zookeeper_process(zh,interest);
  363. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  364. // arrival of a watch sets the last_recv to the current time
  365. CPPUNIT_ASSERT(timeMock==zh->last_recv);
  366. // spend 1 millisecond by processing the watch
  367. timeMock.tick(1);
  368. // Round 2.
  369. // a ping is due; zookeeper_interest() must send it now
  370. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  371. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  372. // no delay here -- as if the socket is immediately writable
  373. rc=zookeeper_process(zh,interest);
  374. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  375. // verify a ping is sent
  376. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  377. }
  378. // similar to testTimeoutCausedByWatches1, but this time the watch is
  379. // triggered while the client has an outstanding request
  380. // assert the ping is sent on time
  381. void testTimeoutCausedByWatches2()
  382. {
  383. const int TIMEOUT=9; // timeout in secs
  384. Mock_gettimeofday now;
  385. PingCountingServer zkServer;
  386. // must call zookeeper_close() while all the mocks are in scope
  387. CloseFinally guard(&zh);
  388. // receive timeout is in milliseconds
  389. zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
  390. CPPUNIT_ASSERT(zh!=0);
  391. // simulate connected state
  392. forceConnected(zh, &now.tv);
  393. // queue up a request; keep it pending (as if the server is busy or has died)
  394. AsyncGetOperationCompletion res1;
  395. zkServer.addOperationResponse(new ZooGetResponse("2",1));
  396. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  397. int fd=0;
  398. int interest=0;
  399. timeval tv;
  400. // Round 1.
  401. // send the queued up zoo_aget() request
  402. Mock_gettimeofday beginningOfTimes(now); // remember when we started
  403. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  404. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  405. // no delay -- the socket is writable
  406. rc=zookeeper_process(zh,interest);
  407. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  408. // Round 2.
  409. // what's next?
  410. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  411. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  412. // no response from the server yet -- waiting in the select() call
  413. now.tick(tv);
  414. // a watch has arrived, thus preventing the connection from timing out
  415. zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
  416. rc=zookeeper_process(zh,interest);
  417. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc); // read the watch message
  418. CPPUNIT_ASSERT_EQUAL(0,zkServer.pingCount_); // not yet!
  419. //Round 3.
  420. // now is the time to send a ping; make sure it's actually sent
  421. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  422. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  423. rc=zookeeper_process(zh,interest);
  424. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  425. // verify a ping is sent
  426. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  427. // make sure only 1/3 of the timeout has passed
  428. CPPUNIT_ASSERT_EQUAL((int32_t)TIMEOUT/3*1000,toMilliseconds(now-beginningOfTimes));
  429. }
  430. // ZOOKEEPER-2894: Memory and completions leak on zookeeper_close
  431. // while there is a request waiting for being processed
  432. // call zookeeper_close() from the main event loop
  433. // assert the completion callback is called
  434. void testCloseWhileInProgressFromMain()
  435. {
  436. Mock_gettimeofday timeMock;
  437. ZookeeperServer zkServer;
  438. CloseFinally guard(&zh);
  439. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  440. CPPUNIT_ASSERT(zh!=0);
  441. forceConnected(zh, &timeMock.tv);
  442. zhandle_t* savezh=zh;
  443. // issue a request
  444. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  445. AsyncGetOperationCompletion res1;
  446. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  447. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  448. // but do not allow Zookeeper C Client to process the request
  449. // and call zookeeper_close() from the main event loop immediately
  450. Mock_free_noop freeMock;
  451. rc=zookeeper_close(zh); zh=0;
  452. freeMock.disable();
  453. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  454. // verify that memory for completions was freed (would be freed if no mock installed)
  455. CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
  456. CPPUNIT_ASSERT(savezh->completions_to_process.head==0);
  457. CPPUNIT_ASSERT(savezh->completions_to_process.last==0);
  458. // verify that completion was called, and it was called with ZCLOSING status
  459. CPPUNIT_ASSERT(res1.called_);
  460. CPPUNIT_ASSERT_EQUAL((int)ZCLOSING,res1.rc_);
  461. }
  462. // ZOOKEEPER-2894: Memory and completions leak on zookeeper_close
  463. // send some request #1
  464. // then, while there is a request #2 waiting for being processed
  465. // call zookeeper_close() from the completion callback of request #1
  466. // assert the completion callback #2 is called
  467. void testCloseWhileInProgressFromCompletion()
  468. {
  469. Mock_gettimeofday timeMock;
  470. ZookeeperServer zkServer;
  471. CloseFinally guard(&zh);
  472. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  473. CPPUNIT_ASSERT(zh!=0);
  474. forceConnected(zh, &timeMock.tv);
  475. zhandle_t* savezh=zh;
  476. // will handle completion on request #1 and issue request #2 from it
  477. class AsyncGetOperationCompletion1: public AsyncCompletion{
  478. public:
  479. AsyncGetOperationCompletion1(zhandle_t **zh, ZookeeperServer *zkServer,
  480. AsyncGetOperationCompletion *res2)
  481. :zh_(zh),zkServer_(zkServer),res2_(res2){}
  482. virtual void dataCompl(int rc1, const char *value, int len, const Stat *stat){
  483. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc1);
  484. // from the completion #1 handler, issue request #2
  485. zkServer_->addOperationResponse(new ZooGetResponse("2",1));
  486. int rc2=zoo_aget(*zh_,"/x/y/2",0,asyncCompletion,res2_);
  487. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc2);
  488. // but do not allow Zookeeper C Client to process the request #2
  489. // and call zookeeper_close() from the completion callback of request #1
  490. rc2=zookeeper_close(*zh_); *zh_=0;
  491. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc2);
  492. // do not disable freeMock here, let completion #2 handler
  493. // return through ZooKeeper C Client internals to the main loop
  494. // and fulfill the work
  495. }
  496. zhandle_t **zh_;
  497. ZookeeperServer *zkServer_;
  498. AsyncGetOperationCompletion *res2_;
  499. };
  500. // issue request #1
  501. AsyncGetOperationCompletion res2;
  502. AsyncGetOperationCompletion1 res1(&zh,&zkServer,&res2);
  503. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  504. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  505. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  506. // process the send queue
  507. int fd; int interest; timeval tv;
  508. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  509. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  510. CPPUNIT_ASSERT(zh!=0);
  511. Mock_free_noop freeMock;
  512. while(zh!=0 && (rc=zookeeper_process(zh,interest))==ZOK) {
  513. millisleep(100);
  514. }
  515. freeMock.disable();
  516. CPPUNIT_ASSERT(zh==0);
  517. // verify that memory for completions was freed (would be freed if no mock installed)
  518. CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
  519. CPPUNIT_ASSERT(savezh->completions_to_process.head==0);
  520. CPPUNIT_ASSERT(savezh->completions_to_process.last==0);
  521. // verify that completion #2 was called, and it was called with ZCLOSING status
  522. CPPUNIT_ASSERT(res2.called_);
  523. CPPUNIT_ASSERT_EQUAL((int)ZCLOSING,res2.rc_);
  524. }
  525. // ZOOKEEPER-2891: Invalid processing of zookeeper_close for multi-request
  526. // while there is a multi request waiting for being processed
  527. // call zookeeper_close() from the main event loop
  528. // assert the completion callback is called with status ZCLOSING
  529. void testCloseWhileMultiInProgressFromMain()
  530. {
  531. Mock_gettimeofday timeMock;
  532. ZookeeperServer zkServer;
  533. CloseFinally guard(&zh);
  534. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  535. CPPUNIT_ASSERT(zh!=0);
  536. forceConnected(zh, &timeMock.tv);
  537. zhandle_t* savezh=zh;
  538. // issue a multi request
  539. int nops=2;
  540. zoo_op_t ops[nops];
  541. zoo_op_result_t results[nops];
  542. zoo_create_op_init(&ops[0],"/a",0,-1,&ZOO_OPEN_ACL_UNSAFE,0,0,0);
  543. zoo_create_op_init(&ops[1],"/a/b",0,-1,&ZOO_OPEN_ACL_UNSAFE,0,0,0);
  544. // TODO: Provide ZooMultiResponse. However, it's not required in this test.
  545. // zkServer.addOperationResponse(new ZooMultiResponse(...));
  546. AsyncVoidOperationCompletion res1;
  547. int rc=zoo_amulti(zh,nops,ops,results,asyncCompletion,&res1);
  548. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  549. // but do not allow Zookeeper C Client to process the request
  550. // and call zookeeper_close() from the main event loop immediately
  551. Mock_free_noop freeMock;
  552. rc=zookeeper_close(zh); zh=0;
  553. freeMock.disable();
  554. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  555. // verify that memory for completions was freed (would be freed if no mock installed)
  556. CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
  557. CPPUNIT_ASSERT(savezh->completions_to_process.head==0);
  558. CPPUNIT_ASSERT(savezh->completions_to_process.last==0);
  559. // verify that completion was called, and it was called with ZCLOSING status
  560. CPPUNIT_ASSERT(res1.called_);
  561. CPPUNIT_ASSERT_EQUAL((int)ZCLOSING,res1.rc_);
  562. }
  563. // ZOOKEEPER-2891: Invalid processing of zookeeper_close for multi-request
  564. // send some request #1 (not a multi request)
  565. // then, while there is a multi request #2 waiting for being processed
  566. // call zookeeper_close() from the completion callback of request #1
  567. // assert the completion callback #2 is called with status ZCLOSING
  568. void testCloseWhileMultiInProgressFromCompletion()
  569. {
  570. Mock_gettimeofday timeMock;
  571. ZookeeperServer zkServer;
  572. CloseFinally guard(&zh);
  573. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  574. CPPUNIT_ASSERT(zh!=0);
  575. forceConnected(zh, &timeMock.tv);
  576. zhandle_t* savezh=zh;
  577. // these shall persist during the test
  578. int nops=2;
  579. zoo_op_t ops[nops];
  580. zoo_op_result_t results[nops];
  581. // will handle completion on request #1 and issue request #2 from it
  582. class AsyncGetOperationCompletion1: public AsyncCompletion{
  583. public:
  584. AsyncGetOperationCompletion1(zhandle_t **zh, ZookeeperServer *zkServer,
  585. AsyncVoidOperationCompletion *res2,
  586. int nops, zoo_op_t* ops, zoo_op_result_t* results)
  587. :zh_(zh),zkServer_(zkServer),res2_(res2),nops_(nops),ops_(ops),results_(results){}
  588. virtual void dataCompl(int rc1, const char *value, int len, const Stat *stat){
  589. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc1);
  590. // from the completion #1 handler, issue multi request #2
  591. assert(nops_>=2);
  592. zoo_create_op_init(&ops_[0],"/a",0,-1,&ZOO_OPEN_ACL_UNSAFE,0,0,0);
  593. zoo_create_op_init(&ops_[1],"/a/b",0,-1,&ZOO_OPEN_ACL_UNSAFE,0,0,0);
  594. // TODO: Provide ZooMultiResponse. However, it's not required in this test.
  595. // zkServer_->addOperationResponse(new ZooMultiResponse(...));
  596. int rc2=zoo_amulti(*zh_,nops_,ops_,results_,asyncCompletion,res2_);
  597. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc2);
  598. // but do not allow Zookeeper C Client to process the request #2
  599. // and call zookeeper_close() from the completion callback of request #1
  600. rc2=zookeeper_close(*zh_); *zh_=0;
  601. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc2);
  602. // do not disable freeMock here, let completion #2 handler
  603. // return through ZooKeeper C Client internals to the main loop
  604. // and fulfill the work
  605. }
  606. zhandle_t **zh_;
  607. ZookeeperServer *zkServer_;
  608. AsyncVoidOperationCompletion *res2_;
  609. int nops_;
  610. zoo_op_t* ops_;
  611. zoo_op_result_t* results_;
  612. };
  613. // issue some request #1 (not a multi request)
  614. AsyncVoidOperationCompletion res2;
  615. AsyncGetOperationCompletion1 res1(&zh,&zkServer,&res2,nops,ops,results);
  616. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  617. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  618. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  619. // process the send queue
  620. int fd; int interest; timeval tv;
  621. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  622. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  623. CPPUNIT_ASSERT(zh!=0);
  624. Mock_free_noop freeMock;
  625. while(zh!=0 && (rc=zookeeper_process(zh,interest))==ZOK) {
  626. millisleep(100);
  627. }
  628. freeMock.disable();
  629. CPPUNIT_ASSERT(zh==0);
  630. // verify that memory for completions was freed (would be freed if no mock installed)
  631. CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
  632. CPPUNIT_ASSERT(savezh->completions_to_process.head==0);
  633. CPPUNIT_ASSERT(savezh->completions_to_process.last==0);
  634. // verify that completion #2 was called, and it was called with ZCLOSING status
  635. CPPUNIT_ASSERT(res2.called_);
  636. CPPUNIT_ASSERT_EQUAL((int)ZCLOSING,res2.rc_);
  637. }
  638. void testConnectResponseFull()
  639. {
  640. CloseFinally guard(&zh);
  641. Mock_socket socketMock;
  642. HandshakeResponse hsResponse;
  643. std::string hsResponseData = hsResponse.toString();
  644. CPPUNIT_ASSERT_EQUAL(hsResponseData.length(), static_cast<size_t>(41));
  645. zh = zookeeper_init("localhost:2121", watcher, 10000, NULL, NULL, 0);
  646. CPPUNIT_ASSERT(zh!=0);
  647. int rc, fd, interest;
  648. timeval tv;
  649. rc = zookeeper_interest(zh, &fd, &interest, &tv);
  650. CPPUNIT_ASSERT_EQUAL(static_cast<int>(ZOK), rc);
  651. socketMock.recvReturnBuffer = hsResponseData;
  652. rc = zookeeper_process(zh, interest);
  653. CPPUNIT_ASSERT_EQUAL(static_cast<int>(ZOK), rc);
  654. CPPUNIT_ASSERT_EQUAL(ZOO_CONNECTED_STATE, static_cast<int>(zh->state));
  655. }
  656. void testConnectResponseNoReadOnlyFlag()
  657. {
  658. CloseFinally guard(&zh);
  659. Mock_socket socketMock;
  660. HandshakeResponse hsResponse;
  661. hsResponse.omitReadOnly = true;
  662. std::string hsResponseData = hsResponse.toString();
  663. CPPUNIT_ASSERT_EQUAL(hsResponseData.length(), static_cast<size_t>(40));
  664. zh = zookeeper_init("localhost:2121", watcher, 10000, NULL, NULL, 0);
  665. CPPUNIT_ASSERT(zh!=0);
  666. int rc, fd, interest;
  667. timeval tv;
  668. rc = zookeeper_interest(zh, &fd, &interest, &tv);
  669. CPPUNIT_ASSERT_EQUAL(static_cast<int>(ZOK), rc);
  670. socketMock.recvReturnBuffer = hsResponseData;
  671. rc = zookeeper_process(zh, interest);
  672. CPPUNIT_ASSERT_EQUAL(static_cast<int>(ZOK), rc);
  673. CPPUNIT_ASSERT_EQUAL(ZOO_CONNECTED_STATE, static_cast<int>(zh->state));
  674. }
  675. void testConnectResponseSplitAtReadOnlyFlag()
  676. {
  677. CloseFinally guard(&zh);
  678. Mock_socket socketMock;
  679. HandshakeResponse hsResponse;
  680. std::string hsResponseData = hsResponse.toString();
  681. CPPUNIT_ASSERT_EQUAL(hsResponseData.length(), static_cast<size_t>(41));
  682. zh = zookeeper_init("localhost:2121", watcher, 10000, NULL, NULL, 0);
  683. CPPUNIT_ASSERT(zh!=0);
  684. int rc, fd, interest;
  685. timeval tv;
  686. rc = zookeeper_interest(zh, &fd, &interest, &tv);
  687. CPPUNIT_ASSERT_EQUAL(static_cast<int>(ZOK), rc);
  688. socketMock.recvReturnBuffer = hsResponseData.substr(0, 40);
  689. rc = zookeeper_process(zh, interest);
  690. // Response not complete.
  691. CPPUNIT_ASSERT_EQUAL(static_cast<int>(ZNOTHING), rc);
  692. CPPUNIT_ASSERT_EQUAL(ZOO_ASSOCIATING_STATE, static_cast<int>(zh->state));
  693. socketMock.recvReturnBuffer = hsResponseData.substr(40);
  694. rc = zookeeper_process(zh, interest);
  695. CPPUNIT_ASSERT_EQUAL(static_cast<int>(ZOK), rc);
  696. CPPUNIT_ASSERT_EQUAL(ZOO_CONNECTED_STATE, static_cast<int>(zh->state));
  697. }
  698. void testConnectResponseNoReadOnlyFlagSplit()
  699. {
  700. CloseFinally guard(&zh);
  701. Mock_socket socketMock;
  702. HandshakeResponse hsResponse;
  703. hsResponse.omitReadOnly = true;
  704. std::string hsResponseData = hsResponse.toString();
  705. CPPUNIT_ASSERT_EQUAL(hsResponseData.length(), static_cast<size_t>(40));
  706. zh = zookeeper_init("localhost:2121", watcher, 10000, NULL, NULL, 0);
  707. CPPUNIT_ASSERT(zh!=0);
  708. int rc, fd, interest;
  709. timeval tv;
  710. rc = zookeeper_interest(zh, &fd, &interest, &tv);
  711. CPPUNIT_ASSERT_EQUAL(static_cast<int>(ZOK), rc);
  712. socketMock.recvReturnBuffer = hsResponseData.substr(0, 20);
  713. rc = zookeeper_process(zh, interest);
  714. // Response not complete.
  715. CPPUNIT_ASSERT_EQUAL(static_cast<int>(ZNOTHING), rc);
  716. CPPUNIT_ASSERT_EQUAL(ZOO_ASSOCIATING_STATE, static_cast<int>(zh->state));
  717. socketMock.recvReturnBuffer = hsResponseData.substr(20);
  718. rc = zookeeper_process(zh, interest);
  719. CPPUNIT_ASSERT_EQUAL(static_cast<int>(ZOK), rc);
  720. CPPUNIT_ASSERT_EQUAL(ZOO_CONNECTED_STATE, static_cast<int>(zh->state));
  721. }
  722. #else
  723. class TestGetDataJob: public TestJob{
  724. public:
  725. TestGetDataJob(ZookeeperServer* svr,zhandle_t* zh, int reps=500)
  726. :svr_(svr),zh_(zh),rc_(ZAPIERROR),reps_(reps){}
  727. virtual void run(){
  728. int i;
  729. for(i=0;i<reps_;i++){
  730. char buf;
  731. int size=sizeof(buf);
  732. if (i % 10 == 0) {
  733. // We need to pause every once in a while so we don't
  734. // get too far ahead and finish before the disconnect
  735. millisleep(1);
  736. }
  737. svr_->addOperationResponse(new ZooGetResponse("1",1));
  738. rc_=zoo_get(zh_,"/x/y/z",0,&buf,&size,0);
  739. if(rc_!=ZOK){
  740. break;
  741. }
  742. }
  743. }
  744. ZookeeperServer* svr_;
  745. zhandle_t* zh_;
  746. int rc_;
  747. int reps_;
  748. };
  749. class TestConcurrentOpJob: public TestGetDataJob{
  750. public:
  751. static const int REPS=500;
  752. TestConcurrentOpJob(ZookeeperServer* svr,zhandle_t* zh):
  753. TestGetDataJob(svr,zh,REPS){}
  754. virtual TestJob* clone() const {
  755. return new TestConcurrentOpJob(svr_,zh_);
  756. }
  757. virtual void validate(const char* file, int line) const{
  758. CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZOK != rc",(int)ZOK,rc_,file,line);
  759. }
  760. };
  761. void testConcurrentOperations1()
  762. {
  763. for(int counter=0; counter<50; counter++){
  764. // frozen time -- no timeouts and no pings
  765. Mock_gettimeofday timeMock;
  766. ZookeeperServer zkServer;
  767. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  768. // must call zookeeper_close() while all the mocks are in the scope!
  769. CloseFinally guard(&zh);
  770. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  771. CPPUNIT_ASSERT(zh!=0);
  772. // make sure the client has connected
  773. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  774. TestJobManager jmgr(TestConcurrentOpJob(&zkServer,zh),10);
  775. jmgr.startAllJobs();
  776. jmgr.wait();
  777. // validate test results
  778. VALIDATE_JOBS(jmgr);
  779. }
  780. }
  781. class ZKGetJob: public TestJob{
  782. public:
  783. static const int REPS=1000;
  784. ZKGetJob(zhandle_t* zh)
  785. :zh_(zh),rc_(ZAPIERROR){}
  786. virtual TestJob* clone() const {
  787. return new ZKGetJob(zh_);
  788. }
  789. virtual void run(){
  790. int i;
  791. for(i=0;i<REPS;i++){
  792. char buf;
  793. int size=sizeof(buf);
  794. rc_=zoo_get(zh_,"/xyz",0,&buf,&size,0);
  795. if(rc_!=ZOK){
  796. break;
  797. }
  798. }
  799. //TEST_TRACE("Finished %d iterations",i);
  800. }
  801. virtual void validate(const char* file, int line) const{
  802. CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZOK != rc",(int)ZOK,rc_,file,line);
  803. }
  804. zhandle_t* zh_;
  805. int rc_;
  806. };
  807. // this test connects to a real ZK server and creates the /xyz node and sends
  808. // lots of zoo_get requests.
  809. // to run this test use the following command:
  810. // zktest-mt Zookeeper_operations::testOperationsAndDisconnectConcurrently2 localhost:3181
  811. // where the second parameter is the server host and port
  812. void testOperationsAndDisconnectConcurrently2()
  813. {
  814. if(globalTestConfig.getTestName().find(__func__)==string::npos ||
  815. globalTestConfig.getExtraOptCount()==0)
  816. {
  817. // only run this test when specifically asked so
  818. return;
  819. }
  820. string host(*(globalTestConfig.getExtraOptBegin()));
  821. zhandle_t* lzh=zookeeper_init(host.c_str(),watcher,10000,0,0,0);
  822. CPPUNIT_ASSERT(lzh!=0);
  823. // make sure the client has connected
  824. CPPUNIT_ASSERT_MESSAGE("Unable to connect to the host",
  825. ensureCondition(ClientConnected(zh),5000)<5000);
  826. char realpath[1024];
  827. int rc=zoo_create(lzh,"/xyz","1",1,&ZOO_OPEN_ACL_UNSAFE,0,realpath,sizeof(realpath)-1);
  828. CPPUNIT_ASSERT(rc==ZOK || rc==ZNODEEXISTS);
  829. zookeeper_close(lzh);
  830. for(int counter=0; counter<200; counter++){
  831. TEST_TRACE("Loop count %d",counter);
  832. CloseFinally guard(&zh);
  833. zh=zookeeper_init(host.c_str(),watcher,10000,0,0,0);
  834. CPPUNIT_ASSERT(zh!=0);
  835. // make sure the client has connected
  836. CPPUNIT_ASSERT_MESSAGE("Unable to connect to the host",
  837. ensureCondition(ClientConnected(zh),5000)<5000);
  838. TestJobManager jmgr(ZKGetJob(zh),10);
  839. jmgr.startJobsImmediately();
  840. jmgr.wait();
  841. VALIDATE_JOBS(jmgr);
  842. TEST_TRACE("run %d finished",counter);
  843. }
  844. }
  845. class TestConcurrentOpWithDisconnectJob: public TestGetDataJob{
  846. public:
  847. static const int REPS=1000;
  848. TestConcurrentOpWithDisconnectJob(ZookeeperServer* svr,zhandle_t* zh):
  849. TestGetDataJob(svr,zh,REPS){}
  850. virtual TestJob* clone() const {
  851. return new TestConcurrentOpWithDisconnectJob(svr_,zh_);
  852. }
  853. virtual void validate(const char* file, int line) const{
  854. CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZCONNECTIONLOSS != rc",(int)ZCONNECTIONLOSS,rc_,file,line);
  855. }
  856. };
  857. // this test is not 100% accurate in a sense it may not detect all error cases.
  858. // TODO: I can't think of a test that is 100% accurate and doesn't interfere
  859. // with the code being tested (in terms of introducing additional
  860. // implicit synchronization points)
  861. void testOperationsAndDisconnectConcurrently1()
  862. {
  863. for(int counter=0; counter<50; counter++){
  864. //TEST_TRACE("Loop count %d",counter);
  865. // frozen time -- no timeouts and no pings
  866. Mock_gettimeofday timeMock;
  867. ZookeeperServer zkServer;
  868. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  869. // must call zookeeper_close() while all the mocks are in the scope!
  870. CloseFinally guard(&zh);
  871. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  872. CPPUNIT_ASSERT(zh!=0);
  873. // make sure the client has connected
  874. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  875. TestJobManager jmgr(TestConcurrentOpWithDisconnectJob(&zkServer,zh),10);
  876. jmgr.startJobsImmediately();
  877. // let everything startup before we shutdown the server
  878. millisleep(4);
  879. // reconnect attempts will start failing immediately
  880. zkServer.setServerDown(0);
  881. // next recv call will return 0
  882. zkServer.setConnectionLost();
  883. jmgr.wait();
  884. VALIDATE_JOBS(jmgr);
  885. }
  886. }
  887. // call zoo_aget() in the multithreaded mode
  888. void testAsyncGetOperation()
  889. {
  890. Mock_gettimeofday timeMock;
  891. ZookeeperServer zkServer;
  892. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  893. // must call zookeeper_close() while all the mocks are in the scope!
  894. CloseFinally guard(&zh);
  895. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  896. CPPUNIT_ASSERT(zh!=0);
  897. // make sure the client has connected
  898. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  899. AsyncGetOperationCompletion res1;
  900. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  901. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  902. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  903. CPPUNIT_ASSERT(ensureCondition(res1,1000)<1000);
  904. CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
  905. CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);
  906. }
  907. class ChangeNodeWatcher: public WatcherAction{
  908. public:
  909. ChangeNodeWatcher():changed_(false){}
  910. virtual void onNodeValueChanged(zhandle_t*,const char* path){
  911. synchronized(mx_);
  912. changed_=true;
  913. if(path!=0) path_=path;
  914. }
  915. // this predicate checks if CHANGE_EVENT event type was triggered, unlike
  916. // the isWatcherTriggered() that returns true whenever a watcher is triggered
  917. // regardless of the event type
  918. SyncedBoolCondition isNodeChangedTriggered() const{
  919. return SyncedBoolCondition(changed_,mx_);
  920. }
  921. bool changed_;
  922. string path_;
  923. };
  924. class AsyncWatcherCompletion: public AsyncCompletion{
  925. public:
  926. AsyncWatcherCompletion(ZookeeperServer& zkServer):zkServer_(zkServer){}
  927. virtual void statCompl(int rc, const Stat *stat){
  928. // we received a server response, now enqueue a watcher event
  929. // to trigger the watcher
  930. zkServer_.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
  931. }
  932. ZookeeperServer& zkServer_;
  933. };
  934. // verify that async watcher is called for znode events (CREATED, DELETED etc.)
  935. void testAsyncWatcher1(){
  936. Mock_gettimeofday timeMock;
  937. ZookeeperServer zkServer;
  938. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  939. // must call zookeeper_close() while all the mocks are in the scope!
  940. CloseFinally guard(&zh);
  941. ChangeNodeWatcher action;
  942. zh=zookeeper_init("localhost:2121",activeWatcher,10000,
  943. TEST_CLIENT_ID,&action,0);
  944. CPPUNIT_ASSERT(zh!=0);
  945. // make sure the client has connected
  946. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  947. // set the watcher
  948. AsyncWatcherCompletion completion(zkServer);
  949. // prepare a response for the zoo_aexists() request
  950. zkServer.addOperationResponse(new ZooStatResponse);
  951. int rc=zoo_aexists(zh,"/x/y/z",1,asyncCompletion,&completion);
  952. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  953. CPPUNIT_ASSERT(ensureCondition(action.isNodeChangedTriggered(),1000)<1000);
  954. CPPUNIT_ASSERT_EQUAL(string("/x/y/z"),action.path_);
  955. }
  956. #endif
  957. };
  958. CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_operations);