// TestWatchers.cc
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  18. #include <cppunit/extensions/HelperMacros.h>
  19. #include "CppAssertHelper.h"
  20. #include "ZKMocks.h"
  21. #include "CollectionUtil.h"
  22. #include "Util.h"
  23. class Zookeeper_watchers : public CPPUNIT_NS::TestFixture
  24. {
  25. CPPUNIT_TEST_SUITE(Zookeeper_watchers);
  26. CPPUNIT_TEST(testDefaultSessionWatcher1);
  27. CPPUNIT_TEST(testDefaultSessionWatcher2);
  28. CPPUNIT_TEST(testObjectSessionWatcher1);
  29. CPPUNIT_TEST(testObjectSessionWatcher2);
  30. CPPUNIT_TEST(testNodeWatcher1);
  31. CPPUNIT_TEST(testChildWatcher1);
  32. CPPUNIT_TEST(testChildWatcher2);
  33. CPPUNIT_TEST_SUITE_END();
  34. static void watcher(zhandle_t *, int, int, const char *,void*){}
  35. zhandle_t *zh;
  36. FILE *logfile;
  37. public:
  38. Zookeeper_watchers() {
  39. logfile = openlogfile("Zookeeper_watchers");
  40. }
  41. ~Zookeeper_watchers() {
  42. if (logfile) {
  43. fflush(logfile);
  44. fclose(logfile);
  45. logfile = 0;
  46. }
  47. }
  48. void setUp()
  49. {
  50. zoo_set_log_stream(logfile);
  51. zoo_deterministic_conn_order(0);
  52. zh=0;
  53. }
  54. void tearDown()
  55. {
  56. zookeeper_close(zh);
  57. }
  58. class ConnectionWatcher: public WatcherAction{
  59. public:
  60. ConnectionWatcher():connected_(false),counter_(0){}
  61. virtual void onConnectionEstablished(zhandle_t*){
  62. synchronized(mx_);
  63. counter_++;
  64. connected_=true;
  65. }
  66. SyncedBoolCondition isConnectionEstablished() const{
  67. return SyncedBoolCondition(connected_,mx_);
  68. }
  69. bool connected_;
  70. int counter_;
  71. };
  72. class DisconnectWatcher: public WatcherAction{
  73. public:
  74. DisconnectWatcher():disconnected_(false),counter_(0){}
  75. virtual void onConnectionLost(zhandle_t*){
  76. synchronized(mx_);
  77. counter_++;
  78. disconnected_=true;
  79. }
  80. SyncedBoolCondition isDisconnected() const{
  81. return SyncedBoolCondition(disconnected_,mx_);
  82. }
  83. bool disconnected_;
  84. int counter_;
  85. };
  86. class CountingDataWatcher: public WatcherAction{
  87. public:
  88. CountingDataWatcher():disconnected_(false),counter_(0){}
  89. virtual void onNodeValueChanged(zhandle_t*,const char* path){
  90. synchronized(mx_);
  91. counter_++;
  92. }
  93. virtual void onConnectionLost(zhandle_t*){
  94. synchronized(mx_);
  95. counter_++;
  96. disconnected_=true;
  97. }
  98. bool disconnected_;
  99. int counter_;
  100. };
  101. class DeletionCountingDataWatcher: public WatcherAction{
  102. public:
  103. DeletionCountingDataWatcher():counter_(0){}
  104. virtual void onNodeDeleted(zhandle_t*,const char* path){
  105. synchronized(mx_);
  106. counter_++;
  107. }
  108. int counter_;
  109. };
  110. class ChildEventCountingWatcher: public WatcherAction{
  111. public:
  112. ChildEventCountingWatcher():counter_(0){}
  113. virtual void onChildChanged(zhandle_t*,const char* path){
  114. synchronized(mx_);
  115. counter_++;
  116. }
  117. int counter_;
  118. };
  119. #ifndef THREADED
  120. // verify: the default watcher is called once for a session event
  121. void testDefaultSessionWatcher1(){
  122. Mock_gettimeofday timeMock;
  123. ZookeeperServer zkServer;
  124. // must call zookeeper_close() while all the mocks are in scope
  125. CloseFinally guard(&zh);
  126. ConnectionWatcher watcher;
  127. zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
  128. &watcher,0);
  129. CPPUNIT_ASSERT(zh!=0);
  130. int fd=0;
  131. int interest=0;
  132. timeval tv;
  133. // open the socket
  134. int rc=zookeeper_interest(zh,&fd,&interest,&tv);
  135. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  136. CPPUNIT_ASSERT_EQUAL(ZOO_CONNECTING_STATE,zoo_state(zh));
  137. // send the handshake packet to the server
  138. rc=zookeeper_process(zh,interest);
  139. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  140. CPPUNIT_ASSERT_EQUAL(ZOO_ASSOCIATING_STATE,zoo_state(zh));
  141. // receive the server handshake response
  142. rc=zookeeper_process(zh,interest);
  143. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  144. // verify connected
  145. CPPUNIT_ASSERT_EQUAL(ZOO_CONNECTED_STATE,zoo_state(zh));
  146. CPPUNIT_ASSERT(watcher.connected_);
  147. CPPUNIT_ASSERT_EQUAL(1,watcher.counter_);
  148. }
  149. // test case: connect to server, set a default watcher, disconnect from the server
  150. // verify: the default watcher is called once
  151. void testDefaultSessionWatcher2(){
  152. Mock_gettimeofday timeMock;
  153. ZookeeperServer zkServer;
  154. // must call zookeeper_close() while all the mocks are in scope
  155. CloseFinally guard(&zh);
  156. DisconnectWatcher watcher;
  157. zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
  158. &watcher,0);
  159. CPPUNIT_ASSERT(zh!=0);
  160. // simulate connected state
  161. forceConnected(zh);
  162. // first operation
  163. AsyncCompletion ignored;
  164. zkServer.addOperationResponse(new ZooGetResponse("1",1));
  165. int rc=zoo_aget(zh,"/x/y/1",0,asyncCompletion,&ignored);
  166. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  167. // this will process the response and activate the watcher
  168. rc=zookeeper_process(zh,ZOOKEEPER_READ);
  169. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  170. // now, disconnect
  171. zkServer.setConnectionLost();
  172. rc=zookeeper_process(zh,ZOOKEEPER_READ);
  173. CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,rc);
  174. // verify disconnected
  175. CPPUNIT_ASSERT(watcher.disconnected_);
  176. CPPUNIT_ASSERT_EQUAL(1,watcher.counter_);
  177. }
  178. // testcase: connect to the server, set a watcher object on a node,
  179. // disconnect from the server
  180. // verify: the watcher object as well as the default watcher are called
  181. void testObjectSessionWatcher1(){
  182. Mock_gettimeofday timeMock;
  183. ZookeeperServer zkServer;
  184. // must call zookeeper_close() while all the mocks are in scope
  185. CloseFinally guard(&zh);
  186. DisconnectWatcher defWatcher;
  187. zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
  188. &defWatcher,0);
  189. CPPUNIT_ASSERT(zh!=0);
  190. // simulate connected state
  191. forceConnected(zh);
  192. AsyncCompletion ignored;
  193. CountingDataWatcher wobject;
  194. zkServer.addOperationResponse(new ZooStatResponse);
  195. int rc=zoo_awexists(zh,"/x/y/1",activeWatcher,&wobject,
  196. asyncCompletion,&ignored);
  197. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  198. // this will process the response and activate the watcher
  199. rc=zookeeper_process(zh,ZOOKEEPER_READ);
  200. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  201. // now, disconnect
  202. zkServer.setConnectionLost();
  203. rc=zookeeper_process(zh,ZOOKEEPER_READ);
  204. CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,rc);
  205. // verify the default watcher has been triggered
  206. CPPUNIT_ASSERT(defWatcher.disconnected_);
  207. // and triggered only once
  208. CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_);
  209. // the path-specific watcher has been triggered as well
  210. CPPUNIT_ASSERT(wobject.disconnected_);
  211. // only once!
  212. CPPUNIT_ASSERT_EQUAL(1,wobject.counter_);
  213. }
  214. // testcase: connect to the server, set a watcher object on a node,
  215. // set a def watcher on another node,disconnect from the server
  216. // verify: the watcher object as well as the default watcher are called
  217. void testObjectSessionWatcher2(){
  218. Mock_gettimeofday timeMock;
  219. ZookeeperServer zkServer;
  220. // must call zookeeper_close() while all the mocks are in scope
  221. CloseFinally guard(&zh);
  222. DisconnectWatcher defWatcher;
  223. zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
  224. &defWatcher,0);
  225. CPPUNIT_ASSERT(zh!=0);
  226. // simulate connected state
  227. forceConnected(zh);
  228. // set the default watcher
  229. AsyncCompletion ignored;
  230. zkServer.addOperationResponse(new ZooStatResponse);
  231. int rc=zoo_aexists(zh,"/a/b/c",1,asyncCompletion,&ignored);
  232. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  233. CountingDataWatcher wobject;
  234. zkServer.addOperationResponse(new ZooStatResponse);
  235. rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject,
  236. asyncCompletion,&ignored);
  237. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  238. // this will process the response and activate the watcher
  239. while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK) {
  240. millisleep(100);
  241. }
  242. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  243. // disconnect now
  244. zkServer.setConnectionLost();
  245. rc=zookeeper_process(zh,ZOOKEEPER_READ);
  246. CPPUNIT_ASSERT_EQUAL((int)ZCONNECTIONLOSS,rc);
  247. // verify the default watcher has been triggered
  248. CPPUNIT_ASSERT(defWatcher.disconnected_);
  249. // and triggered only once
  250. CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_);
  251. // the path-specific watcher has been triggered as well
  252. CPPUNIT_ASSERT(wobject.disconnected_);
  253. // only once!
  254. CPPUNIT_ASSERT_EQUAL(1,wobject.counter_);
  255. }
  256. // testcase: register 2 node watches for different paths, trigger the watches
  257. // verify: the data watchers are processed, the default watcher is not called
  258. void testNodeWatcher1(){
  259. Mock_gettimeofday timeMock;
  260. ZookeeperServer zkServer;
  261. // must call zookeeper_close() while all the mocks are in scope
  262. CloseFinally guard(&zh);
  263. DisconnectWatcher defWatcher;
  264. zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
  265. &defWatcher,0);
  266. CPPUNIT_ASSERT(zh!=0);
  267. // simulate connected state
  268. forceConnected(zh);
  269. AsyncCompletion ignored;
  270. CountingDataWatcher wobject1;
  271. zkServer.addOperationResponse(new ZooStatResponse);
  272. int rc=zoo_awexists(zh,"/a/b/c",activeWatcher,&wobject1,
  273. asyncCompletion,&ignored);
  274. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  275. CountingDataWatcher wobject2;
  276. zkServer.addOperationResponse(new ZooStatResponse);
  277. rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject2,
  278. asyncCompletion,&ignored);
  279. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  280. // this will process the response and activate the watcher
  281. while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK) {
  282. millisleep(100);
  283. }
  284. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  285. // we are all set now; let's trigger the watches
  286. zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/a/b/c"));
  287. zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
  288. // make sure all watchers have been processed
  289. while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK) {
  290. millisleep(100);
  291. }
  292. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  293. CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_);
  294. CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);
  295. CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
  296. }
  297. // testcase: set up both a children and a data watchers on the node /a, then
  298. // delete the node by sending a DELETE_EVENT event
  299. // verify: both watchers are triggered
  300. void testChildWatcher1(){
  301. Mock_gettimeofday timeMock;
  302. ZookeeperServer zkServer;
  303. // must call zookeeper_close() while all the mocks are in scope
  304. CloseFinally guard(&zh);
  305. DeletionCountingDataWatcher defWatcher;
  306. zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
  307. &defWatcher,0);
  308. CPPUNIT_ASSERT(zh!=0);
  309. // simulate connected state
  310. forceConnected(zh);
  311. AsyncCompletion ignored;
  312. DeletionCountingDataWatcher wobject1;
  313. zkServer.addOperationResponse(new ZooStatResponse);
  314. int rc=zoo_awexists(zh,"/a",activeWatcher,&wobject1,
  315. asyncCompletion,&ignored);
  316. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  317. typedef ZooGetChildrenResponse::StringVector ZooVector;
  318. zkServer.addOperationResponse(new ZooGetChildrenResponse(
  319. Util::CollectionBuilder<ZooVector>()("/a/1")("/a/2")
  320. ));
  321. DeletionCountingDataWatcher wobject2;
  322. rc=zoo_awget_children(zh,"/a",activeWatcher,
  323. &wobject2,asyncCompletion,&ignored);
  324. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  325. // this will process the response and activate the watcher
  326. while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK) {
  327. millisleep(100);
  328. }
  329. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  330. // we are all set now; let's trigger the watches
  331. zkServer.addRecvResponse(new ZNodeEvent(ZOO_DELETED_EVENT,"/a"));
  332. // make sure the watchers have been processed
  333. while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK) {
  334. millisleep(100);
  335. }
  336. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  337. CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_);
  338. CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);
  339. CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
  340. }
  341. // testcase: create both a child and data watch on the node /a, send a ZOO_CHILD_EVENT
  342. // verify: only the child watch triggered
  343. void testChildWatcher2(){
  344. Mock_gettimeofday timeMock;
  345. ZookeeperServer zkServer;
  346. // must call zookeeper_close() while all the mocks are in scope
  347. CloseFinally guard(&zh);
  348. ChildEventCountingWatcher defWatcher;
  349. zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
  350. &defWatcher,0);
  351. CPPUNIT_ASSERT(zh!=0);
  352. // simulate connected state
  353. forceConnected(zh);
  354. AsyncCompletion ignored;
  355. ChildEventCountingWatcher wobject1;
  356. zkServer.addOperationResponse(new ZooStatResponse);
  357. int rc=zoo_awexists(zh,"/a",activeWatcher,&wobject1,
  358. asyncCompletion,&ignored);
  359. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  360. typedef ZooGetChildrenResponse::StringVector ZooVector;
  361. zkServer.addOperationResponse(new ZooGetChildrenResponse(
  362. Util::CollectionBuilder<ZooVector>()("/a/1")("/a/2")
  363. ));
  364. ChildEventCountingWatcher wobject2;
  365. rc=zoo_awget_children(zh,"/a",activeWatcher,
  366. &wobject2,asyncCompletion,&ignored);
  367. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  368. // this will process the response and activate the watcher
  369. while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK) {
  370. millisleep(100);
  371. }
  372. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  373. // we are all set now; let's trigger the watches
  374. zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHILD_EVENT,"/a"));
  375. // make sure the watchers have been processed
  376. while((rc=zookeeper_process(zh,ZOOKEEPER_READ))==ZOK) {
  377. millisleep(100);
  378. }
  379. CPPUNIT_ASSERT_EQUAL((int)ZNOTHING,rc);
  380. CPPUNIT_ASSERT_EQUAL(0,wobject1.counter_);
  381. CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);
  382. CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
  383. }
  384. #else
  385. // verify: the default watcher is called once for a session event
  386. void testDefaultSessionWatcher1(){
  387. Mock_gettimeofday timeMock;
  388. // zookeeper simulator
  389. ZookeeperServer zkServer;
  390. // detects when all watchers have been delivered
  391. WatcherDeliveryTracker deliveryTracker(ZOO_SESSION_EVENT,ZOO_CONNECTED_STATE);
  392. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  393. // must call zookeeper_close() while all the mocks are in the scope!
  394. CloseFinally guard(&zh);
  395. ConnectionWatcher watcher;
  396. zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
  397. &watcher,0);
  398. CPPUNIT_ASSERT(zh!=0);
  399. // wait till watcher processing has completed (the connection
  400. // established event)
  401. CPPUNIT_ASSERT(ensureCondition(
  402. deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
  403. // verify the watcher has been triggered
  404. CPPUNIT_ASSERT(ensureCondition(watcher.isConnectionEstablished(),1000)<1000);
  405. // triggered only once
  406. CPPUNIT_ASSERT_EQUAL(1,watcher.counter_);
  407. }
  408. // test case: connect to server, set a default watcher, disconnect from the server
  409. // verify: the default watcher is called once
  410. void testDefaultSessionWatcher2(){
  411. Mock_gettimeofday timeMock;
  412. // zookeeper simulator
  413. ZookeeperServer zkServer;
  414. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  415. // must call zookeeper_close() while all the mocks are in the scope!
  416. CloseFinally guard(&zh);
  417. // detects when all watchers have been delivered
  418. WatcherDeliveryTracker deliveryTracker(ZOO_SESSION_EVENT,ZOO_CONNECTING_STATE);
  419. DisconnectWatcher watcher;
  420. zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
  421. &watcher,0);
  422. CPPUNIT_ASSERT(zh!=0);
  423. // make sure the client has connected
  424. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  425. // set a default watch
  426. AsyncCompletion ignored;
  427. // a successful server response will activate the watcher
  428. zkServer.addOperationResponse(new ZooStatResponse);
  429. int rc=zoo_aexists(zh,"/x/y/z",1,asyncCompletion,&ignored);
  430. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  431. // now, initiate a disconnect
  432. zkServer.setConnectionLost();
  433. CPPUNIT_ASSERT(ensureCondition(
  434. deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
  435. // verify the watcher has been triggered
  436. CPPUNIT_ASSERT(watcher.disconnected_);
  437. // triggered only once
  438. CPPUNIT_ASSERT_EQUAL(1,watcher.counter_);
  439. }
  440. // testcase: connect to the server, set a watcher object on a node,
  441. // disconnect from the server
  442. // verify: the watcher object as well as the default watcher are called
  443. void testObjectSessionWatcher1(){
  444. Mock_gettimeofday timeMock;
  445. // zookeeper simulator
  446. ZookeeperServer zkServer;
  447. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  448. // must call zookeeper_close() while all the mocks are in the scope!
  449. CloseFinally guard(&zh);
  450. // detects when all watchers have been delivered
  451. WatcherDeliveryTracker deliveryTracker(ZOO_SESSION_EVENT,ZOO_CONNECTING_STATE);
  452. DisconnectWatcher defWatcher;
  453. // use the tracker to find out when the watcher has been activated
  454. WatcherActivationTracker activationTracker;
  455. zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
  456. &defWatcher,0);
  457. CPPUNIT_ASSERT(zh!=0);
  458. // make sure the client has connected
  459. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  460. AsyncCompletion ignored;
  461. // this successful server response will activate the watcher
  462. zkServer.addOperationResponse(new ZooStatResponse);
  463. CountingDataWatcher wobject;
  464. activationTracker.track(&wobject);
  465. // set a path-specific watcher
  466. int rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject,
  467. asyncCompletion,&ignored);
  468. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  469. // make sure the watcher gets activated before we continue
  470. CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
  471. // now, initiate a disconnect
  472. zkServer.setConnectionLost();
  473. // make sure all watchers have been processed
  474. CPPUNIT_ASSERT(ensureCondition(
  475. deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
  476. // verify the default watcher has been triggered
  477. CPPUNIT_ASSERT(defWatcher.disconnected_);
  478. // and triggered only once
  479. CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_);
  480. // the path-specific watcher has been triggered as well
  481. CPPUNIT_ASSERT(wobject.disconnected_);
  482. // only once!
  483. CPPUNIT_ASSERT_EQUAL(1,wobject.counter_);
  484. }
  485. // testcase: connect to the server, set a watcher object on a node,
  486. // set a def watcher on another node,disconnect from the server
  487. // verify: the watcher object as well as the default watcher are called
  488. void testObjectSessionWatcher2(){
  489. Mock_gettimeofday timeMock;
  490. // zookeeper simulator
  491. ZookeeperServer zkServer;
  492. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  493. // must call zookeeper_close() while all the mocks are in the scope!
  494. CloseFinally guard(&zh);
  495. // detects when all watchers have been delivered
  496. WatcherDeliveryTracker deliveryTracker(ZOO_SESSION_EVENT,ZOO_CONNECTING_STATE);
  497. DisconnectWatcher defWatcher;
  498. // use the tracker to find out when the watcher has been activated
  499. WatcherActivationTracker activationTracker;
  500. zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
  501. &defWatcher,0);
  502. CPPUNIT_ASSERT(zh!=0);
  503. // make sure the client has connected
  504. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  505. // set a default watch
  506. AsyncCompletion ignored;
  507. // a successful server response will activate the watcher
  508. zkServer.addOperationResponse(new ZooStatResponse);
  509. activationTracker.track(&defWatcher);
  510. int rc=zoo_aexists(zh,"/a/b/c",1,asyncCompletion,&ignored);
  511. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  512. // make sure the watcher gets activated before we continue
  513. CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
  514. // this successful server response will activate the watcher
  515. zkServer.addOperationResponse(new ZooStatResponse);
  516. CountingDataWatcher wobject;
  517. activationTracker.track(&wobject);
  518. // set a path-specific watcher
  519. rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject,
  520. asyncCompletion,&ignored);
  521. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  522. // make sure the watcher gets activated before we continue
  523. CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
  524. // now, initiate a disconnect
  525. zkServer.setConnectionLost();
  526. // make sure all watchers have been processed
  527. CPPUNIT_ASSERT(ensureCondition(
  528. deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
  529. // verify the default watcher has been triggered
  530. CPPUNIT_ASSERT(defWatcher.disconnected_);
  531. // and triggered only once
  532. CPPUNIT_ASSERT_EQUAL(1,defWatcher.counter_);
  533. // the path-specific watcher has been triggered as well
  534. CPPUNIT_ASSERT(wobject.disconnected_);
  535. // only once!
  536. CPPUNIT_ASSERT_EQUAL(1,wobject.counter_);
  537. }
  538. // testcase: register 2 node watches for different paths, trigger the watches
  539. // verify: the data watchers are processed, the default watcher is not called
  540. void testNodeWatcher1(){
  541. Mock_gettimeofday timeMock;
  542. // zookeeper simulator
  543. ZookeeperServer zkServer;
  544. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  545. // must call zookeeper_close() while all the mocks are in the scope!
  546. CloseFinally guard(&zh);
  547. // detects when all watchers have been delivered
  548. WatcherDeliveryTracker deliveryTracker(ZOO_CHANGED_EVENT,0,false);
  549. CountingDataWatcher defWatcher;
  550. // use the tracker to find out when the watcher has been activated
  551. WatcherActivationTracker activationTracker;
  552. zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
  553. &defWatcher,0);
  554. CPPUNIT_ASSERT(zh!=0);
  555. // make sure the client has connected
  556. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  557. // don't care about completions
  558. AsyncCompletion ignored;
  559. // set a one-shot watch
  560. // a successful server response will activate the watcher
  561. zkServer.addOperationResponse(new ZooStatResponse);
  562. CountingDataWatcher wobject1;
  563. activationTracker.track(&wobject1);
  564. int rc=zoo_awexists(zh,"/a/b/c",activeWatcher,&wobject1,
  565. asyncCompletion,&ignored);
  566. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  567. // make sure the watcher gets activated before we continue
  568. CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
  569. // this successful server response will activate the watcher
  570. zkServer.addOperationResponse(new ZooStatResponse);
  571. CountingDataWatcher wobject2;
  572. activationTracker.track(&wobject2);
  573. // set a path-specific watcher
  574. rc=zoo_awexists(zh,"/x/y/z",activeWatcher,&wobject2,
  575. asyncCompletion,&ignored);
  576. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  577. // make sure the watcher gets activated before we continue
  578. CPPUNIT_ASSERT(ensureCondition(activationTracker.isWatcherActivated(),1000)<1000);
  579. // we are all set now; let's trigger the watches
  580. zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/a/b/c"));
  581. zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHANGED_EVENT,"/x/y/z"));
  582. // make sure all watchers have been processed
  583. CPPUNIT_ASSERT(ensureCondition(
  584. deliveryTracker.deliveryCounterEquals(2),1000)<1000);
  585. CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_);
  586. CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);
  587. CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
  588. }
  589. // testcase: set up both a children and a data watchers on the node /a, then
  590. // delete the node (that is, send a DELETE_EVENT)
  591. // verify: both watchers are triggered
  592. void testChildWatcher1(){
  593. Mock_gettimeofday timeMock;
  594. // zookeeper simulator
  595. ZookeeperServer zkServer;
  596. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  597. // must call zookeeper_close() while all the mocks are in the scope!
  598. CloseFinally guard(&zh);
  599. // detects when all watchers have been delivered
  600. WatcherDeliveryTracker deliveryTracker(ZOO_DELETED_EVENT,0);
  601. DeletionCountingDataWatcher defWatcher;
  602. zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
  603. &defWatcher,0);
  604. CPPUNIT_ASSERT(zh!=0);
  605. // make sure the client has connected
  606. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  607. // a successful server response will activate the watcher
  608. zkServer.addOperationResponse(new ZooStatResponse);
  609. DeletionCountingDataWatcher wobject1;
  610. Stat stat;
  611. // add a node watch
  612. int rc=zoo_wexists(zh,"/a",activeWatcher,&wobject1,&stat);
  613. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  614. typedef ZooGetChildrenResponse::StringVector ZooVector;
  615. zkServer.addOperationResponse(new ZooGetChildrenResponse(
  616. Util::CollectionBuilder<ZooVector>()("/a/1")("/a/2")
  617. ));
  618. DeletionCountingDataWatcher wobject2;
  619. String_vector children;
  620. rc=zoo_wget_children(zh,"/a",activeWatcher,&wobject2,&children);
  621. deallocate_String_vector(&children);
  622. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  623. // we are all set now; let's trigger the watches
  624. zkServer.addRecvResponse(new ZNodeEvent(ZOO_DELETED_EVENT,"/a"));
  625. // make sure the watchers have been processed
  626. CPPUNIT_ASSERT(ensureCondition(
  627. deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
  628. CPPUNIT_ASSERT_EQUAL(1,wobject1.counter_);
  629. CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);
  630. CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
  631. }
  632. // testcase: create both a child and data watch on the node /a, send a ZOO_CHILD_EVENT
  633. // verify: only the child watch triggered
  634. void testChildWatcher2(){
  635. Mock_gettimeofday timeMock;
  636. // zookeeper simulator
  637. ZookeeperServer zkServer;
  638. Mock_poll pollMock(&zkServer,ZookeeperServer::FD);
  639. // must call zookeeper_close() while all the mocks are in the scope!
  640. CloseFinally guard(&zh);
  641. // detects when all watchers have been delivered
  642. WatcherDeliveryTracker deliveryTracker(ZOO_CHILD_EVENT,0);
  643. ChildEventCountingWatcher defWatcher;
  644. zh=zookeeper_init("localhost:2121",activeWatcher,10000,TEST_CLIENT_ID,
  645. &defWatcher,0);
  646. CPPUNIT_ASSERT(zh!=0);
  647. // make sure the client has connected
  648. CPPUNIT_ASSERT(ensureCondition(ClientConnected(zh),1000)<1000);
  649. // a successful server response will activate the watcher
  650. zkServer.addOperationResponse(new ZooStatResponse);
  651. ChildEventCountingWatcher wobject1;
  652. Stat stat;
  653. // add a node watch
  654. int rc=zoo_wexists(zh,"/a",activeWatcher,&wobject1,&stat);
  655. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  656. typedef ZooGetChildrenResponse::StringVector ZooVector;
  657. zkServer.addOperationResponse(new ZooGetChildrenResponse(
  658. Util::CollectionBuilder<ZooVector>()("/a/1")("/a/2")
  659. ));
  660. ChildEventCountingWatcher wobject2;
  661. String_vector children;
  662. rc=zoo_wget_children(zh,"/a",activeWatcher,&wobject2,&children);
  663. deallocate_String_vector(&children);
  664. CPPUNIT_ASSERT_EQUAL((int)ZOK,rc);
  665. // we are all set now; let's trigger the watches
  666. zkServer.addRecvResponse(new ZNodeEvent(ZOO_CHILD_EVENT,"/a"));
  667. // make sure the watchers have been processed
  668. CPPUNIT_ASSERT(ensureCondition(
  669. deliveryTracker.isWatcherProcessingCompleted(),1000)<1000);
  670. CPPUNIT_ASSERT_EQUAL(0,wobject1.counter_);
  671. CPPUNIT_ASSERT_EQUAL(1,wobject2.counter_);
  672. CPPUNIT_ASSERT_EQUAL(0,defWatcher.counter_);
  673. }
  674. #endif //THREADED
  675. };
  676. CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_watchers);