ZKMocks.cc 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include <arpa/inet.h> // for htonl
  19. #include <memory>
  20. #include <zookeeper.h>
  21. #include <proto.h>
  22. #ifdef THREADED
  23. #include "PthreadMocks.h"
  24. #endif
  25. #include "ZKMocks.h"
  26. using namespace std;
  27. TestClientId testClientId;
  28. const char* TestClientId::PASSWD="1234567890123456";
  29. HandshakeRequest* HandshakeRequest::parse(const std::string& buf) {
  30. unique_ptr<HandshakeRequest> req(new HandshakeRequest);
  31. memcpy(&req->protocolVersion,buf.data(), sizeof(req->protocolVersion));
  32. req->protocolVersion = htonl(req->protocolVersion);
  33. int offset=sizeof(req->protocolVersion);
  34. memcpy(&req->lastZxidSeen,buf.data()+offset,sizeof(req->lastZxidSeen));
  35. req->lastZxidSeen = zoo_htonll(req->lastZxidSeen);
  36. offset+=sizeof(req->lastZxidSeen);
  37. memcpy(&req->timeOut,buf.data()+offset,sizeof(req->timeOut));
  38. req->timeOut = htonl(req->timeOut);
  39. offset+=sizeof(req->timeOut);
  40. memcpy(&req->sessionId,buf.data()+offset,sizeof(req->sessionId));
  41. req->sessionId = zoo_htonll(req->sessionId);
  42. offset+=sizeof(req->sessionId);
  43. memcpy(&req->passwd_len,buf.data()+offset,sizeof(req->passwd_len));
  44. req->passwd_len = htonl(req->passwd_len);
  45. offset+=sizeof(req->passwd_len);
  46. memcpy(req->passwd,buf.data()+offset,sizeof(req->passwd));
  47. offset+=sizeof(req->passwd);
  48. memcpy(&req->readOnly,buf.data()+offset,sizeof(req->readOnly));
  49. if(testClientId.client_id==req->sessionId &&
  50. !memcmp(testClientId.passwd,req->passwd,sizeof(req->passwd)))
  51. return req.release();
  52. // the request didn't match -- may not be a handshake request after all
  53. return 0;
  54. }
  55. // *****************************************************************************
  56. // watcher action implementation
  57. void activeWatcher(zhandle_t *zh,
  58. int type, int state, const char *path,void* ctx) {
  59. if (zh == 0 || ctx == 0)
  60. return;
  61. WatcherAction* action = (WatcherAction *)ctx;
  62. if (type == ZOO_SESSION_EVENT) {
  63. if (state == ZOO_EXPIRED_SESSION_STATE)
  64. action->onSessionExpired(zh);
  65. else if(state == ZOO_CONNECTING_STATE)
  66. action->onConnectionLost(zh);
  67. else if(state == ZOO_CONNECTED_STATE)
  68. action->onConnectionEstablished(zh);
  69. } else if (type == ZOO_CHANGED_EVENT)
  70. action->onNodeValueChanged(zh,path);
  71. else if (type == ZOO_DELETED_EVENT)
  72. action->onNodeDeleted(zh,path);
  73. else if (type == ZOO_CHILD_EVENT)
  74. action->onChildChanged(zh,path);
  75. // TODO: implement for the rest of the event types
  76. action->setWatcherTriggered();
  77. }
  78. SyncedBoolCondition WatcherAction::isWatcherTriggered() const {
  79. return SyncedBoolCondition(triggered_,mx_);
  80. }
  81. // a set of async completion signatures
  82. void asyncCompletion(int rc, ACL_vector *acl,Stat *stat, const void *data){
  83. assert("Completion data is NULL"&&data);
  84. static_cast<AsyncCompletion*>((void*)data)->aclCompl(rc,acl,stat);
  85. }
  86. void asyncCompletion(int rc, const char *value, int len, const Stat *stat,
  87. const void *data) {
  88. assert("Completion data is NULL"&&data);
  89. static_cast<AsyncCompletion*>((void*)data)->dataCompl(rc,value,len,stat);
  90. }
  91. void asyncCompletion(int rc, const Stat *stat, const void *data) {
  92. assert("Completion data is NULL"&&data);
  93. static_cast<AsyncCompletion*>((void*)data)->statCompl(rc,stat);
  94. }
  95. void asyncCompletion(int rc, const char *value, const void *data) {
  96. assert("Completion data is NULL"&&data);
  97. static_cast<AsyncCompletion*>((void*)data)->stringCompl(rc,value);
  98. }
  99. void asyncCompletion(int rc,const String_vector *strings, const void *data) {
  100. assert("Completion data is NULL"&&data);
  101. static_cast<AsyncCompletion*>((void*)data)->stringsCompl(rc,strings);
  102. }
  103. void asyncCompletion(int rc, const void *data) {
  104. assert("Completion data is NULL"&&data);
  105. static_cast<AsyncCompletion*>((void*)data)->voidCompl(rc);
  106. }
  107. // a predicate implementation
  108. bool IOThreadStopped::operator()() const{
  109. #ifdef THREADED
  110. adaptor_threads* adaptor=(adaptor_threads*)zh_->adaptor_priv;
  111. return CheckedPthread::isTerminated(adaptor->io);
  112. #else
  113. assert("IOThreadStopped predicate is only for use with THREADED client" &&
  114. false);
  115. return false;
  116. #endif
  117. }
  118. //******************************************************************************
  119. //
  120. DECLARE_WRAPPER(int,flush_send_queue,(zhandle_t*zh, int timeout))
  121. {
  122. if(!Mock_flush_send_queue::mock_)
  123. return CALL_REAL(flush_send_queue,(zh,timeout));
  124. return Mock_flush_send_queue::mock_->call(zh,timeout);
  125. }
  126. Mock_flush_send_queue* Mock_flush_send_queue::mock_=0;
  127. //******************************************************************************
  128. //
  129. DECLARE_WRAPPER(int32_t,get_xid,())
  130. {
  131. if(!Mock_get_xid::mock_)
  132. return CALL_REAL(get_xid,());
  133. return Mock_get_xid::mock_->call();
  134. }
  135. Mock_get_xid* Mock_get_xid::mock_=0;
  136. //******************************************************************************
  137. // activateWatcher mock
  138. DECLARE_WRAPPER(void,activateWatcher,(zhandle_t *zh, watcher_registration_t* reg, int rc))
  139. {
  140. if(!Mock_activateWatcher::mock_){
  141. CALL_REAL(activateWatcher,(zh, reg,rc));
  142. }else{
  143. Mock_activateWatcher::mock_->call(zh, reg,rc);
  144. }
  145. }
  146. Mock_activateWatcher* Mock_activateWatcher::mock_=0;
  147. class ActivateWatcherWrapper: public Mock_activateWatcher{
  148. public:
  149. ActivateWatcherWrapper():ctx_(0),activated_(false){}
  150. virtual void call(zhandle_t *zh, watcher_registration_t* reg, int rc){
  151. CALL_REAL(activateWatcher,(zh, reg,rc));
  152. synchronized(mx_);
  153. if(reg->context==ctx_){
  154. activated_=true;
  155. ctx_=0;
  156. }
  157. }
  158. void setContext(void* ctx){
  159. synchronized(mx_);
  160. ctx_=ctx;
  161. activated_=false;
  162. }
  163. SyncedBoolCondition isActivated() const{
  164. return SyncedBoolCondition(activated_,mx_);
  165. }
  166. mutable Mutex mx_;
  167. void* ctx_;
  168. bool activated_;
  169. };
  170. WatcherActivationTracker::WatcherActivationTracker():
  171. wrapper_(new ActivateWatcherWrapper)
  172. {
  173. }
  174. WatcherActivationTracker::~WatcherActivationTracker(){
  175. delete wrapper_;
  176. }
  177. void WatcherActivationTracker::track(void* ctx){
  178. wrapper_->setContext(ctx);
  179. }
  180. SyncedBoolCondition WatcherActivationTracker::isWatcherActivated() const{
  181. return wrapper_->isActivated();
  182. }
  183. //******************************************************************************
  184. //
  185. DECLARE_WRAPPER(void,deliverWatchers,(zhandle_t* zh,int type,int state, const char* path, watcher_object_list_t **list))
  186. {
  187. if(!Mock_deliverWatchers::mock_){
  188. CALL_REAL(deliverWatchers,(zh,type,state,path, list));
  189. }else{
  190. Mock_deliverWatchers::mock_->call(zh,type,state,path, list);
  191. }
  192. }
  193. Mock_deliverWatchers* Mock_deliverWatchers::mock_=0;
  194. struct RefCounterValue{
  195. RefCounterValue(zhandle_t* const& zh,int32_t expectedCounter,Mutex& mx):
  196. zh_(zh),expectedCounter_(expectedCounter),mx_(mx){}
  197. bool operator()() const{
  198. {
  199. synchronized(mx_);
  200. if(zh_==0)
  201. return false;
  202. }
  203. return inc_ref_counter(zh_,0)==expectedCounter_;
  204. }
  205. zhandle_t* const& zh_;
  206. int32_t expectedCounter_;
  207. Mutex& mx_;
  208. };
  209. class DeliverWatchersWrapper: public Mock_deliverWatchers{
  210. public:
  211. DeliverWatchersWrapper(int type,int state,bool terminate):
  212. type_(type),state_(state),
  213. allDelivered_(false),terminate_(terminate),zh_(0),deliveryCounter_(0){}
  214. virtual void call(zhandle_t* zh, int type, int state,
  215. const char* path, watcher_object_list **list) {
  216. {
  217. synchronized(mx_);
  218. zh_=zh;
  219. allDelivered_=false;
  220. }
  221. CALL_REAL(deliverWatchers,(zh,type,state,path, list));
  222. if(type_==type && state_==state){
  223. if(terminate_){
  224. // prevent zhandle_t from being prematurely distroyed;
  225. // this will also ensure that zookeeper_close() cleanups the
  226. // thread resources by calling finish_adaptor()
  227. inc_ref_counter(zh,1);
  228. terminateZookeeperThreads(zh);
  229. }
  230. synchronized(mx_);
  231. allDelivered_=true;
  232. deliveryCounter_++;
  233. }
  234. }
  235. SyncedBoolCondition isDelivered() const{
  236. if(terminate_){
  237. int i=ensureCondition(RefCounterValue(zh_,1,mx_),1000);
  238. assert(i<1000);
  239. }
  240. return SyncedBoolCondition(allDelivered_,mx_);
  241. }
  242. void resetDeliveryCounter(){
  243. synchronized(mx_);
  244. deliveryCounter_=0;
  245. }
  246. SyncedIntegerEqual deliveryCounterEquals(int expected) const{
  247. if(terminate_){
  248. int i=ensureCondition(RefCounterValue(zh_,1,mx_),1000);
  249. assert(i<1000);
  250. }
  251. return SyncedIntegerEqual(deliveryCounter_,expected,mx_);
  252. }
  253. int type_;
  254. int state_;
  255. mutable Mutex mx_;
  256. bool allDelivered_;
  257. bool terminate_;
  258. zhandle_t* zh_;
  259. int deliveryCounter_;
  260. };
  261. WatcherDeliveryTracker::WatcherDeliveryTracker(
  262. int type,int state,bool terminateCompletionThread):
  263. deliveryWrapper_(new DeliverWatchersWrapper(
  264. type,state,terminateCompletionThread)){
  265. }
  266. WatcherDeliveryTracker::~WatcherDeliveryTracker(){
  267. delete deliveryWrapper_;
  268. }
  269. SyncedBoolCondition WatcherDeliveryTracker::isWatcherProcessingCompleted() const {
  270. return deliveryWrapper_->isDelivered();
  271. }
  272. void WatcherDeliveryTracker::resetDeliveryCounter(){
  273. deliveryWrapper_->resetDeliveryCounter();
  274. }
  275. SyncedIntegerEqual WatcherDeliveryTracker::deliveryCounterEquals(int expected) const {
  276. return deliveryWrapper_->deliveryCounterEquals(expected);
  277. }
  278. //******************************************************************************
  279. //
  280. string HandshakeResponse::toString() const {
  281. string buf;
  282. int32_t tmp=htonl(protocolVersion);
  283. buf.append((char*)&tmp,sizeof(tmp));
  284. tmp=htonl(timeOut);
  285. buf.append((char*)&tmp,sizeof(tmp));
  286. int64_t tmp64=zoo_htonll(sessionId);
  287. buf.append((char*)&tmp64,sizeof(sessionId));
  288. tmp=htonl(passwd_len);
  289. buf.append((char*)&tmp,sizeof(tmp));
  290. buf.append(passwd,sizeof(passwd));
  291. if (!omitReadOnly) {
  292. buf.append(&readOnly,sizeof(readOnly));
  293. }
  294. // finally set the buffer length
  295. tmp=htonl(buf.size());
  296. buf.insert(0,(char*)&tmp, sizeof(tmp));
  297. return buf;
  298. }
  299. string ZooGetResponse::toString() const{
  300. oarchive* oa=create_buffer_oarchive();
  301. ReplyHeader h = {xid_,1,ZOK};
  302. serialize_ReplyHeader(oa, "hdr", &h);
  303. GetDataResponse resp;
  304. char buf[1024];
  305. assert("GetDataResponse is too long"&&data_.size()<=sizeof(buf));
  306. resp.data.len=data_.size();
  307. resp.data.buff=buf;
  308. data_.copy(resp.data.buff, data_.size());
  309. resp.stat=stat_;
  310. serialize_GetDataResponse(oa, "reply", &resp);
  311. int32_t len=htonl(get_buffer_len(oa));
  312. string res((char*)&len,sizeof(len));
  313. res.append(get_buffer(oa),get_buffer_len(oa));
  314. close_buffer_oarchive(&oa,1);
  315. return res;
  316. }
  317. string ZooStatResponse::toString() const{
  318. oarchive* oa=create_buffer_oarchive();
  319. ReplyHeader h = {xid_,1,rc_};
  320. serialize_ReplyHeader(oa, "hdr", &h);
  321. SetDataResponse resp;
  322. resp.stat=stat_;
  323. serialize_SetDataResponse(oa, "reply", &resp);
  324. int32_t len=htonl(get_buffer_len(oa));
  325. string res((char*)&len,sizeof(len));
  326. res.append(get_buffer(oa),get_buffer_len(oa));
  327. close_buffer_oarchive(&oa,1);
  328. return res;
  329. }
  330. string ZooGetChildrenResponse::toString() const{
  331. oarchive* oa=create_buffer_oarchive();
  332. ReplyHeader h = {xid_,1,rc_};
  333. serialize_ReplyHeader(oa, "hdr", &h);
  334. GetChildrenResponse resp;
  335. // populate the string vector
  336. allocate_String_vector(&resp.children,strings_.size());
  337. for(int i=0;i<(int)strings_.size();++i)
  338. resp.children.data[i]=strdup(strings_[i].c_str());
  339. serialize_GetChildrenResponse(oa, "reply", &resp);
  340. deallocate_GetChildrenResponse(&resp);
  341. int32_t len=htonl(get_buffer_len(oa));
  342. string res((char*)&len,sizeof(len));
  343. res.append(get_buffer(oa),get_buffer_len(oa));
  344. close_buffer_oarchive(&oa,1);
  345. return res;
  346. }
  347. string ZNodeEvent::toString() const{
  348. oarchive* oa=create_buffer_oarchive();
  349. struct WatcherEvent evt = {type_,0,(char*)path_.c_str()};
  350. struct ReplyHeader h = {WATCHER_EVENT_XID,0,ZOK };
  351. serialize_ReplyHeader(oa, "hdr", &h);
  352. serialize_WatcherEvent(oa, "event", &evt);
  353. int32_t len=htonl(get_buffer_len(oa));
  354. string res((char*)&len,sizeof(len));
  355. res.append(get_buffer(oa),get_buffer_len(oa));
  356. close_buffer_oarchive(&oa,1);
  357. return res;
  358. }
  359. string PingResponse::toString() const{
  360. oarchive* oa=create_buffer_oarchive();
  361. ReplyHeader h = {PING_XID,1,ZOK};
  362. serialize_ReplyHeader(oa, "hdr", &h);
  363. int32_t len=htonl(get_buffer_len(oa));
  364. string res((char*)&len,sizeof(len));
  365. res.append(get_buffer(oa),get_buffer_len(oa));
  366. close_buffer_oarchive(&oa,1);
  367. return res;
  368. }
  369. //******************************************************************************
  370. // Zookeeper server simulator
  371. //
  372. bool ZookeeperServer::hasMoreRecv() const{
  373. return recvHasMore.get()!=0 || connectionLost;
  374. }
  375. ssize_t ZookeeperServer::callRecv(int s,void *buf,size_t len,int flags){
  376. if(connectionLost){
  377. recvReturnBuffer.erase();
  378. return 0;
  379. }
  380. // done transmitting the current buffer?
  381. if(recvReturnBuffer.size()==0){
  382. synchronized(recvQMx);
  383. if(recvQueue.empty()){
  384. recvErrno=EAGAIN;
  385. return Mock_socket::callRecv(s,buf,len,flags);
  386. }
  387. --recvHasMore;
  388. Element& el=recvQueue.front();
  389. if(el.first!=0){
  390. recvReturnBuffer=el.first->toString();
  391. delete el.first;
  392. }
  393. recvErrno=el.second;
  394. recvQueue.pop_front();
  395. }
  396. return Mock_socket::callRecv(s,buf,len,flags);
  397. }
  398. void ZookeeperServer::onMessageReceived(const RequestHeader& rh, iarchive* ia){
  399. // no-op by default
  400. }
  401. void ZookeeperServer::notifyBufferSent(const std::string& buffer){
  402. if(HandshakeRequest::isValid(buffer)){
  403. // could be a connect request
  404. unique_ptr<HandshakeRequest> req(HandshakeRequest::parse(buffer));
  405. if(req.get()!=0){
  406. // handle the handshake
  407. int64_t sessId=sessionExpired?req->sessionId+1:req->sessionId;
  408. sessionExpired=false;
  409. addRecvResponse(new HandshakeResponse(sessId));
  410. return;
  411. }
  412. // not a connect request -- fall thru
  413. }
  414. // parse the buffer to extract the request type and its xid
  415. iarchive *ia=create_buffer_iarchive((char*)buffer.data(), buffer.size());
  416. RequestHeader rh;
  417. deserialize_RequestHeader(ia,"hdr",&rh);
  418. // notify the "server" a client request has arrived
  419. if (rh.xid == -8) {
  420. Element e = Element(new ZooStatResponse,0);
  421. e.first->setXID(-8);
  422. addRecvResponse(e);
  423. close_buffer_iarchive(&ia);
  424. return;
  425. } else {
  426. onMessageReceived(rh,ia);
  427. }
  428. close_buffer_iarchive(&ia);
  429. if(rh.type==ZOO_CLOSE_OP){
  430. ++closeSent;
  431. return; // no reply for close requests
  432. }
  433. // get the next response from the response queue and append it to the
  434. // receive list
  435. Element e;
  436. {
  437. synchronized(respQMx);
  438. if(respQueue.empty())
  439. return;
  440. e=respQueue.front();
  441. respQueue.pop_front();
  442. }
  443. e.first->setXID(rh.xid);
  444. addRecvResponse(e);
  445. }
  446. void forceConnected(zhandle_t* zh, const struct timeval *last_recv_send){
  447. // simulate connected state
  448. zh->state=ZOO_CONNECTED_STATE;
  449. // Simulate we're connected to the first host in our host list
  450. zh->fd->sock=ZookeeperServer::FD;
  451. assert(zh->addrs.count > 0);
  452. zh->addr_cur = zh->addrs.data[0];
  453. zh->addrs.next++;
  454. zh->input_buffer=0;
  455. if (last_recv_send) {
  456. zh->last_recv = *last_recv_send;
  457. zh->last_send = *last_recv_send;
  458. } else {
  459. gettimeofday(&zh->last_recv,0);
  460. gettimeofday(&zh->last_send,0);
  461. }
  462. }
  463. void terminateZookeeperThreads(zhandle_t* zh){
  464. // this will cause the zookeeper threads to terminate
  465. zh->close_requested=1;
  466. }