// TestOperations.cc
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include <cppunit/extensions/HelperMacros.h>
  19. #include "CppAssertHelper.h"
  20. #include "ZKMocks.h"
  21. #include <proto.h>
  22. using namespace std;
  23. class Zookeeper_operations : public CPPUNIT_NS::TestFixture
  24. {
  25. CPPUNIT_TEST_SUITE(Zookeeper_operations);
  26. #ifndef THREADED
  27. CPPUNIT_TEST(testPing);
  28. CPPUNIT_TEST(testUnsolicitedPing);
  29. CPPUNIT_TEST(testTimeoutCausedByWatches1);
  30. CPPUNIT_TEST(testTimeoutCausedByWatches2);
  31. #else
  32. CPPUNIT_TEST(testAsyncWatcher1);
  33. CPPUNIT_TEST(testAsyncGetOperation);
  34. #endif
  35. CPPUNIT_TEST(testOperationsAndDisconnectConcurrently1);
  36. CPPUNIT_TEST(testOperationsAndDisconnectConcurrently2);
  37. CPPUNIT_TEST(testConcurrentOperations1);
  38. CPPUNIT_TEST_SUITE_END();
  39. zhandle_t *zh;
  40. FILE *logfile;
  41. static void watcher(zhandle_t *, int, int, const char *,void*){}
  42. public:
  43. Zookeeper_operations() {
  44. logfile = openlogfile("Zookeeper_operations");
  45. }
  46. ~Zookeeper_operations() {
  47. if (logfile) {
  48. fflush(logfile);
  49. fclose(logfile);
  50. logfile = 0;
  51. }
  52. }
  53. void setUp()
  54. {
  55. zoo_set_log_stream(logfile);
  56. zoo_deterministic_conn_order(0);
  57. zh=0;
  58. }
  59. void tearDown()
  60. {
  61. zookeeper_close(zh);
  62. }
  63. class AsyncGetOperationCompletion: public AsyncCompletion{
  64. public:
  65. AsyncGetOperationCompletion():called_(false),rc_(ZAPIERROR){}
  66. virtual void dataCompl(int rc, const char *value, int len, const Stat *stat){
  67. synchronized(mx_);
  68. called_=true;
  69. rc_=rc;
  70. value_.erase();
  71. if(rc!=ZOK) return;
  72. value_.assign(value,len);
  73. if(stat)
  74. stat_=*stat;
  75. }
  76. bool operator()()const{
  77. synchronized(mx_);
  78. return called_;
  79. }
  80. mutable Mutex mx_;
  81. bool called_;
  82. int rc_;
  83. string value_;
  84. NodeStat stat_;
  85. };
  86. #ifndef THREADED
  87. // send two get data requests; verify that the corresponding completions called
  88. void testConcurrentOperations1()
  89. {
  90. Mock_gettimeofday timeMock;
  91. ZookeeperServer zkServer;
  92. // must call zookeeper_close() while all the mocks are in scope
  93. CloseFinally guard(&zh);
  94. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  95. CPPUNIT_ASSERT(zh!=0);
  96. // simulate connected state
  97. forceConnected(zh);
  98. int fd=0;
  99. int interest=0;
  100. timeval tv;
  101. // first operation
  102. AsyncGetOperationCompletion res1;
  103. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  104. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  105. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  106. // second operation
  107. AsyncGetOperationCompletion res2;
  108. zkServer.addOperationResponse(new ZooGetResponse("2",1));
  109. rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
  110. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  111. // process the send queue
  112. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  113. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  114. while((rc=zookeeper_process(zh,interest))==ZOK) {
  115. millisleep(100);
  116. //printf("%d\n", rc);
  117. }
  118. //printf("RC = %d", rc);
  119. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  120. CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
  121. CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);
  122. CPPUNIT_ASSERT_EQUAL((int)ZOK,res2.rc_);
  123. CPPUNIT_ASSERT_EQUAL(string("2"),res2.value_);
  124. }
  125. // send two getData requests and disconnect while the second request is
  126. // outstanding;
  127. // verify the completions are called
  128. void testOperationsAndDisconnectConcurrently1()
  129. {
  130. Mock_gettimeofday timeMock;
  131. ZookeeperServer zkServer;
  132. // must call zookeeper_close() while all the mocks are in scope
  133. CloseFinally guard(&zh);
  134. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  135. CPPUNIT_ASSERT(zh!=0);
  136. // simulate connected state
  137. forceConnected(zh);
  138. int fd=0;
  139. int interest=0;
  140. timeval tv;
  141. // first operation
  142. AsyncGetOperationCompletion res1;
  143. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  144. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  145. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  146. // second operation
  147. AsyncGetOperationCompletion res2;
  148. zkServer.addOperationResponse(new ZooGetResponse("2",1));
  149. rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
  150. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  151. // process the send queue
  152. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  153. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  154. rc=zookeeper_process(zh,interest);
  155. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  156. // simulate a disconnect
  157. zkServer.setConnectionLost();
  158. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  159. rc=zookeeper_process(zh,interest);
  160. CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,rc);
  161. CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
  162. CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);
  163. CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,res2.rc_);
  164. CPPUNIT_ASSERT_EQUAL(string(""),res2.value_);
  165. }
  166. // send two getData requests and simulate timeout while the both request
  167. // are pending;
  168. // verify the completions are called
  169. void testOperationsAndDisconnectConcurrently2()
  170. {
  171. Mock_gettimeofday timeMock;
  172. ZookeeperServer zkServer;
  173. // must call zookeeper_close() while all the mocks are in scope
  174. CloseFinally guard(&zh);
  175. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  176. CPPUNIT_ASSERT(zh!=0);
  177. // simulate connected state
  178. forceConnected(zh);
  179. int fd=0;
  180. int interest=0;
  181. timeval tv;
  182. // first operation
  183. AsyncGetOperationCompletion res1;
  184. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  185. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  186. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  187. // second operation
  188. AsyncGetOperationCompletion res2;
  189. zkServer.addOperationResponse(new ZooGetResponse("2",1));
  190. rc=zoo_aget(zh,"/x/y/2",0,asyncCompletion,&res2);
  191. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  192. // simulate timeout
  193. timeMock.tick(+10); // advance system time by 10 secs
  194. // the next call to zookeeper_interest should return ZOPERATIONTIMEOUT
  195. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  196. CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,rc);
  197. // make sure the completions have been called
  198. CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,res1.rc_);
  199. CPPUNIT_ASSERT_EQUAL((int)ZOPERATIONTIMEOUT,res2.rc_);
  200. }
  201. class PingCountingServer: public ZookeeperServer{
  202. public:
  203. PingCountingServer():pingCount_(0){}
  204. // called when a client request is received
  205. virtual void onMessageReceived(const RequestHeader& rh, iarchive* ia){
  206. if(rh.type==ZOO_PING_OP){
  207. pingCount_++;
  208. }
  209. }
  210. int pingCount_;
  211. };
  212. // establish a connection; idle for a while
  213. // verify ping was sent at least once
  214. void testPing()
  215. {
  216. const int TIMEOUT=9; // timeout in secs
  217. Mock_gettimeofday timeMock;
  218. PingCountingServer zkServer;
  219. // must call zookeeper_close() while all the mocks are in scope
  220. CloseFinally guard(&zh);
  221. // receive timeout is in milliseconds
  222. zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
  223. CPPUNIT_ASSERT(zh!=0);
  224. // simulate connected state
  225. forceConnected(zh);
  226. int fd=0;
  227. int interest=0;
  228. timeval tv;
  229. // Round 1.
  230. int rc=zookeeper_interest(zh,&fd,&interest,&tv);
  231. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  232. // simulate waiting for the select() call to timeout;
  233. // advance the system clock accordingly
  234. timeMock.tick(tv);
  235. rc=zookeeper_process(zh,interest);
  236. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  237. // verify no ping sent
  238. CPPUNIT_ASSERT(zkServer.pingCount_==0);
  239. // Round 2.
  240. // the client should have the idle threshold exceeded, by now
  241. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  242. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  243. // assume the socket is writable, so no idling here; move on to
  244. // zookeeper_process immediately
  245. rc=zookeeper_process(zh,interest);
  246. // ZNOTHING means the client hasn't received a ping response yet
  247. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  248. // verify a ping is sent
  249. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  250. // Round 3.
  251. // we're going to receive a server PING response and make sure
  252. // that the client has updated its last_recv timestamp
  253. zkServer.addRecvResponse(new PingResponse);
  254. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  255. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  256. // pseudo-sleep for a short while (10 ms)
  257. timeMock.millitick(10);
  258. rc=zookeeper_process(zh,interest);
  259. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  260. // only one ping so far?
  261. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  262. CPPUNIT_ASSERT(timeMock==zh->last_recv);
  263. // Round 4
  264. // make sure that a ping is not sent if something is outstanding
  265. AsyncGetOperationCompletion res1;
  266. rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  267. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  268. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  269. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  270. timeMock.tick(tv);
  271. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  272. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  273. rc=zookeeper_process(zh,interest);
  274. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  275. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  276. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  277. // pseudo-sleep for a short while (10 ms)
  278. timeMock.millitick(10);
  279. rc=zookeeper_process(zh,interest);
  280. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  281. // only one ping so far?
  282. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  283. }
  284. // ZOOKEEPER-2253: Permit unsolicited pings
  285. void testUnsolicitedPing()
  286. {
  287. const int TIMEOUT=9; // timeout in secs
  288. Mock_gettimeofday timeMock;
  289. PingCountingServer zkServer;
  290. // must call zookeeper_close() while all the mocks are in scope
  291. CloseFinally guard(&zh);
  292. // receive timeout is in milliseconds
  293. zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
  294. CPPUNIT_ASSERT(zh!=0);
  295. // simulate connected state
  296. forceConnected(zh);
  297. int fd=0;
  298. int interest=0;
  299. timeval tv;
  300. int rc=zookeeper_interest(zh,&fd,&interest,&tv);
  301. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  302. // verify no ping sent
  303. CPPUNIT_ASSERT(zkServer.pingCount_==0);
  304. // we're going to receive a unsolicited PING response; ensure
  305. // that the client has updated its last_recv timestamp
  306. timeMock.tick(tv);
  307. zkServer.addRecvResponse(new PingResponse);
  308. rc=zookeeper_process(zh,interest);
  309. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  310. CPPUNIT_ASSERT(timeMock==zh->last_recv);
  311. }
  312. // simulate a watch arriving right before a ping is due
  313. // assert the ping is sent nevertheless
  314. void testTimeoutCausedByWatches1()
  315. {
  316. const int TIMEOUT=9; // timeout in secs
  317. Mock_gettimeofday timeMock;
  318. PingCountingServer zkServer;
  319. // must call zookeeper_close() while all the mocks are in scope
  320. CloseFinally guard(&zh);
  321. // receive timeout is in milliseconds
  322. zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
  323. CPPUNIT_ASSERT(zh!=0);
  324. // simulate connected state
  325. forceConnected(zh);
  326. int fd=0;
  327. int interest=0;
  328. timeval tv;
  329. // Round 1.
  330. int rc=zookeeper_interest(zh,&fd,&interest,&tv);
  331. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  332. // simulate waiting for the select() call to timeout;
  333. // advance the system clock accordingly
  334. timeMock.tick(tv);
  335. timeMock.tick(-1); // set the clock to a millisecond before a ping is due
  336. // trigger a watch now
  337. zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
  338. rc=zookeeper_process(zh,interest);
  339. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  340. // arrival of a watch sets the last_recv to the current time
  341. CPPUNIT_ASSERT(timeMock==zh->last_recv);
  342. // spend 1 millisecond by processing the watch
  343. timeMock.tick(1);
  344. // Round 2.
  345. // a ping is due; zookeeper_interest() must send it now
  346. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  347. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  348. // no delay here -- as if the socket is immediately writable
  349. rc=zookeeper_process(zh,interest);
  350. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  351. // verify a ping is sent
  352. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  353. }
  354. // similar to testTimeoutCausedByWatches1, but this time the watch is
  355. // triggered while the client has an outstanding request
  356. // assert the ping is sent on time
  357. void testTimeoutCausedByWatches2()
  358. {
  359. const int TIMEOUT=9; // timeout in secs
  360. Mock_gettimeofday now;
  361. PingCountingServer zkServer;
  362. // must call zookeeper_close() while all the mocks are in scope
  363. CloseFinally guard(&zh);
  364. // receive timeout is in milliseconds
  365. zh=zookeeper_init("localhost:1234",watcher,TIMEOUT*1000,TEST_CLIENT_ID,0,0);
  366. CPPUNIT_ASSERT(zh!=0);
  367. // simulate connected state
  368. forceConnected(zh);
  369. // queue up a request; keep it pending (as if the server is busy or has died)
  370. AsyncGetOperationCompletion res1;
  371. zkServer.addOperationResponse(new ZooGetResponse("2",1));
  372. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  373. int fd=0;
  374. int interest=0;
  375. timeval tv;
  376. // Round 1.
  377. // send the queued up zoo_aget() request
  378. Mock_gettimeofday beginningOfTimes(now); // remember when we started
  379. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  380. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  381. // no delay -- the socket is writable
  382. rc=zookeeper_process(zh,interest);
  383. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  384. // Round 2.
  385. // what's next?
  386. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  387. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  388. // no response from the server yet -- waiting in the select() call
  389. now.tick(tv);
  390. // a watch has arrived, thus preventing the connection from timing out
  391. zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
  392. rc=zookeeper_process(zh,interest);
  393. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc); // read the watch message
  394. CPPUNIT_ASSERT_EQUAL(0,zkServer.pingCount_); // not yet!
  395. //Round 3.
  396. // now is the time to send a ping; make sure it's actually sent
  397. rc=zookeeper_interest(zh,&fd,&interest,&tv);
  398. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  399. rc=zookeeper_process(zh,interest);
  400. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  401. // verify a ping is sent
  402. CPPUNIT_ASSERT_EQUAL(1,zkServer.pingCount_);
  403. // make sure only 1/3 of the timeout has passed
  404. CPPUNIT_ASSERT_EQUAL((int32_t)TIMEOUT/3*1000,toMilliseconds(now-beginningOfTimes));
  405. }
  406. #else
  407. class TestGetDataJob: public TestJob{
  408. public:
  409. TestGetDataJob(ZookeeperServer* svr,zhandle_t* zh, int reps=500)
  410. :svr_(svr),zh_(zh),rc_(ZAPIERROR),reps_(reps){}
  411. virtual void run(){
  412. int i;
  413. for(i=0;i<reps_;i++){
  414. char buf;
  415. int size=sizeof(buf);
  416. if (i % 10 == 0) {
  417. // We need to pause every once in a while so we don't
  418. // get too far ahead and finish before the disconnect
  419. millisleep(1);
  420. }
  421. svr_->addOperationResponse(new ZooGetResponse("1",1));
  422. rc_=zoo_get(zh_,"/x/y/z",0,&buf,&size,0);
  423. if(rc_!=ZOK){
  424. break;
  425. }
  426. }
  427. }
  428. ZookeeperServer* svr_;
  429. zhandle_t* zh_;
  430. int rc_;
  431. int reps_;
  432. };
  433. class TestConcurrentOpJob: public TestGetDataJob{
  434. public:
  435. static const int REPS=500;
  436. TestConcurrentOpJob(ZookeeperServer* svr,zhandle_t* zh):
  437. TestGetDataJob(svr,zh,REPS){}
  438. virtual TestJob* clone() const {
  439. return new TestConcurrentOpJob(svr_,zh_);
  440. }
  441. virtual void validate(const char* file, int line) const{
  442. CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZOK != rc",(int)ZOK,rc_,file,line);
  443. }
  444. };
  445. void testConcurrentOperations1()
  446. {
  447. for(int counter=0; counter<50; counter++){
  448. // frozen time -- no timeouts and no pings
  449. Mock_gettimeofday timeMock;
  450. ZookeeperServer zkServer;
  451. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  452. // must call zookeeper_close() while all the mocks are in the scope!
  453. CloseFinally guard(&zh);
  454. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  455. CPPUNIT_ASSERT(zh!=0);
  456. // make sure the client has connected
  457. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  458. TestJobManager jmgr(TestConcurrentOpJob(&zkServer,zh),10);
  459. jmgr.startAllJobs();
  460. jmgr.wait();
  461. // validate test results
  462. VALIDATE_JOBS(jmgr);
  463. }
  464. }
  465. class ZKGetJob: public TestJob{
  466. public:
  467. static const int REPS=1000;
  468. ZKGetJob(zhandle_t* zh)
  469. :zh_(zh),rc_(ZAPIERROR){}
  470. virtual TestJob* clone() const {
  471. return new ZKGetJob(zh_);
  472. }
  473. virtual void run(){
  474. int i;
  475. for(i=0;i<REPS;i++){
  476. char buf;
  477. int size=sizeof(buf);
  478. rc_=zoo_get(zh_,"/xyz",0,&buf,&size,0);
  479. if(rc_!=ZOK){
  480. break;
  481. }
  482. }
  483. //TEST_TRACE("Finished %d iterations",i);
  484. }
  485. virtual void validate(const char* file, int line) const{
  486. CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZOK != rc",(int)ZOK,rc_,file,line);
  487. }
  488. zhandle_t* zh_;
  489. int rc_;
  490. };
  491. // this test connects to a real ZK server and creates the /xyz node and sends
  492. // lots of zoo_get requests.
  493. // to run this test use the following command:
  494. // zktest-mt Zookeeper_operations::testOperationsAndDisconnectConcurrently2 localhost:3181
  495. // where the second parameter is the server host and port
  496. void testOperationsAndDisconnectConcurrently2()
  497. {
  498. if(globalTestConfig.getTestName().find(__func__)==string::npos ||
  499. globalTestConfig.getExtraOptCount()==0)
  500. {
  501. // only run this test when specifically asked so
  502. return;
  503. }
  504. string host(*(globalTestConfig.getExtraOptBegin()));
  505. zhandle_t* lzh=zookeeper_init(host.c_str(),watcher,10000,0,0,0);
  506. CPPUNIT_ASSERT(lzh!=0);
  507. // make sure the client has connected
  508. CPPUNIT_ASSERT_MESSAGE("Unable to connect to the host",
  509. ensureCondition(ClientConnected(zh),5000)<5000);
  510. char realpath[1024];
  511. int rc=zoo_create(lzh,"/xyz","1",1,&ZOO_OPEN_ACL_UNSAFE,0,realpath,sizeof(realpath)-1);
  512. CPPUNIT_ASSERT(rc==ZOK || rc==ZNODEEXISTS);
  513. zookeeper_close(lzh);
  514. for(int counter=0; counter<200; counter++){
  515. TEST_TRACE("Loop count %d",counter);
  516. CloseFinally guard(&zh);
  517. zh=zookeeper_init(host.c_str(),watcher,10000,0,0,0);
  518. CPPUNIT_ASSERT(zh!=0);
  519. // make sure the client has connected
  520. CPPUNIT_ASSERT_MESSAGE("Unable to connect to the host",
  521. ensureCondition(ClientConnected(zh),5000)<5000);
  522. TestJobManager jmgr(ZKGetJob(zh),10);
  523. jmgr.startJobsImmediately();
  524. jmgr.wait();
  525. VALIDATE_JOBS(jmgr);
  526. TEST_TRACE("run %d finished",counter);
  527. }
  528. }
  529. class TestConcurrentOpWithDisconnectJob: public TestGetDataJob{
  530. public:
  531. static const int REPS=1000;
  532. TestConcurrentOpWithDisconnectJob(ZookeeperServer* svr,zhandle_t* zh):
  533. TestGetDataJob(svr,zh,REPS){}
  534. virtual TestJob* clone() const {
  535. return new TestConcurrentOpWithDisconnectJob(svr_,zh_);
  536. }
  537. virtual void validate(const char* file, int line) const{
  538. CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZCONNECTIONLOSS != rc",(int)ZCONNECTIONLOSS,rc_,file,line);
  539. }
  540. };
  541. // this test is not 100% accurate in a sense it may not detect all error cases.
  542. // TODO: I can't think of a test that is 100% accurate and doesn't interfere
  543. // with the code being tested (in terms of introducing additional
  544. // implicit synchronization points)
  545. void testOperationsAndDisconnectConcurrently1()
  546. {
  547. for(int counter=0; counter<50; counter++){
  548. //TEST_TRACE("Loop count %d",counter);
  549. // frozen time -- no timeouts and no pings
  550. Mock_gettimeofday timeMock;
  551. ZookeeperServer zkServer;
  552. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  553. // must call zookeeper_close() while all the mocks are in the scope!
  554. CloseFinally guard(&zh);
  555. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  556. CPPUNIT_ASSERT(zh!=0);
  557. // make sure the client has connected
  558. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  559. TestJobManager jmgr(TestConcurrentOpWithDisconnectJob(&zkServer,zh),10);
  560. jmgr.startJobsImmediately();
  561. // let everything startup before we shutdown the server
  562. millisleep(4);
  563. // reconnect attempts will start failing immediately
  564. zkServer.setServerDown(0);
  565. // next recv call will return 0
  566. zkServer.setConnectionLost();
  567. jmgr.wait();
  568. VALIDATE_JOBS(jmgr);
  569. }
  570. }
  571. // call zoo_aget() in the multithreaded mode
  572. void testAsyncGetOperation()
  573. {
  574. Mock_gettimeofday timeMock;
  575. ZookeeperServer zkServer;
  576. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  577. // must call zookeeper_close() while all the mocks are in the scope!
  578. CloseFinally guard(&zh);
  579. zh=zookeeper_init("localhost:2121",watcher,10000,TEST_CLIENT_ID,0,0);
  580. CPPUNIT_ASSERT(zh!=0);
  581. // make sure the client has connected
  582. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  583. AsyncGetOperationCompletion res1;
  584. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  585. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&res1);
  586. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  587. CPPUNIT_ASSERT(ensureCondition(res1,1000)<1000);
  588. CPPUNIT_ASSERT_EQUAL((int)ZOK,res1.rc_);
  589. CPPUNIT_ASSERT_EQUAL(string("1"),res1.value_);
  590. }
  591. class ChangeNodeWatcher: public WatcherAction{
  592. public:
  593. ChangeNodeWatcher():changed_(false){}
  594. virtual void onNodeValueChanged(zhandle_t*,const char* path){
  595. synchronized(mx_);
  596. changed_=true;
  597. if(path!=0) path_=path;
  598. }
  599. // this predicate checks if CHANGE_EVENT event type was triggered, unlike
  600. // the isWatcherTriggered() that returns true whenever a watcher is triggered
  601. // regardless of the event type
  602. SyncedBoolCondition isNodeChangedTriggered() const{
  603. return SyncedBoolCondition(changed_,mx_);
  604. }
  605. bool changed_;
  606. string path_;
  607. };
  608. class AsyncWatcherCompletion: public AsyncCompletion{
  609. public:
  610. AsyncWatcherCompletion(ZookeeperServer& zkServer):zkServer_(zkServer){}
  611. virtual void statCompl(int rc, const Stat *stat){
  612. // we received a server response, now enqueue a watcher event
  613. // to trigger the watcher
  614. zkServer_.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
  615. }
  616. ZookeeperServer& zkServer_;
  617. };
  618. // verify that async watcher is called for znode events (CREATED, DELETED etc.)
  619. void testAsyncWatcher1(){
  620. Mock_gettimeofday timeMock;
  621. ZookeeperServer zkServer;
  622. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  623. // must call zookeeper_close() while all the mocks are in the scope!
  624. CloseFinally guard(&zh);
  625. ChangeNodeWatcher action;
  626. zh=zookeeper_init("localhost:2121",activeWatcher,10000,
  627. TEST_CLIENT_ID,&action,0);
  628. CPPUNIT_ASSERT(zh!=0);
  629. // make sure the client has connected
  630. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  631. // set the watcher
  632. AsyncWatcherCompletion completion(zkServer);
  633. // prepare a response for the zoo_aexists() request
  634. zkServer.addOperationResponse(new ZooStatResponse);
  635. int rc=zoo_aexists(zh,"/x/y/z",1,asyncCompletion,&completion);
  636. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  637. CPPUNIT_ASSERT(ensureCondition(action.isNodeChangedTriggered(),1000)<1000);
  638. CPPUNIT_ASSERT_EQUAL(string("/x/y/z"),action.path_);
  639. }
  640. #endif
  641. };
  642. CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_operations);