ZKMocks.cc 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. /*
  2. * Copyright 2008, Yahoo! Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <arpa/inet.h> // for htonl
  17. #include <memory>
  18. #include <zookeeper.h>
  19. #include <proto.h>
  20. #ifdef THREADED
  21. #include "PthreadMocks.h"
  22. #endif
  23. #include "ZKMocks.h"
  24. using namespace std;
  25. TestClientId testClientId;
  26. const char* TestClientId::PASSWD="1234567890123456";
  27. HandshakeRequest* HandshakeRequest::parse(const std::string& buf){
  28. auto_ptr<HandshakeRequest> req(new HandshakeRequest);
  29. memcpy(&req->protocolVersion,buf.data(), sizeof(req->protocolVersion));
  30. req->protocolVersion = htonl(req->protocolVersion);
  31. int offset=sizeof(req->protocolVersion);
  32. memcpy(&req->lastZxidSeen,buf.data()+offset,sizeof(req->lastZxidSeen));
  33. req->lastZxidSeen = htonll(req->lastZxidSeen);
  34. offset+=sizeof(req->lastZxidSeen);
  35. memcpy(&req->timeOut,buf.data()+offset,sizeof(req->timeOut));
  36. req->timeOut = htonl(req->timeOut);
  37. offset+=sizeof(req->timeOut);
  38. memcpy(&req->sessionId,buf.data()+offset,sizeof(req->sessionId));
  39. req->sessionId = htonll(req->sessionId);
  40. offset+=sizeof(req->sessionId);
  41. memcpy(&req->passwd_len,buf.data()+offset,sizeof(req->passwd_len));
  42. req->passwd_len = htonl(req->passwd_len);
  43. offset+=sizeof(req->passwd_len);
  44. memcpy(req->passwd,buf.data()+offset,sizeof(req->passwd));
  45. if(testClientId.client_id==req->sessionId &&
  46. !memcmp(testClientId.passwd,req->passwd,sizeof(req->passwd)))
  47. return req.release();
  48. // the request didn't match -- may not be a handshake request after all
  49. return 0;
  50. }
  51. // *****************************************************************************
  52. // watcher action implementation
  53. void activeWatcher(zhandle_t *zh, int type, int state, const char *path){
  54. if(zh==0 || zoo_get_context(zh)==0) return;
  55. WatcherAction* action=(WatcherAction*)zoo_get_context(zh);
  56. action->setWatcherTriggered();
  57. if(type==SESSION_EVENT && state==EXPIRED_SESSION_STATE)
  58. action->onSessionExpired(zh);
  59. if(type==CHANGED_EVENT)
  60. action->onNodeValueChanged(zh,path);
  61. // TODO: implement for the rest of the event types
  62. // ...
  63. }
  64. SyncedBoolCondition WatcherAction::isWatcherTriggered() const{
  65. return SyncedBoolCondition(triggered_,mx_);
  66. }
  67. // *****************************************************************************
  68. // a set of async completion signatures
  69. void asyncCompletion(int rc, ACL_vector *acl,Stat *stat, const void *data){
  70. assert("Completion data is NULL"&&data);
  71. static_cast<AsyncCompletion*>((void*)data)->aclCompl(rc,acl,stat);
  72. }
  73. void asyncCompletion(int rc, const char *value, int len, const Stat *stat,
  74. const void *data){
  75. assert("Completion data is NULL"&&data);
  76. static_cast<AsyncCompletion*>((void*)data)->dataCompl(rc,value,len,stat);
  77. }
  78. void asyncCompletion(int rc, const Stat *stat, const void *data){
  79. assert("Completion data is NULL"&&data);
  80. static_cast<AsyncCompletion*>((void*)data)->statCompl(rc,stat);
  81. }
  82. void asyncCompletion(int rc, const char *value, const void *data){
  83. assert("Completion data is NULL"&&data);
  84. static_cast<AsyncCompletion*>((void*)data)->stringCompl(rc,value);
  85. }
  86. void asyncCompletion(int rc,const String_vector *strings, const void *data){
  87. assert("Completion data is NULL"&&data);
  88. static_cast<AsyncCompletion*>((void*)data)->stringsCompl(rc,strings);
  89. }
  90. void asyncCompletion(int rc, const void *data){
  91. assert("Completion data is NULL"&&data);
  92. static_cast<AsyncCompletion*>((void*)data)->voidCompl(rc);
  93. }
  94. // *****************************************************************************
  95. // a predicate implementation
  96. bool IOThreadStopped::operator()() const{
  97. #ifdef THREADED
  98. adaptor_threads* adaptor=(adaptor_threads*)zh_->adaptor_priv;
  99. return CheckedPthread::isTerminated(adaptor->io);
  100. #else
  101. assert("IOThreadStopped predicate is only for use with THREADED client"&& false);
  102. return false;
  103. #endif
  104. }
  105. //******************************************************************************
  106. //
  107. DECLARE_WRAPPER(int,flush_send_queue,(zhandle_t*zh, int timeout))
  108. {
  109. if(!Mock_flush_send_queue::mock_)
  110. return CALL_REAL(flush_send_queue,(zh,timeout));
  111. return Mock_flush_send_queue::mock_->call(zh,timeout);
  112. }
  113. Mock_flush_send_queue* Mock_flush_send_queue::mock_=0;
  114. //******************************************************************************
  115. //
  116. DECLARE_WRAPPER(int32_t,get_xid,())
  117. {
  118. if(!Mock_get_xid::mock_)
  119. return CALL_REAL(get_xid,());
  120. return Mock_get_xid::mock_->call();
  121. }
  122. Mock_get_xid* Mock_get_xid::mock_=0;
  123. //******************************************************************************
  124. //
  125. string HandshakeResponse::toString() const {
  126. string buf;
  127. int32_t tmp=htonl(protocolVersion);
  128. buf.append((char*)&tmp,sizeof(tmp));
  129. tmp=htonl(timeOut);
  130. buf.append((char*)&tmp,sizeof(tmp));
  131. int64_t tmp64=htonll(sessionId);
  132. buf.append((char*)&tmp64,sizeof(sessionId));
  133. tmp=htonl(passwd_len);
  134. buf.append((char*)&tmp,sizeof(tmp));
  135. buf.append(passwd,sizeof(passwd));
  136. // finally set the buffer length
  137. tmp=htonl(buf.size()+sizeof(tmp));
  138. buf.insert(0,(char*)&tmp, sizeof(tmp));
  139. return buf;
  140. }
  141. string ZooGetResponse::toString() const{
  142. oarchive* oa=create_buffer_oarchive();
  143. ReplyHeader h = {xid_,1,ZOK};
  144. serialize_ReplyHeader(oa, "hdr", &h);
  145. GetDataResponse resp;
  146. char buf[1024];
  147. assert("GetDataResponse is too long"&&data_.size()<=sizeof(buf));
  148. resp.data.len=data_.size();
  149. resp.data.buff=buf;
  150. data_.copy(resp.data.buff, data_.size());
  151. resp.stat=stat_;
  152. serialize_GetDataResponse(oa, "reply", &resp);
  153. int32_t len=htonl(get_buffer_len(oa));
  154. string res((char*)&len,sizeof(len));
  155. res.append(get_buffer(oa),get_buffer_len(oa));
  156. close_buffer_oarchive(&oa,1);
  157. return res;
  158. }
  159. string ZNodeEvent::toString() const{
  160. oarchive* oa=create_buffer_oarchive();
  161. struct WatcherEvent evt = {type_,0,(char*)path_.c_str()};
  162. struct ReplyHeader h = {WATCHER_EVENT_XID,0,ZOK };
  163. serialize_ReplyHeader(oa, "hdr", &h);
  164. serialize_WatcherEvent(oa, "event", &evt);
  165. int32_t len=htonl(get_buffer_len(oa));
  166. string res((char*)&len,sizeof(len));
  167. res.append(get_buffer(oa),get_buffer_len(oa));
  168. close_buffer_oarchive(&oa,1);
  169. return res;
  170. }
  171. string PingResponse::toString() const{
  172. oarchive* oa=create_buffer_oarchive();
  173. ReplyHeader h = {PING_XID,1,ZOK};
  174. serialize_ReplyHeader(oa, "hdr", &h);
  175. int32_t len=htonl(get_buffer_len(oa));
  176. string res((char*)&len,sizeof(len));
  177. res.append(get_buffer(oa),get_buffer_len(oa));
  178. close_buffer_oarchive(&oa,1);
  179. return res;
  180. }
  181. //******************************************************************************
  182. // Zookeeper server simulator
  183. //
  184. bool ZookeeperServer::hasMoreRecv() const{
  185. return recvHasMore.get()!=0;
  186. }
  187. ssize_t ZookeeperServer::callRecv(int s,void *buf,size_t len,int flags){
  188. if(connectionLost){
  189. recvReturnBuffer.erase();
  190. return 0;
  191. }
  192. // done transmitting the current buffer?
  193. if(recvReturnBuffer.size()==0){
  194. synchronized(recvQMx);
  195. if(recvQueue.empty()){
  196. recvErrno=EAGAIN;
  197. return Mock_socket::callRecv(s,buf,len,flags);
  198. }
  199. --recvHasMore;
  200. Element& el=recvQueue.front();
  201. if(el.first!=0){
  202. recvReturnBuffer=el.first->toString();
  203. delete el.first;
  204. }
  205. recvErrno=el.second;
  206. recvQueue.pop_front();
  207. }
  208. return Mock_socket::callRecv(s,buf,len,flags);
  209. }
  210. void ZookeeperServer::onMessageReceived(const RequestHeader& rh, iarchive* ia){
  211. // no-op by default
  212. }
  213. void ZookeeperServer::notifyBufferSent(const std::string& buffer){
  214. if(HandshakeRequest::isValid(buffer)){
  215. // could be a connect request
  216. auto_ptr<HandshakeRequest> req(HandshakeRequest::parse(buffer));
  217. if(req.get()!=0){
  218. // handle the handshake
  219. int64_t sessId=sessionExpired?req->sessionId+1:req->sessionId;
  220. sessionExpired=false;
  221. addRecvResponse(new HandshakeResponse(sessId));
  222. return;
  223. }
  224. // not a connect request -- fall thru
  225. }
  226. // parse the buffer to extract the request type and its xid
  227. iarchive *ia=create_buffer_iarchive((char*)buffer.data(), buffer.size());
  228. RequestHeader rh;
  229. deserialize_RequestHeader(ia,"hdr",&rh);
  230. // notify the "server" a client request has arrived
  231. onMessageReceived(rh,ia);
  232. close_buffer_iarchive(&ia);
  233. if(rh.type==CLOSE_OP){
  234. ++closeSent;
  235. return; // no reply for close requests
  236. }
  237. // get the next response from the response queue and append it to the receive list
  238. Element e;
  239. {
  240. synchronized(respQMx);
  241. if(respQueue.empty())
  242. return;
  243. e=respQueue.front();
  244. respQueue.pop_front();
  245. }
  246. e.first->setXID(rh.xid);
  247. addRecvResponse(e);
  248. }
  249. void forceConnected(zhandle_t* zh){
  250. // simulate connected state
  251. zh->state=CONNECTED_STATE;
  252. zh->fd=ZookeeperServer::FD;
  253. zh->input_buffer=0;
  254. gettimeofday(&zh->last_recv,0);
  255. gettimeofday(&zh->last_send,0);
  256. }