TestOperations.cc 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include <cppunit/extensions/HelperMacros.h>
  19. #include "CppAssertHelper.h"
  20. #include "ZKMocks.h"
  21. #include <proto.h>
  22. using namespace std;
  23. class Zookeeper_operations : public CPPUNIT_NS::TestFixture
  24. {
  25. CPPUNIT_TEST_SUITE(Zookeeper_operations);
  26. #ifndef THREADED
  27. CPPUNIT_TEST(testPing);
  28. CPPUNIT_TEST(testUnsolicitedPing);
  29. CPPUNIT_TEST(testTimeoutCausedByWatches1);
  30. CPPUNIT_TEST(testTimeoutCausedByWatches2);
  31. CPPUNIT_TEST(testCloseWhileInProgressFromMain);
  32. CPPUNIT_TEST(testCloseWhileInProgressFromCompletion);
  33. CPPUNIT_TEST(testCloseWhileMultiInProgressFromMain);
  34. CPPUNIT_TEST(testCloseWhileMultiInProgressFromCompletion);
  35. #else
  36. CPPUNIT_TEST(testAsyncWatcher1);
  37. CPPUNIT_TEST(testAsyncGetOperation);
  38. #endif
  39. CPPUNIT_TEST(testOperationsAndDisconnectConcurrently1);
  40. CPPUNIT_TEST(testOperationsAndDisconnectConcurrently2);
  41. CPPUNIT_TEST(testConcurrentOperations1);
  42. CPPUNIT_TEST_SUITE_END();
  43. zhandle_t *zh;
  44. FILE *logfile;
  45. static void watcher(zhandle_t *, int, int, const char *,void*){}
  46. public:
  47. Zookeeper_operations() {
  48. logfile = openlogfile("Zookeeper_operations");
  49. }
  50. ~Zookeeper_operations() {
  51. if (logfile) {
  52. fflush(logfile);
  53. fclose(logfile);
  54. logfile = 0;
  55. }
  56. }
  57. void setUp()
  58. {
  59. zoo_set_log_stream(logfile);
  60. zoo_deterministic_conn_order(0);
  61. zh=0;
  62. }
  63. void tearDown()
  64. {
  65. zookeeper_close(zh);
  66. }
  67. class AsyncGetOperationCompletion: public AsyncCompletion{
  68. public:
  69. AsyncGetOperationCompletion():called_(false),rc_(ZAPIERROR){}
  70. virtual void dataCompl(int rc, const char *value, int len, const Stat *stat){
  71. synchronized(mx_);
  72. called_=true;
  73. rc_=rc;
  74. value_.erase();
  75. if(rc!=ZOK) return;
  76. value_.assign(value,len);
  77. if(stat)
  78. stat_=*stat;
  79. }
  80. bool operator()()const{
  81. synchronized(mx_);
  82. return called_;
  83. }
  84. mutable Mutex mx_;
  85. bool called_;
  86. int rc_;
  87. string value_;
  88. NodeStat stat_;
  89. };
  90. class AsyncVoidOperationCompletion: public AsyncCompletion{
  91. public:
  92. AsyncVoidOperationCompletion():called_(false),rc_(ZAPIERROR){}
  93. virtual void voidCompl(int rc){
  94. synchronized(mx_);
  95. called_=true;
  96. rc_=rc;
  97. }
  98. bool operator()()const{
  99. synchronized(mx_);
  100. return called_;
  101. }
  102. mutable Mutex mx_;
  103. bool called_;
  104. int rc_;
  105. };
  106. #ifndef THREADED
  107. // send two get data requests; verify that the corresponding completions called
  108. void testConcurrentOperations1()
  109. {
  110. Mock_gettimeofday timeMock;
  111. ZookeeperServer zkServer;
  112. // must call zookeeper_close() while all the mocks are in scope
  113. CloseFinally guard(&zh);
  114. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  115. CPPUNIT_ASSERT(zh!=0);
  116. // simulate connected state
  117. forceConnected(zh);
  118. int fd=0;
  119. int interest=0;
  120. timeval tv;
  121. // first operation
  122. AsyncGetOperationCompletion res1;
  123. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  124. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  125. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  126. // second operation
  127. AsyncGetOperationCompletion res2;
  128. zkServer.addOperationResponse(new ZooGetResponse("2",1));
  129. rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
  130. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  131. // process the send queue
  132. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  133. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  134. while((rc=zookeeper_process(zh,interest))==ZOK) {
  135. millisleep(100);
  136. //printf("%d\n", rc);
  137. }
  138. //printf("RC = %d", rc);
  139. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  140. CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
  141. CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);
  142. CPPUNIT_ASSERT_EQUAL((int)ZOK,res2.rc_);
  143. CPPUNIT_ASSERT_EQUAL(string("2"),res2.value_);
  144. }
  145. // send two getData requests and disconnect while the second request is
  146. // outstanding;
  147. // verify the completions are called
  148. void testOperationsAndDisconnectConcurrently1()
  149. {
  150. Mock_gettimeofday timeMock;
  151. ZookeeperServer zkServer;
  152. // must call zookeeper_close() while all the mocks are in scope
  153. CloseFinally guard(&zh);
  154. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  155. CPPUNIT_ASSERT(zh!=0);
  156. // simulate connected state
  157. forceConnected(zh);
  158. int fd=0;
  159. int interest=0;
  160. timeval tv;
  161. // first operation
  162. AsyncGetOperationCompletion res1;
  163. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  164. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  165. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  166. // second operation
  167. AsyncGetOperationCompletion res2;
  168. zkServer.addOperationResponse(new ZooGetResponse("2",1));
  169. rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
  170. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  171. // process the send queue
  172. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  173. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  174. rc=zookeeper_process(zh,interest);
  175. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  176. // simulate a disconnect
  177. zkServer.setConnectionLost();
  178. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  179. rc=zookeeper_process(zh,interest);
  180. CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,rc);
  181. CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
  182. CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);
  183. CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,res2.rc_);
  184. CPPUNIT_ASSERT_EQUAL(string(""),res2.value_);
  185. }
  186. // send two getData requests and simulate timeout while the both request
  187. // are pending;
  188. // verify the completions are called
  189. void testOperationsAndDisconnectConcurrently2()
  190. {
  191. Mock_gettimeofday timeMock;
  192. ZookeeperServer zkServer;
  193. // must call zookeeper_close() while all the mocks are in scope
  194. CloseFinally guard(&zh);
  195. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  196. CPPUNIT_ASSERT(zh!=0);
  197. // simulate connected state
  198. forceConnected(zh);
  199. int fd=0;
  200. int interest=0;
  201. timeval tv;
  202. // first operation
  203. AsyncGetOperationCompletion res1;
  204. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  205. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  206. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  207. // second operation
  208. AsyncGetOperationCompletion res2;
  209. zkServer.addOperationResponse(new ZooGetResponse("2",1));
  210. rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
  211. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  212. // simulate timeout
  213. timeMock.tick(+10); // advance system time by 10 secs
  214. // the next call to zookeeper_interest should return ZOPERATIONTIMEOUT
  215. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  216. CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,rc);
  217. // make sure the completions have been called
  218. CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,res1.rc_);
  219. CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,res2.rc_);
  220. }
  221. class PingCountingServer: public ZookeeperServer{
  222. public:
  223. PingCountingServer():pingCount_(0){}
  224. // called when a client request is received
  225. virtual void onMessageReceived(const RequestHeader& rh, iarchive* ia){
  226. if(rh.type==ZOO_PING_OP){
  227. pingCount_++;
  228. }
  229. }
  230. int pingCount_;
  231. };
  232. // establish a connection; idle for a while
  233. // verify ping was sent at least once
  234. void testPing()
  235. {
  236. const int TIMEOUT=9; // timeout in secs
  237. Mock_gettimeofday timeMock;
  238. PingCountingServer zkServer;
  239. // must call zookeeper_close() while all the mocks are in scope
  240. CloseFinally guard(&zh);
  241. // receive timeout is in milliseconds
  242. zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
  243. CPPUNIT_ASSERT(zh!=0);
  244. // simulate connected state
  245. forceConnected(zh);
  246. int fd=0;
  247. int interest=0;
  248. timeval tv;
  249. // Round 1.
  250. int rc=zookeeper_interest(zh,&fd,&interest,&tv);
  251. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  252. // simulate waiting for the select() call to timeout;
  253. // advance the system clock accordingly
  254. timeMock.tick(tv);
  255. rc=zookeeper_process(zh,interest);
  256. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  257. // verify no ping sent
  258. CPPUNIT_ASSERT(zkServer.pingCount_==0);
  259. // Round 2.
  260. // the client should have the idle threshold exceeded, by now
  261. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  262. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  263. // assume the socket is writable, so no idling here; move on to
  264. // zookeeper_process immediately
  265. rc=zookeeper_process(zh,interest);
  266. // ZNOTHING means the client hasn't received a ping response yet
  267. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  268. // verify a ping is sent
  269. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  270. // Round 3.
  271. // we're going to receive a server PING response and make sure
  272. // that the client has updated its last_recv timestamp
  273. zkServer.addRecvResponse(new PingResponse);
  274. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  275. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  276. // pseudo-sleep for a short while (10 ms)
  277. timeMock.millitick(10);
  278. rc=zookeeper_process(zh,interest);
  279. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  280. // only one ping so far?
  281. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  282. CPPUNIT_ASSERT(timeMock==zh->last_recv);
  283. // Round 4
  284. // make sure that a ping is not sent if something is outstanding
  285. AsyncGetOperationCompletion res1;
  286. rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  287. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  288. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  289. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  290. timeMock.tick(tv);
  291. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  292. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  293. rc=zookeeper_process(zh,interest);
  294. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  295. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  296. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  297. // pseudo-sleep for a short while (10 ms)
  298. timeMock.millitick(10);
  299. rc=zookeeper_process(zh,interest);
  300. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  301. // only one ping so far?
  302. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  303. }
  304. // ZOOKEEPER-2253: Permit unsolicited pings
  305. void testUnsolicitedPing()
  306. {
  307. const int TIMEOUT=9; // timeout in secs
  308. Mock_gettimeofday timeMock;
  309. PingCountingServer zkServer;
  310. // must call zookeeper_close() while all the mocks are in scope
  311. CloseFinally guard(&zh);
  312. // receive timeout is in milliseconds
  313. zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
  314. CPPUNIT_ASSERT(zh!=0);
  315. // simulate connected state
  316. forceConnected(zh);
  317. int fd=0;
  318. int interest=0;
  319. timeval tv;
  320. int rc=zookeeper_interest(zh,&fd,&interest,&tv);
  321. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  322. // verify no ping sent
  323. CPPUNIT_ASSERT(zkServer.pingCount_==0);
  324. // we're going to receive a unsolicited PING response; ensure
  325. // that the client has updated its last_recv timestamp
  326. timeMock.tick(tv);
  327. zkServer.addRecvResponse(new PingResponse);
  328. rc=zookeeper_process(zh,interest);
  329. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  330. CPPUNIT_ASSERT(timeMock==zh->last_recv);
  331. }
  332. // simulate a watch arriving right before a ping is due
  333. // assert the ping is sent nevertheless
  334. void testTimeoutCausedByWatches1()
  335. {
  336. const int TIMEOUT=9; // timeout in secs
  337. Mock_gettimeofday timeMock;
  338. PingCountingServer zkServer;
  339. // must call zookeeper_close() while all the mocks are in scope
  340. CloseFinally guard(&zh);
  341. // receive timeout is in milliseconds
  342. zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
  343. CPPUNIT_ASSERT(zh!=0);
  344. // simulate connected state
  345. forceConnected(zh);
  346. int fd=0;
  347. int interest=0;
  348. timeval tv;
  349. // Round 1.
  350. int rc=zookeeper_interest(zh,&fd,&interest,&tv);
  351. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  352. // simulate waiting for the select() call to timeout;
  353. // advance the system clock accordingly
  354. timeMock.tick(tv);
  355. timeMock.tick(-1); // set the clock to a millisecond before a ping is due
  356. // trigger a watch now
  357. zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
  358. rc=zookeeper_process(zh,interest);
  359. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  360. // arrival of a watch sets the last_recv to the current time
  361. CPPUNIT_ASSERT(timeMock==zh->last_recv);
  362. // spend 1 millisecond by processing the watch
  363. timeMock.tick(1);
  364. // Round 2.
  365. // a ping is due; zookeeper_interest() must send it now
  366. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  367. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  368. // no delay here -- as if the socket is immediately writable
  369. rc=zookeeper_process(zh,interest);
  370. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  371. // verify a ping is sent
  372. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  373. }
  374. // similar to testTimeoutCausedByWatches1, but this time the watch is
  375. // triggered while the client has an outstanding request
  376. // assert the ping is sent on time
  377. void testTimeoutCausedByWatches2()
  378. {
  379. const int TIMEOUT=9; // timeout in secs
  380. Mock_gettimeofday now;
  381. PingCountingServer zkServer;
  382. // must call zookeeper_close() while all the mocks are in scope
  383. CloseFinally guard(&zh);
  384. // receive timeout is in milliseconds
  385. zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
  386. CPPUNIT_ASSERT(zh!=0);
  387. // simulate connected state
  388. forceConnected(zh);
  389. // queue up a request; keep it pending (as if the server is busy or has died)
  390. AsyncGetOperationCompletion res1;
  391. zkServer.addOperationResponse(new ZooGetResponse("2",1));
  392. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  393. int fd=0;
  394. int interest=0;
  395. timeval tv;
  396. // Round 1.
  397. // send the queued up zoo_aget() request
  398. Mock_gettimeofday beginningOfTimes(now); // remember when we started
  399. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  400. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  401. // no delay -- the socket is writable
  402. rc=zookeeper_process(zh,interest);
  403. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  404. // Round 2.
  405. // what's next?
  406. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  407. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  408. // no response from the server yet -- waiting in the select() call
  409. now.tick(tv);
  410. // a watch has arrived, thus preventing the connection from timing out
  411. zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
  412. rc=zookeeper_process(zh,interest);
  413. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc); // read the watch message
  414. CPPUNIT_ASSERT_EQUAL(0,zkServer.pingCount_); // not yet!
  415. //Round 3.
  416. // now is the time to send a ping; make sure it's actually sent
  417. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  418. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  419. rc=zookeeper_process(zh,interest);
  420. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  421. // verify a ping is sent
  422. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  423. // make sure only 1/3 of the timeout has passed
  424. CPPUNIT_ASSERT_EQUAL((int32_t)TIMEOUT/3*1000,toMilliseconds(now-beginningOfTimes));
  425. }
  426. // ZOOKEEPER-2894: Memory and completions leak on zookeeper_close
  427. // while there is a request waiting for being processed
  428. // call zookeeper_close() from the main event loop
  429. // assert the completion callback is called
  430. void testCloseWhileInProgressFromMain()
  431. {
  432. Mock_gettimeofday timeMock;
  433. ZookeeperServer zkServer;
  434. CloseFinally guard(&zh);
  435. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  436. CPPUNIT_ASSERT(zh!=0);
  437. forceConnected(zh);
  438. zhandle_t* savezh=zh;
  439. // issue a request
  440. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  441. AsyncGetOperationCompletion res1;
  442. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  443. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  444. // but do not allow Zookeeper C Client to process the request
  445. // and call zookeeper_close() from the main event loop immediately
  446. Mock_free_noop freeMock;
  447. rc=zookeeper_close(zh); zh=0;
  448. freeMock.disable();
  449. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  450. // verify that memory for completions was freed (would be freed if no mock installed)
  451. CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
  452. CPPUNIT_ASSERT(savezh->completions_to_process.head==0);
  453. CPPUNIT_ASSERT(savezh->completions_to_process.last==0);
  454. // verify that completion was called, and it was called with ZCLOSING status
  455. CPPUNIT_ASSERT(res1.called_);
  456. CPPUNIT_ASSERT_EQUAL((int)ZCLOSING,res1.rc_);
  457. }
  458. // ZOOKEEPER-2894: Memory and completions leak on zookeeper_close
  459. // send some request #1
  460. // then, while there is a request #2 waiting for being processed
  461. // call zookeeper_close() from the completion callback of request #1
  462. // assert the completion callback #2 is called
  463. void testCloseWhileInProgressFromCompletion()
  464. {
  465. Mock_gettimeofday timeMock;
  466. ZookeeperServer zkServer;
  467. CloseFinally guard(&zh);
  468. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  469. CPPUNIT_ASSERT(zh!=0);
  470. forceConnected(zh);
  471. zhandle_t* savezh=zh;
  472. // will handle completion on request #1 and issue request #2 from it
  473. class AsyncGetOperationCompletion1: public AsyncCompletion{
  474. public:
  475. AsyncGetOperationCompletion1(zhandle_t **zh, ZookeeperServer *zkServer,
  476. AsyncGetOperationCompletion *res2)
  477. :zh_(zh),zkServer_(zkServer),res2_(res2){}
  478. virtual void dataCompl(int rc1, const char *value, int len, const Stat *stat){
  479. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc1);
  480. // from the completion #1 handler, issue request #2
  481. zkServer_->addOperationResponse(new ZooGetResponse("2",1));
  482. int rc2=zoo_aget(*zh_,"/x/y/2",0,asyncCompletion,res2_);
  483. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc2);
  484. // but do not allow Zookeeper C Client to process the request #2
  485. // and call zookeeper_close() from the completion callback of request #1
  486. rc2=zookeeper_close(*zh_); *zh_=0;
  487. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc2);
  488. // do not disable freeMock here, let completion #2 handler
  489. // return through ZooKeeper C Client internals to the main loop
  490. // and fulfill the work
  491. }
  492. zhandle_t **zh_;
  493. ZookeeperServer *zkServer_;
  494. AsyncGetOperationCompletion *res2_;
  495. };
  496. // issue request #1
  497. AsyncGetOperationCompletion res2;
  498. AsyncGetOperationCompletion1 res1(&zh,&zkServer,&res2);
  499. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  500. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  501. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  502. // process the send queue
  503. int fd; int interest; timeval tv;
  504. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  505. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  506. CPPUNIT_ASSERT(zh!=0);
  507. Mock_free_noop freeMock;
  508. while(zh!=0 && (rc=zookeeper_process(zh,interest))==ZOK) {
  509. millisleep(100);
  510. }
  511. freeMock.disable();
  512. CPPUNIT_ASSERT(zh==0);
  513. // verify that memory for completions was freed (would be freed if no mock installed)
  514. CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
  515. CPPUNIT_ASSERT(savezh->completions_to_process.head==0);
  516. CPPUNIT_ASSERT(savezh->completions_to_process.last==0);
  517. // verify that completion #2 was called, and it was called with ZCLOSING status
  518. CPPUNIT_ASSERT(res2.called_);
  519. CPPUNIT_ASSERT_EQUAL((int)ZCLOSING,res2.rc_);
  520. }
  521. // ZOOKEEPER-2891: Invalid processing of zookeeper_close for mutli-request
  522. // while there is a multi request waiting for being processed
  523. // call zookeeper_close() from the main event loop
  524. // assert the completion callback is called with status ZCLOSING
  525. void testCloseWhileMultiInProgressFromMain()
  526. {
  527. Mock_gettimeofday timeMock;
  528. ZookeeperServer zkServer;
  529. CloseFinally guard(&zh);
  530. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  531. CPPUNIT_ASSERT(zh!=0);
  532. forceConnected(zh);
  533. zhandle_t* savezh=zh;
  534. // issue a multi request
  535. int nops=2;
  536. zoo_op_t ops[nops];
  537. zoo_op_result_t results[nops];
  538. zoo_create_op_init(&ops[0],"/a",0,-1,&ZOO_OPEN_ACL_UNSAFE,0,0,0);
  539. zoo_create_op_init(&ops[1],"/a/b",0,-1,&ZOO_OPEN_ACL_UNSAFE,0,0,0);
  540. // TODO: Provide ZooMultiResponse. However, it's not required in this test.
  541. // zkServer.addOperationResponse(new ZooMultiResponse(...));
  542. AsyncVoidOperationCompletion res1;
  543. int rc=zoo_amulti(zh,nops,ops,results,asyncCompletion,&res1);
  544. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  545. // but do not allow Zookeeper C Client to process the request
  546. // and call zookeeper_close() from the main event loop immediately
  547. Mock_free_noop freeMock;
  548. rc=zookeeper_close(zh); zh=0;
  549. freeMock.disable();
  550. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  551. // verify that memory for completions was freed (would be freed if no mock installed)
  552. CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
  553. CPPUNIT_ASSERT(savezh->completions_to_process.head==0);
  554. CPPUNIT_ASSERT(savezh->completions_to_process.last==0);
  555. // verify that completion was called, and it was called with ZCLOSING status
  556. CPPUNIT_ASSERT(res1.called_);
  557. CPPUNIT_ASSERT_EQUAL((int)ZCLOSING,res1.rc_);
  558. }
  559. // ZOOKEEPER-2891: Invalid processing of zookeeper_close for mutli-request
  560. // send some request #1 (not a multi request)
  561. // then, while there is a multi request #2 waiting for being processed
  562. // call zookeeper_close() from the completion callback of request #1
  563. // assert the completion callback #2 is called with status ZCLOSING
  564. void testCloseWhileMultiInProgressFromCompletion()
  565. {
  566. Mock_gettimeofday timeMock;
  567. ZookeeperServer zkServer;
  568. CloseFinally guard(&zh);
  569. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  570. CPPUNIT_ASSERT(zh!=0);
  571. forceConnected(zh);
  572. zhandle_t* savezh=zh;
  573. // these shall persist during the test
  574. int nops=2;
  575. zoo_op_t ops[nops];
  576. zoo_op_result_t results[nops];
  577. // will handle completion on request #1 and issue request #2 from it
  578. class AsyncGetOperationCompletion1: public AsyncCompletion{
  579. public:
  580. AsyncGetOperationCompletion1(zhandle_t **zh, ZookeeperServer *zkServer,
  581. AsyncVoidOperationCompletion *res2,
  582. int nops, zoo_op_t* ops, zoo_op_result_t* results)
  583. :zh_(zh),zkServer_(zkServer),res2_(res2),nops_(nops),ops_(ops),results_(results){}
  584. virtual void dataCompl(int rc1, const char *value, int len, const Stat *stat){
  585. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc1);
  586. // from the completion #1 handler, issue multi request #2
  587. assert(nops_>=2);
  588. zoo_create_op_init(&ops_[0],"/a",0,-1,&ZOO_OPEN_ACL_UNSAFE,0,0,0);
  589. zoo_create_op_init(&ops_[1],"/a/b",0,-1,&ZOO_OPEN_ACL_UNSAFE,0,0,0);
  590. // TODO: Provide ZooMultiResponse. However, it's not required in this test.
  591. // zkServer_->addOperationResponse(new ZooMultiResponse(...));
  592. int rc2=zoo_amulti(*zh_,nops_,ops_,results_,asyncCompletion,res2_);
  593. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc2);
  594. // but do not allow Zookeeper C Client to process the request #2
  595. // and call zookeeper_close() from the completion callback of request #1
  596. rc2=zookeeper_close(*zh_); *zh_=0;
  597. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc2);
  598. // do not disable freeMock here, let completion #2 handler
  599. // return through ZooKeeper C Client internals to the main loop
  600. // and fulfill the work
  601. }
  602. zhandle_t **zh_;
  603. ZookeeperServer *zkServer_;
  604. AsyncVoidOperationCompletion *res2_;
  605. int nops_;
  606. zoo_op_t* ops_;
  607. zoo_op_result_t* results_;
  608. };
  609. // issue some request #1 (not a multi request)
  610. AsyncVoidOperationCompletion res2;
  611. AsyncGetOperationCompletion1 res1(&zh,&zkServer,&res2,nops,ops,results);
  612. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  613. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  614. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  615. // process the send queue
  616. int fd; int interest; timeval tv;
  617. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  618. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  619. CPPUNIT_ASSERT(zh!=0);
  620. Mock_free_noop freeMock;
  621. while(zh!=0 && (rc=zookeeper_process(zh,interest))==ZOK) {
  622. millisleep(100);
  623. }
  624. freeMock.disable();
  625. CPPUNIT_ASSERT(zh==0);
  626. // verify that memory for completions was freed (would be freed if no mock installed)
  627. CPPUNIT_ASSERT_EQUAL(1,freeMock.getFreeCount(savezh));
  628. CPPUNIT_ASSERT(savezh->completions_to_process.head==0);
  629. CPPUNIT_ASSERT(savezh->completions_to_process.last==0);
  630. // verify that completion #2 was called, and it was called with ZCLOSING status
  631. CPPUNIT_ASSERT(res2.called_);
  632. CPPUNIT_ASSERT_EQUAL((int)ZCLOSING,res2.rc_);
  633. }
  634. #else
  635. class TestGetDataJob: public TestJob{
  636. public:
  637. TestGetDataJob(ZookeeperServer* svr,zhandle_t* zh, int reps=500)
  638. :svr_(svr),zh_(zh),rc_(ZAPIERROR),reps_(reps){}
  639. virtual void run(){
  640. int i;
  641. for(i=0;i<reps_;i++){
  642. char buf;
  643. int size=sizeof(buf);
  644. if (i % 10 == 0) {
  645. // We need to pause every once in a while so we don't
  646. // get too far ahead and finish before the disconnect
  647. millisleep(1);
  648. }
  649. svr_->addOperationResponse(new ZooGetResponse("1",1));
  650. rc_=zoo_get(zh_,"/x/y/z",0,&buf,&size,0);
  651. if(rc_!=ZOK){
  652. break;
  653. }
  654. }
  655. }
  656. ZookeeperServer* svr_;
  657. zhandle_t* zh_;
  658. int rc_;
  659. int reps_;
  660. };
  661. class TestConcurrentOpJob: public TestGetDataJob{
  662. public:
  663. static const int REPS=500;
  664. TestConcurrentOpJob(ZookeeperServer* svr,zhandle_t* zh):
  665. TestGetDataJob(svr,zh,REPS){}
  666. virtual TestJob* clone() const {
  667. return new TestConcurrentOpJob(svr_,zh_);
  668. }
  669. virtual void validate(const char* file, int line) const{
  670. CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZOK != rc",(int)ZOK,rc_,file,line);
  671. }
  672. };
  673. void testConcurrentOperations1()
  674. {
  675. for(int counter=0; counter<50; counter++){
  676. // frozen time -- no timeouts and no pings
  677. Mock_gettimeofday timeMock;
  678. ZookeeperServer zkServer;
  679. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  680. // must call zookeeper_close() while all the mocks are in the scope!
  681. CloseFinally guard(&zh);
  682. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  683. CPPUNIT_ASSERT(zh!=0);
  684. // make sure the client has connected
  685. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  686. TestJobManager jmgr(TestConcurrentOpJob(&zkServer,zh),10);
  687. jmgr.startAllJobs();
  688. jmgr.wait();
  689. // validate test results
  690. VALIDATE_JOBS(jmgr);
  691. }
  692. }
  693. class ZKGetJob: public TestJob{
  694. public:
  695. static const int REPS=1000;
  696. ZKGetJob(zhandle_t* zh)
  697. :zh_(zh),rc_(ZAPIERROR){}
  698. virtual TestJob* clone() const {
  699. return new ZKGetJob(zh_);
  700. }
  701. virtual void run(){
  702. int i;
  703. for(i=0;i<REPS;i++){
  704. char buf;
  705. int size=sizeof(buf);
  706. rc_=zoo_get(zh_,"/xyz",0,&buf,&size,0);
  707. if(rc_!=ZOK){
  708. break;
  709. }
  710. }
  711. //TEST_TRACE("Finished %d iterations",i);
  712. }
  713. virtual void validate(const char* file, int line) const{
  714. CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZOK != rc",(int)ZOK,rc_,file,line);
  715. }
  716. zhandle_t* zh_;
  717. int rc_;
  718. };
  719. // this test connects to a real ZK server and creates the /xyz node and sends
  720. // lots of zoo_get requests.
  721. // to run this test use the following command:
  722. // zktest-mt Zookeeper_operations::testOperationsAndDisconnectConcurrently2 localhost:3181
  723. // where the second parameter is the server host and port
  724. void testOperationsAndDisconnectConcurrently2()
  725. {
  726. if(globalTestConfig.getTestName().find(__func__)==string::npos ||
  727. globalTestConfig.getExtraOptCount()==0)
  728. {
  729. // only run this test when specifically asked so
  730. return;
  731. }
  732. string host(*(globalTestConfig.getExtraOptBegin()));
  733. zhandle_t* lzh=zookeeper_init(host.c_str(),watcher,10000,0,0,0);
  734. CPPUNIT_ASSERT(lzh!=0);
  735. // make sure the client has connected
  736. CPPUNIT_ASSERT_MESSAGE("Unable to connect to the host",
  737. ensureCondition(ClientConnected(zh),5000)<5000);
  738. char realpath[1024];
  739. int rc=zoo_create(lzh,"/xyz","1",1,&ZOO_OPEN_ACL_UNSAFE,0,realpath,sizeof(realpath)-1);
  740. CPPUNIT_ASSERT(rc==ZOK || rc==ZNODEEXISTS);
  741. zookeeper_close(lzh);
  742. for(int counter=0; counter<200; counter++){
  743. TEST_TRACE("Loop count %d",counter);
  744. CloseFinally guard(&zh);
  745. zh=zookeeper_init(host.c_str(),watcher,10000,0,0,0);
  746. CPPUNIT_ASSERT(zh!=0);
  747. // make sure the client has connected
  748. CPPUNIT_ASSERT_MESSAGE("Unable to connect to the host",
  749. ensureCondition(ClientConnected(zh),5000)<5000);
  750. TestJobManager jmgr(ZKGetJob(zh),10);
  751. jmgr.startJobsImmediately();
  752. jmgr.wait();
  753. VALIDATE_JOBS(jmgr);
  754. TEST_TRACE("run %d finished",counter);
  755. }
  756. }
  757. class TestConcurrentOpWithDisconnectJob: public TestGetDataJob{
  758. public:
  759. static const int REPS=1000;
  760. TestConcurrentOpWithDisconnectJob(ZookeeperServer* svr,zhandle_t* zh):
  761. TestGetDataJob(svr,zh,REPS){}
  762. virtual TestJob* clone() const {
  763. return new TestConcurrentOpWithDisconnectJob(svr_,zh_);
  764. }
  765. virtual void validate(const char* file, int line) const{
  766. CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZCONNECTIONLOSS != rc",(int)ZCONNECTIONLOSS,rc_,file,line);
  767. }
  768. };
  769. // this test is not 100% accurate in a sense it may not detect all error cases.
  770. // TODO: I can't think of a test that is 100% accurate and doesn't interfere
  771. // with the code being tested (in terms of introducing additional
  772. // implicit synchronization points)
  773. void testOperationsAndDisconnectConcurrently1()
  774. {
  775. for(int counter=0; counter<50; counter++){
  776. //TEST_TRACE("Loop count %d",counter);
  777. // frozen time -- no timeouts and no pings
  778. Mock_gettimeofday timeMock;
  779. ZookeeperServer zkServer;
  780. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  781. // must call zookeeper_close() while all the mocks are in the scope!
  782. CloseFinally guard(&zh);
  783. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  784. CPPUNIT_ASSERT(zh!=0);
  785. // make sure the client has connected
  786. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  787. TestJobManager jmgr(TestConcurrentOpWithDisconnectJob(&zkServer,zh),10);
  788. jmgr.startJobsImmediately();
  789. // let everything startup before we shutdown the server
  790. millisleep(4);
  791. // reconnect attempts will start failing immediately
  792. zkServer.setServerDown(0);
  793. // next recv call will return 0
  794. zkServer.setConnectionLost();
  795. jmgr.wait();
  796. VALIDATE_JOBS(jmgr);
  797. }
  798. }
  799. // call zoo_aget() in the multithreaded mode
  800. void testAsyncGetOperation()
  801. {
  802. Mock_gettimeofday timeMock;
  803. ZookeeperServer zkServer;
  804. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  805. // must call zookeeper_close() while all the mocks are in the scope!
  806. CloseFinally guard(&zh);
  807. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  808. CPPUNIT_ASSERT(zh!=0);
  809. // make sure the client has connected
  810. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  811. AsyncGetOperationCompletion res1;
  812. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  813. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  814. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  815. CPPUNIT_ASSERT(ensureCondition(res1,1000)<1000);
  816. CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
  817. CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);
  818. }
  819. class ChangeNodeWatcher: public WatcherAction{
  820. public:
  821. ChangeNodeWatcher():changed_(false){}
  822. virtual void onNodeValueChanged(zhandle_t*,const char* path){
  823. synchronized(mx_);
  824. changed_=true;
  825. if(path!=0) path_=path;
  826. }
  827. // this predicate checks if CHANGE_EVENT event type was triggered, unlike
  828. // the isWatcherTriggered() that returns true whenever a watcher is triggered
  829. // regardless of the event type
  830. SyncedBoolCondition isNodeChangedTriggered() const{
  831. return SyncedBoolCondition(changed_,mx_);
  832. }
  833. bool changed_;
  834. string path_;
  835. };
  836. class AsyncWatcherCompletion: public AsyncCompletion{
  837. public:
  838. AsyncWatcherCompletion(ZookeeperServer& zkServer):zkServer_(zkServer){}
  839. virtual void statCompl(int rc, const Stat *stat){
  840. // we received a server response, now enqueue a watcher event
  841. // to trigger the watcher
  842. zkServer_.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
  843. }
  844. ZookeeperServer& zkServer_;
  845. };
  846. // verify that async watcher is called for znode events (CREATED, DELETED etc.)
  847. void testAsyncWatcher1(){
  848. Mock_gettimeofday timeMock;
  849. ZookeeperServer zkServer;
  850. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  851. // must call zookeeper_close() while all the mocks are in the scope!
  852. CloseFinally guard(&zh);
  853. ChangeNodeWatcher action;
  854. zh=zookeeper_init("localhost:2121",activeWatcher,10000,
  855. TEST_CLIENT_ID,&action,0);
  856. CPPUNIT_ASSERT(zh!=0);
  857. // make sure the client has connected
  858. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  859. // set the watcher
  860. AsyncWatcherCompletion completion(zkServer);
  861. // prepare a response for the zoo_aexists() request
  862. zkServer.addOperationResponse(new ZooStatResponse);
  863. int rc=zoo_aexists(zh,"/x/y/z",1,asyncCompletion,&completion);
  864. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  865. CPPUNIT_ASSERT(ensureCondition(action.isNodeChangedTriggered(),1000)<1000);
  866. CPPUNIT_ASSERT_EQUAL(string("/x/y/z"),action.path_);
  867. }
  868. #endif
  869. };
  870. CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_operations);