TestOperations.cc 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include <cppunit/extensions/HelperMacros.h>
  19. #include "CppAssertHelper.h"
  20. #include "ZKMocks.h"
  21. #include <proto.h>
  22. using namespace std;
  23. class Zookeeper_operations : public CPPUNIT_NS::TestFixture
  24. {
  25. CPPUNIT_TEST_SUITE(Zookeeper_operations);
  26. #ifndef THREADED
  27. CPPUNIT_TEST(testPing);
  28. CPPUNIT_TEST(testUnsolicitedPing);
  29. CPPUNIT_TEST(testTimeoutCausedByWatches1);
  30. CPPUNIT_TEST(testTimeoutCausedByWatches2);
  31. CPPUNIT_TEST(testCloseWhileInProgressFromMain);
  32. CPPUNIT_TEST(testCloseWhileInProgressFromCompletion);
  33. #else
  34. CPPUNIT_TEST(testAsyncWatcher1);
  35. CPPUNIT_TEST(testAsyncGetOperation);
  36. #endif
  37. CPPUNIT_TEST(testOperationsAndDisconnectConcurrently1);
  38. CPPUNIT_TEST(testOperationsAndDisconnectConcurrently2);
  39. CPPUNIT_TEST(testConcurrentOperations1);
  40. CPPUNIT_TEST_SUITE_END();
  41. zhandle_t *zh;
  42. FILE *logfile;
  43. static void watcher(zhandle_t *, int, int, const char *,void*){}
  44. public:
  45. Zookeeper_operations() {
  46. logfile = openlogfile("Zookeeper_operations");
  47. }
  48. ~Zookeeper_operations() {
  49. if (logfile) {
  50. fflush(logfile);
  51. fclose(logfile);
  52. logfile = 0;
  53. }
  54. }
  55. void setUp()
  56. {
  57. zoo_set_log_stream(logfile);
  58. zoo_deterministic_conn_order(0);
  59. zh=0;
  60. }
  61. void tearDown()
  62. {
  63. zookeeper_close(zh);
  64. }
  65. class AsyncGetOperationCompletion: public AsyncCompletion{
  66. public:
  67. AsyncGetOperationCompletion():called_(false),rc_(ZAPIERROR){}
  68. virtual void dataCompl(int rc, const char *value, int len, const Stat *stat){
  69. synchronized(mx_);
  70. called_=true;
  71. rc_=rc;
  72. value_.erase();
  73. if(rc!=ZOK) return;
  74. value_.assign(value,len);
  75. if(stat)
  76. stat_=*stat;
  77. }
  78. bool operator()()const{
  79. synchronized(mx_);
  80. return called_;
  81. }
  82. mutable Mutex mx_;
  83. bool called_;
  84. int rc_;
  85. string value_;
  86. NodeStat stat_;
  87. };
  88. #ifndef THREADED
  89. // send two get data requests; verify that the corresponding completions called
  90. void testConcurrentOperations1()
  91. {
  92. Mock_gettimeofday timeMock;
  93. ZookeeperServer zkServer;
  94. // must call zookeeper_close() while all the mocks are in scope
  95. CloseFinally guard(&zh);
  96. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  97. CPPUNIT_ASSERT(zh!=0);
  98. // simulate connected state
  99. forceConnected(zh);
  100. int fd=0;
  101. int interest=0;
  102. timeval tv;
  103. // first operation
  104. AsyncGetOperationCompletion res1;
  105. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  106. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  107. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  108. // second operation
  109. AsyncGetOperationCompletion res2;
  110. zkServer.addOperationResponse(new ZooGetResponse("2",1));
  111. rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
  112. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  113. // process the send queue
  114. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  115. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  116. while((rc=zookeeper_process(zh,interest))==ZOK) {
  117. millisleep(100);
  118. //printf("%d\n", rc);
  119. }
  120. //printf("RC = %d", rc);
  121. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  122. CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
  123. CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);
  124. CPPUNIT_ASSERT_EQUAL((int)ZOK,res2.rc_);
  125. CPPUNIT_ASSERT_EQUAL(string("2"),res2.value_);
  126. }
  127. // send two getData requests and disconnect while the second request is
  128. // outstanding;
  129. // verify the completions are called
  130. void testOperationsAndDisconnectConcurrently1()
  131. {
  132. Mock_gettimeofday timeMock;
  133. ZookeeperServer zkServer;
  134. // must call zookeeper_close() while all the mocks are in scope
  135. CloseFinally guard(&zh);
  136. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  137. CPPUNIT_ASSERT(zh!=0);
  138. // simulate connected state
  139. forceConnected(zh);
  140. int fd=0;
  141. int interest=0;
  142. timeval tv;
  143. // first operation
  144. AsyncGetOperationCompletion res1;
  145. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  146. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  147. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  148. // second operation
  149. AsyncGetOperationCompletion res2;
  150. zkServer.addOperationResponse(new ZooGetResponse("2",1));
  151. rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
  152. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  153. // process the send queue
  154. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  155. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  156. rc=zookeeper_process(zh,interest);
  157. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  158. // simulate a disconnect
  159. zkServer.setConnectionLost();
  160. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  161. rc=zookeeper_process(zh,interest);
  162. CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,rc);
  163. CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
  164. CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);
  165. CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,res2.rc_);
  166. CPPUNIT_ASSERT_EQUAL(string(""),res2.value_);
  167. }
  168. // send two getData requests and simulate timeout while the both request
  169. // are pending;
  170. // verify the completions are called
  171. void testOperationsAndDisconnectConcurrently2()
  172. {
  173. Mock_gettimeofday timeMock;
  174. ZookeeperServer zkServer;
  175. // must call zookeeper_close() while all the mocks are in scope
  176. CloseFinally guard(&zh);
  177. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  178. CPPUNIT_ASSERT(zh!=0);
  179. // simulate connected state
  180. forceConnected(zh);
  181. int fd=0;
  182. int interest=0;
  183. timeval tv;
  184. // first operation
  185. AsyncGetOperationCompletion res1;
  186. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  187. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  188. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  189. // second operation
  190. AsyncGetOperationCompletion res2;
  191. zkServer.addOperationResponse(new ZooGetResponse("2",1));
  192. rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
  193. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  194. // simulate timeout
  195. timeMock.tick(+10); // advance system time by 10 secs
  196. // the next call to zookeeper_interest should return ZOPERATIONTIMEOUT
  197. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  198. CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,rc);
  199. // make sure the completions have been called
  200. CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,res1.rc_);
  201. CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,res2.rc_);
  202. }
  203. class PingCountingServer: public ZookeeperServer{
  204. public:
  205. PingCountingServer():pingCount_(0){}
  206. // called when a client request is received
  207. virtual void onMessageReceived(const RequestHeader& rh, iarchive* ia){
  208. if(rh.type==ZOO_PING_OP){
  209. pingCount_++;
  210. }
  211. }
  212. int pingCount_;
  213. };
  214. // establish a connection; idle for a while
  215. // verify ping was sent at least once
  216. void testPing()
  217. {
  218. const int TIMEOUT=9; // timeout in secs
  219. Mock_gettimeofday timeMock;
  220. PingCountingServer zkServer;
  221. // must call zookeeper_close() while all the mocks are in scope
  222. CloseFinally guard(&zh);
  223. // receive timeout is in milliseconds
  224. zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
  225. CPPUNIT_ASSERT(zh!=0);
  226. // simulate connected state
  227. forceConnected(zh);
  228. int fd=0;
  229. int interest=0;
  230. timeval tv;
  231. // Round 1.
  232. int rc=zookeeper_interest(zh,&fd,&interest,&tv);
  233. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  234. // simulate waiting for the select() call to timeout;
  235. // advance the system clock accordingly
  236. timeMock.tick(tv);
  237. rc=zookeeper_process(zh,interest);
  238. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  239. // verify no ping sent
  240. CPPUNIT_ASSERT(zkServer.pingCount_==0);
  241. // Round 2.
  242. // the client should have the idle threshold exceeded, by now
  243. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  244. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  245. // assume the socket is writable, so no idling here; move on to
  246. // zookeeper_process immediately
  247. rc=zookeeper_process(zh,interest);
  248. // ZNOTHING means the client hasn't received a ping response yet
  249. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  250. // verify a ping is sent
  251. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  252. // Round 3.
  253. // we're going to receive a server PING response and make sure
  254. // that the client has updated its last_recv timestamp
  255. zkServer.addRecvResponse(new PingResponse);
  256. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  257. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  258. // pseudo-sleep for a short while (10 ms)
  259. timeMock.millitick(10);
  260. rc=zookeeper_process(zh,interest);
  261. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  262. // only one ping so far?
  263. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  264. CPPUNIT_ASSERT(timeMock==zh->last_recv);
  265. // Round 4
  266. // make sure that a ping is not sent if something is outstanding
  267. AsyncGetOperationCompletion res1;
  268. rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  269. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  270. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  271. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  272. timeMock.tick(tv);
  273. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  274. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  275. rc=zookeeper_process(zh,interest);
  276. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  277. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  278. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  279. // pseudo-sleep for a short while (10 ms)
  280. timeMock.millitick(10);
  281. rc=zookeeper_process(zh,interest);
  282. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  283. // only one ping so far?
  284. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  285. }
  286. // ZOOKEEPER-2253: Permit unsolicited pings
  287. void testUnsolicitedPing()
  288. {
  289. const int TIMEOUT=9; // timeout in secs
  290. Mock_gettimeofday timeMock;
  291. PingCountingServer zkServer;
  292. // must call zookeeper_close() while all the mocks are in scope
  293. CloseFinally guard(&zh);
  294. // receive timeout is in milliseconds
  295. zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
  296. CPPUNIT_ASSERT(zh!=0);
  297. // simulate connected state
  298. forceConnected(zh);
  299. int fd=0;
  300. int interest=0;
  301. timeval tv;
  302. int rc=zookeeper_interest(zh,&fd,&interest,&tv);
  303. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  304. // verify no ping sent
  305. CPPUNIT_ASSERT(zkServer.pingCount_==0);
  306. // we're going to receive a unsolicited PING response; ensure
  307. // that the client has updated its last_recv timestamp
  308. timeMock.tick(tv);
  309. zkServer.addRecvResponse(new PingResponse);
  310. rc=zookeeper_process(zh,interest);
  311. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  312. CPPUNIT_ASSERT(timeMock==zh->last_recv);
  313. }
  314. // simulate a watch arriving right before a ping is due
  315. // assert the ping is sent nevertheless
  316. void testTimeoutCausedByWatches1()
  317. {
  318. const int TIMEOUT=9; // timeout in secs
  319. Mock_gettimeofday timeMock;
  320. PingCountingServer zkServer;
  321. // must call zookeeper_close() while all the mocks are in scope
  322. CloseFinally guard(&zh);
  323. // receive timeout is in milliseconds
  324. zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
  325. CPPUNIT_ASSERT(zh!=0);
  326. // simulate connected state
  327. forceConnected(zh);
  328. int fd=0;
  329. int interest=0;
  330. timeval tv;
  331. // Round 1.
  332. int rc=zookeeper_interest(zh,&fd,&interest,&tv);
  333. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  334. // simulate waiting for the select() call to timeout;
  335. // advance the system clock accordingly
  336. timeMock.tick(tv);
  337. timeMock.tick(-1); // set the clock to a millisecond before a ping is due
  338. // trigger a watch now
  339. zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
  340. rc=zookeeper_process(zh,interest);
  341. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  342. // arrival of a watch sets the last_recv to the current time
  343. CPPUNIT_ASSERT(timeMock==zh->last_recv);
  344. // spend 1 millisecond by processing the watch
  345. timeMock.tick(1);
  346. // Round 2.
  347. // a ping is due; zookeeper_interest() must send it now
  348. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  349. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  350. // no delay here -- as if the socket is immediately writable
  351. rc=zookeeper_process(zh,interest);
  352. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  353. // verify a ping is sent
  354. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  355. }
  356. // similar to testTimeoutCausedByWatches1, but this time the watch is
  357. // triggered while the client has an outstanding request
  358. // assert the ping is sent on time
  359. void testTimeoutCausedByWatches2()
  360. {
  361. const int TIMEOUT=9; // timeout in secs
  362. Mock_gettimeofday now;
  363. PingCountingServer zkServer;
  364. // must call zookeeper_close() while all the mocks are in scope
  365. CloseFinally guard(&zh);
  366. // receive timeout is in milliseconds
  367. zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
  368. CPPUNIT_ASSERT(zh!=0);
  369. // simulate connected state
  370. forceConnected(zh);
  371. // queue up a request; keep it pending (as if the server is busy or has died)
  372. AsyncGetOperationCompletion res1;
  373. zkServer.addOperationResponse(new ZooGetResponse("2",1));
  374. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  375. int fd=0;
  376. int interest=0;
  377. timeval tv;
  378. // Round 1.
  379. // send the queued up zoo_aget() request
  380. Mock_gettimeofday beginningOfTimes(now); // remember when we started
  381. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  382. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  383. // no delay -- the socket is writable
  384. rc=zookeeper_process(zh,interest);
  385. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  386. // Round 2.
  387. // what's next?
  388. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  389. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  390. // no response from the server yet -- waiting in the select() call
  391. now.tick(tv);
  392. // a watch has arrived, thus preventing the connection from timing out
  393. zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
  394. rc=zookeeper_process(zh,interest);
  395. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc); // read the watch message
  396. CPPUNIT_ASSERT_EQUAL(0,zkServer.pingCount_); // not yet!
  397. //Round 3.
  398. // now is the time to send a ping; make sure it's actually sent
  399. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  400. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  401. rc=zookeeper_process(zh,interest);
  402. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  403. // verify a ping is sent
  404. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  405. // make sure only 1/3 of the timeout has passed
  406. CPPUNIT_ASSERT_EQUAL((int32_t)TIMEOUT/3*1000,toMilliseconds(now-beginningOfTimes));
  407. }
  408. // ZOOKEEPER-2894: Memory and completions leak on zookeeper_close
  409. // while there is a request waiting for being processed
  410. // call zookeeper_close() from the main event loop
  411. // assert the completion callback is called
  412. void testCloseWhileInProgressFromMain()
  413. {
  414. Mock_gettimeofday timeMock;
  415. ZookeeperServer zkServer;
  416. CloseFinally guard(&zh);
  417. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  418. CPPUNIT_ASSERT(zh!=0);
  419. forceConnected(zh);
  420. zhandle_t* savezh=zh;
  421. // issue a request
  422. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  423. AsyncGetOperationCompletion res1;
  424. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  425. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  426. // but do not allow Zookeeper C Client to process the request
  427. // and call zookeeper_close() from the main event loop immediately
  428. Mock_free_noop freeMock;
  429. rc=zookeeper_close(zh); zh=0;
  430. freeMock.disable();
  431. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  432. // verify that memory for completions was freed (would be freed if no mock installed)
  433. CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
  434. CPPUNIT_ASSERT(savezh->completions_to_process.head==0);
  435. CPPUNIT_ASSERT(savezh->completions_to_process.last==0);
  436. // verify that completion was called, and it was called with ZCLOSING status
  437. CPPUNIT_ASSERT(res1.called_);
  438. CPPUNIT_ASSERT_EQUAL((int)ZCLOSING,res1.rc_);
  439. }
  440. // ZOOKEEPER-2894: Memory and completions leak on zookeeper_close
  441. // send some request #1
  442. // then, while there is a request #2 waiting for being processed
  443. // call zookeeper_close() from the completion callback of request #1
  444. // assert the completion callback #2 is called
  445. void testCloseWhileInProgressFromCompletion()
  446. {
  447. Mock_gettimeofday timeMock;
  448. ZookeeperServer zkServer;
  449. CloseFinally guard(&zh);
  450. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  451. CPPUNIT_ASSERT(zh!=0);
  452. forceConnected(zh);
  453. zhandle_t* savezh=zh;
  454. // will handle completion on request #1 and issue request #2 from it
  455. class AsyncGetOperationCompletion1: public AsyncCompletion{
  456. public:
  457. AsyncGetOperationCompletion1(zhandle_t **zh, ZookeeperServer *zkServer,
  458. AsyncGetOperationCompletion *res2)
  459. :zh_(zh),zkServer_(zkServer),res2_(res2){}
  460. virtual void dataCompl(int rc1, const char *value, int len, const Stat *stat){
  461. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc1);
  462. // from the completion #1 handler, issue request #2
  463. zkServer_->addOperationResponse(new ZooGetResponse("2",1));
  464. int rc2=zoo_aget(*zh_,"/x/y/2",0,asyncCompletion,res2_);
  465. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc2);
  466. // but do not allow Zookeeper C Client to process the request #2
  467. // and call zookeeper_close() from the completion callback of request #1
  468. rc2=zookeeper_close(*zh_); *zh_=0;
  469. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc2);
  470. // do not disable freeMock here, let completion #2 handler
  471. // return through ZooKeeper C Client internals to the main loop
  472. // and fulfill the work
  473. }
  474. zhandle_t **zh_;
  475. ZookeeperServer *zkServer_;
  476. AsyncGetOperationCompletion *res2_;
  477. };
  478. // issue request #1
  479. AsyncGetOperationCompletion res2;
  480. AsyncGetOperationCompletion1 res1(&zh,&zkServer,&res2);
  481. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  482. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  483. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  484. // process the send queue
  485. int fd; int interest; timeval tv;
  486. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  487. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  488. CPPUNIT_ASSERT(zh!=0);
  489. Mock_free_noop freeMock;
  490. while(zh!=0 && (rc=zookeeper_process(zh,interest))==ZOK) {
  491. millisleep(100);
  492. }
  493. freeMock.disable();
  494. CPPUNIT_ASSERT(zh==0);
  495. // verify that memory for completions was freed (would be freed if no mock installed)
  496. CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
  497. CPPUNIT_ASSERT(savezh->completions_to_process.head==0);
  498. CPPUNIT_ASSERT(savezh->completions_to_process.last==0);
  499. // verify that completion #2 was called, and it was called with ZCLOSING status
  500. CPPUNIT_ASSERT(res2.called_);
  501. CPPUNIT_ASSERT_EQUAL((int)ZCLOSING,res2.rc_);
  502. }
  503. #else
  504. class TestGetDataJob: public TestJob{
  505. public:
  506. TestGetDataJob(ZookeeperServer* svr,zhandle_t* zh, int reps=500)
  507. :svr_(svr),zh_(zh),rc_(ZAPIERROR),reps_(reps){}
  508. virtual void run(){
  509. int i;
  510. for(i=0;i<reps_;i++){
  511. char buf;
  512. int size=sizeof(buf);
  513. if (i % 10 == 0) {
  514. // We need to pause every once in a while so we don't
  515. // get too far ahead and finish before the disconnect
  516. millisleep(1);
  517. }
  518. svr_->addOperationResponse(new ZooGetResponse("1",1));
  519. rc_=zoo_get(zh_,"/x/y/z",0,&buf,&size,0);
  520. if(rc_!=ZOK){
  521. break;
  522. }
  523. }
  524. }
  525. ZookeeperServer* svr_;
  526. zhandle_t* zh_;
  527. int rc_;
  528. int reps_;
  529. };
  530. class TestConcurrentOpJob: public TestGetDataJob{
  531. public:
  532. static const int REPS=500;
  533. TestConcurrentOpJob(ZookeeperServer* svr,zhandle_t* zh):
  534. TestGetDataJob(svr,zh,REPS){}
  535. virtual TestJob* clone() const {
  536. return new TestConcurrentOpJob(svr_,zh_);
  537. }
  538. virtual void validate(const char* file, int line) const{
  539. CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZOK != rc",(int)ZOK,rc_,file,line);
  540. }
  541. };
  542. void testConcurrentOperations1()
  543. {
  544. for(int counter=0; counter<50; counter++){
  545. // frozen time -- no timeouts and no pings
  546. Mock_gettimeofday timeMock;
  547. ZookeeperServer zkServer;
  548. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  549. // must call zookeeper_close() while all the mocks are in the scope!
  550. CloseFinally guard(&zh);
  551. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  552. CPPUNIT_ASSERT(zh!=0);
  553. // make sure the client has connected
  554. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  555. TestJobManager jmgr(TestConcurrentOpJob(&zkServer,zh),10);
  556. jmgr.startAllJobs();
  557. jmgr.wait();
  558. // validate test results
  559. VALIDATE_JOBS(jmgr);
  560. }
  561. }
  562. class ZKGetJob: public TestJob{
  563. public:
  564. static const int REPS=1000;
  565. ZKGetJob(zhandle_t* zh)
  566. :zh_(zh),rc_(ZAPIERROR){}
  567. virtual TestJob* clone() const {
  568. return new ZKGetJob(zh_);
  569. }
  570. virtual void run(){
  571. int i;
  572. for(i=0;i<REPS;i++){
  573. char buf;
  574. int size=sizeof(buf);
  575. rc_=zoo_get(zh_,"/xyz",0,&buf,&size,0);
  576. if(rc_!=ZOK){
  577. break;
  578. }
  579. }
  580. //TEST_TRACE("Finished %d iterations",i);
  581. }
  582. virtual void validate(const char* file, int line) const{
  583. CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZOK != rc",(int)ZOK,rc_,file,line);
  584. }
  585. zhandle_t* zh_;
  586. int rc_;
  587. };
  588. // this test connects to a real ZK server and creates the /xyz node and sends
  589. // lots of zoo_get requests.
  590. // to run this test use the following command:
  591. // zktest-mt Zookeeper_operations::testOperationsAndDisconnectConcurrently2 localhost:3181
  592. // where the second parameter is the server host and port
  593. void testOperationsAndDisconnectConcurrently2()
  594. {
  595. if(globalTestConfig.getTestName().find(__func__)==string::npos ||
  596. globalTestConfig.getExtraOptCount()==0)
  597. {
  598. // only run this test when specifically asked so
  599. return;
  600. }
  601. string host(*(globalTestConfig.getExtraOptBegin()));
  602. zhandle_t* lzh=zookeeper_init(host.c_str(),watcher,10000,0,0,0);
  603. CPPUNIT_ASSERT(lzh!=0);
  604. // make sure the client has connected
  605. CPPUNIT_ASSERT_MESSAGE("Unable to connect to the host",
  606. ensureCondition(ClientConnected(zh),5000)<5000);
  607. char realpath[1024];
  608. int rc=zoo_create(lzh,"/xyz","1",1,&ZOO_OPEN_ACL_UNSAFE,0,realpath,sizeof(realpath)-1);
  609. CPPUNIT_ASSERT(rc==ZOK || rc==ZNODEEXISTS);
  610. zookeeper_close(lzh);
  611. for(int counter=0; counter<200; counter++){
  612. TEST_TRACE("Loop count %d",counter);
  613. CloseFinally guard(&zh);
  614. zh=zookeeper_init(host.c_str(),watcher,10000,0,0,0);
  615. CPPUNIT_ASSERT(zh!=0);
  616. // make sure the client has connected
  617. CPPUNIT_ASSERT_MESSAGE("Unable to connect to the host",
  618. ensureCondition(ClientConnected(zh),5000)<5000);
  619. TestJobManager jmgr(ZKGetJob(zh),10);
  620. jmgr.startJobsImmediately();
  621. jmgr.wait();
  622. VALIDATE_JOBS(jmgr);
  623. TEST_TRACE("run %d finished",counter);
  624. }
  625. }
  626. class TestConcurrentOpWithDisconnectJob: public TestGetDataJob{
  627. public:
  628. static const int REPS=1000;
  629. TestConcurrentOpWithDisconnectJob(ZookeeperServer* svr,zhandle_t* zh):
  630. TestGetDataJob(svr,zh,REPS){}
  631. virtual TestJob* clone() const {
  632. return new TestConcurrentOpWithDisconnectJob(svr_,zh_);
  633. }
  634. virtual void validate(const char* file, int line) const{
  635. CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZCONNECTIONLOSS != rc",(int)ZCONNECTIONLOSS,rc_,file,line);
  636. }
  637. };
  638. // this test is not 100% accurate in a sense it may not detect all error cases.
  639. // TODO: I can't think of a test that is 100% accurate and doesn't interfere
  640. // with the code being tested (in terms of introducing additional
  641. // implicit synchronization points)
  642. void testOperationsAndDisconnectConcurrently1()
  643. {
  644. for(int counter=0; counter<50; counter++){
  645. //TEST_TRACE("Loop count %d",counter);
  646. // frozen time -- no timeouts and no pings
  647. Mock_gettimeofday timeMock;
  648. ZookeeperServer zkServer;
  649. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  650. // must call zookeeper_close() while all the mocks are in the scope!
  651. CloseFinally guard(&zh);
  652. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  653. CPPUNIT_ASSERT(zh!=0);
  654. // make sure the client has connected
  655. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  656. TestJobManager jmgr(TestConcurrentOpWithDisconnectJob(&zkServer,zh),10);
  657. jmgr.startJobsImmediately();
  658. // let everything startup before we shutdown the server
  659. millisleep(4);
  660. // reconnect attempts will start failing immediately
  661. zkServer.setServerDown(0);
  662. // next recv call will return 0
  663. zkServer.setConnectionLost();
  664. jmgr.wait();
  665. VALIDATE_JOBS(jmgr);
  666. }
  667. }
  668. // call zoo_aget() in the multithreaded mode
  669. void testAsyncGetOperation()
  670. {
  671. Mock_gettimeofday timeMock;
  672. ZookeeperServer zkServer;
  673. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  674. // must call zookeeper_close() while all the mocks are in the scope!
  675. CloseFinally guard(&zh);
  676. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  677. CPPUNIT_ASSERT(zh!=0);
  678. // make sure the client has connected
  679. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  680. AsyncGetOperationCompletion res1;
  681. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  682. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  683. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  684. CPPUNIT_ASSERT(ensureCondition(res1,1000)<1000);
  685. CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
  686. CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);
  687. }
  688. class ChangeNodeWatcher: public WatcherAction{
  689. public:
  690. ChangeNodeWatcher():changed_(false){}
  691. virtual void onNodeValueChanged(zhandle_t*,const char* path){
  692. synchronized(mx_);
  693. changed_=true;
  694. if(path!=0) path_=path;
  695. }
  696. // this predicate checks if CHANGE_EVENT event type was triggered, unlike
  697. // the isWatcherTriggered() that returns true whenever a watcher is triggered
  698. // regardless of the event type
  699. SyncedBoolCondition isNodeChangedTriggered() const{
  700. return SyncedBoolCondition(changed_,mx_);
  701. }
  702. bool changed_;
  703. string path_;
  704. };
  705. class AsyncWatcherCompletion: public AsyncCompletion{
  706. public:
  707. AsyncWatcherCompletion(ZookeeperServer& zkServer):zkServer_(zkServer){}
  708. virtual void statCompl(int rc, const Stat *stat){
  709. // we received a server response, now enqueue a watcher event
  710. // to trigger the watcher
  711. zkServer_.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
  712. }
  713. ZookeeperServer& zkServer_;
  714. };
  715. // verify that async watcher is called for znode events (CREATED, DELETED etc.)
  716. void testAsyncWatcher1(){
  717. Mock_gettimeofday timeMock;
  718. ZookeeperServer zkServer;
  719. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  720. // must call zookeeper_close() while all the mocks are in the scope!
  721. CloseFinally guard(&zh);
  722. ChangeNodeWatcher action;
  723. zh=zookeeper_init("localhost:2121",activeWatcher,10000,
  724. TEST_CLIENT_ID,&action,0);
  725. CPPUNIT_ASSERT(zh!=0);
  726. // make sure the client has connected
  727. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  728. // set the watcher
  729. AsyncWatcherCompletion completion(zkServer);
  730. // prepare a response for the zoo_aexists() request
  731. zkServer.addOperationResponse(new ZooStatResponse);
  732. int rc=zoo_aexists(zh,"/x/y/z",1,asyncCompletion,&completion);
  733. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  734. CPPUNIT_ASSERT(ensureCondition(action.isNodeChangedTriggered(),1000)<1000);
  735. CPPUNIT_ASSERT_EQUAL(string("/x/y/z"),action.path_);
  736. }
  737. #endif
  738. };
  739. CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_operations);