/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include // for htonl #include #include #include #ifdef THREADED #include "PthreadMocks.h" #endif #include "ZKMocks.h" using namespace std; TestClientId testClientId; const char* TestClientId::PASSWD="1234567890123456"; HandshakeRequest* HandshakeRequest::parse(const std::string& buf) { unique_ptr req(new HandshakeRequest); memcpy(&req->protocolVersion,buf.data(), sizeof(req->protocolVersion)); req->protocolVersion = htonl(req->protocolVersion); int offset=sizeof(req->protocolVersion); memcpy(&req->lastZxidSeen,buf.data()+offset,sizeof(req->lastZxidSeen)); req->lastZxidSeen = zoo_htonll(req->lastZxidSeen); offset+=sizeof(req->lastZxidSeen); memcpy(&req->timeOut,buf.data()+offset,sizeof(req->timeOut)); req->timeOut = htonl(req->timeOut); offset+=sizeof(req->timeOut); memcpy(&req->sessionId,buf.data()+offset,sizeof(req->sessionId)); req->sessionId = zoo_htonll(req->sessionId); offset+=sizeof(req->sessionId); memcpy(&req->passwd_len,buf.data()+offset,sizeof(req->passwd_len)); req->passwd_len = htonl(req->passwd_len); offset+=sizeof(req->passwd_len); memcpy(req->passwd,buf.data()+offset,sizeof(req->passwd)); offset+=sizeof(req->passwd); memcpy(&req->readOnly,buf.data()+offset,sizeof(req->readOnly)); if(testClientId.client_id==req->sessionId && !memcmp(testClientId.passwd,req->passwd,sizeof(req->passwd))) return req.release(); // the request didn't match -- may not be a handshake request after all return 0; } // ***************************************************************************** // watcher action implementation void activeWatcher(zhandle_t *zh, int type, int state, const char *path,void* ctx) { if (zh == 0 || ctx == 0) return; WatcherAction* action = (WatcherAction *)ctx; if (type == ZOO_SESSION_EVENT) { if (state == ZOO_EXPIRED_SESSION_STATE) action->onSessionExpired(zh); else if(state == ZOO_CONNECTING_STATE) action->onConnectionLost(zh); else if(state == ZOO_CONNECTED_STATE) action->onConnectionEstablished(zh); } else if (type == ZOO_CHANGED_EVENT) action->onNodeValueChanged(zh,path); else if (type == ZOO_DELETED_EVENT) action->onNodeDeleted(zh,path); else if (type == ZOO_CHILD_EVENT) action->onChildChanged(zh,path); // TODO: implement for the rest of the event types action->setWatcherTriggered(); } SyncedBoolCondition WatcherAction::isWatcherTriggered() const { return SyncedBoolCondition(triggered_,mx_); } // a set of async completion signatures void asyncCompletion(int rc, ACL_vector *acl,Stat *stat, const void *data){ assert("Completion data is NULL"&&data); static_cast((void*)data)->aclCompl(rc,acl,stat); } void asyncCompletion(int rc, const char *value, int len, const Stat *stat, const void *data) { assert("Completion data is NULL"&&data); static_cast((void*)data)->dataCompl(rc,value,len,stat); } void asyncCompletion(int rc, const Stat *stat, const void *data) { assert("Completion data is NULL"&&data); static_cast((void*)data)->statCompl(rc,stat); } void asyncCompletion(int rc, const char *value, const void *data) { assert("Completion data is NULL"&&data); static_cast((void*)data)->stringCompl(rc,value); } void asyncCompletion(int rc,const String_vector *strings, const void *data) { assert("Completion data is NULL"&&data); static_cast((void*)data)->stringsCompl(rc,strings); } void asyncCompletion(int rc, const void *data) { assert("Completion data is NULL"&&data); static_cast((void*)data)->voidCompl(rc); } // a predicate implementation bool IOThreadStopped::operator()() const{ #ifdef THREADED adaptor_threads* adaptor=(adaptor_threads*)zh_->adaptor_priv; return CheckedPthread::isTerminated(adaptor->io); #else assert("IOThreadStopped predicate is only for use with THREADED client" && false); return false; #endif } //****************************************************************************** // DECLARE_WRAPPER(int,flush_send_queue,(zhandle_t*zh, int timeout)) { if(!Mock_flush_send_queue::mock_) return CALL_REAL(flush_send_queue,(zh,timeout)); return Mock_flush_send_queue::mock_->call(zh,timeout); } Mock_flush_send_queue* Mock_flush_send_queue::mock_=0; //****************************************************************************** // DECLARE_WRAPPER(int32_t,get_xid,()) { if(!Mock_get_xid::mock_) return CALL_REAL(get_xid,()); return Mock_get_xid::mock_->call(); } Mock_get_xid* Mock_get_xid::mock_=0; //****************************************************************************** // activateWatcher mock DECLARE_WRAPPER(void,activateWatcher,(zhandle_t *zh, watcher_registration_t* reg, int rc)) { if(!Mock_activateWatcher::mock_){ CALL_REAL(activateWatcher,(zh, reg,rc)); }else{ Mock_activateWatcher::mock_->call(zh, reg,rc); } } Mock_activateWatcher* Mock_activateWatcher::mock_=0; class ActivateWatcherWrapper: public Mock_activateWatcher{ public: ActivateWatcherWrapper():ctx_(0),activated_(false){} virtual void call(zhandle_t *zh, watcher_registration_t* reg, int rc){ CALL_REAL(activateWatcher,(zh, reg,rc)); synchronized(mx_); if(reg->context==ctx_){ activated_=true; ctx_=0; } } void setContext(void* ctx){ synchronized(mx_); ctx_=ctx; activated_=false; } SyncedBoolCondition isActivated() const{ return SyncedBoolCondition(activated_,mx_); } mutable Mutex mx_; void* ctx_; bool activated_; }; WatcherActivationTracker::WatcherActivationTracker(): wrapper_(new ActivateWatcherWrapper) { } WatcherActivationTracker::~WatcherActivationTracker(){ delete wrapper_; } void WatcherActivationTracker::track(void* ctx){ wrapper_->setContext(ctx); } SyncedBoolCondition WatcherActivationTracker::isWatcherActivated() const{ return wrapper_->isActivated(); } //****************************************************************************** // DECLARE_WRAPPER(void,deliverWatchers,(zhandle_t* zh,int type,int state, const char* path, watcher_object_list_t **list)) { if(!Mock_deliverWatchers::mock_){ CALL_REAL(deliverWatchers,(zh,type,state,path, list)); }else{ Mock_deliverWatchers::mock_->call(zh,type,state,path, list); } } Mock_deliverWatchers* Mock_deliverWatchers::mock_=0; struct RefCounterValue{ RefCounterValue(zhandle_t* const& zh,int32_t expectedCounter,Mutex& mx): zh_(zh),expectedCounter_(expectedCounter),mx_(mx){} bool operator()() const{ { synchronized(mx_); if(zh_==0) return false; } return inc_ref_counter(zh_,0)==expectedCounter_; } zhandle_t* const& zh_; int32_t expectedCounter_; Mutex& mx_; }; class DeliverWatchersWrapper: public Mock_deliverWatchers{ public: DeliverWatchersWrapper(int type,int state,bool terminate): type_(type),state_(state), allDelivered_(false),terminate_(terminate),zh_(0),deliveryCounter_(0){} virtual void call(zhandle_t* zh, int type, int state, const char* path, watcher_object_list **list) { { synchronized(mx_); zh_=zh; allDelivered_=false; } CALL_REAL(deliverWatchers,(zh,type,state,path, list)); if(type_==type && state_==state){ if(terminate_){ // prevent zhandle_t from being prematurely distroyed; // this will also ensure that zookeeper_close() cleanups the // thread resources by calling finish_adaptor() inc_ref_counter(zh,1); terminateZookeeperThreads(zh); } synchronized(mx_); allDelivered_=true; deliveryCounter_++; } } SyncedBoolCondition isDelivered() const{ if(terminate_){ int i=ensureCondition(RefCounterValue(zh_,1,mx_),1000); assert(i<1000); } return SyncedBoolCondition(allDelivered_,mx_); } void resetDeliveryCounter(){ synchronized(mx_); deliveryCounter_=0; } SyncedIntegerEqual deliveryCounterEquals(int expected) const{ if(terminate_){ int i=ensureCondition(RefCounterValue(zh_,1,mx_),1000); assert(i<1000); } return SyncedIntegerEqual(deliveryCounter_,expected,mx_); } int type_; int state_; mutable Mutex mx_; bool allDelivered_; bool terminate_; zhandle_t* zh_; int deliveryCounter_; }; WatcherDeliveryTracker::WatcherDeliveryTracker( int type,int state,bool terminateCompletionThread): deliveryWrapper_(new DeliverWatchersWrapper( type,state,terminateCompletionThread)){ } WatcherDeliveryTracker::~WatcherDeliveryTracker(){ delete deliveryWrapper_; } SyncedBoolCondition WatcherDeliveryTracker::isWatcherProcessingCompleted() const { return deliveryWrapper_->isDelivered(); } void WatcherDeliveryTracker::resetDeliveryCounter(){ deliveryWrapper_->resetDeliveryCounter(); } SyncedIntegerEqual WatcherDeliveryTracker::deliveryCounterEquals(int expected) const { return deliveryWrapper_->deliveryCounterEquals(expected); } //****************************************************************************** // string HandshakeResponse::toString() const { string buf; int32_t tmp=htonl(protocolVersion); buf.append((char*)&tmp,sizeof(tmp)); tmp=htonl(timeOut); buf.append((char*)&tmp,sizeof(tmp)); int64_t tmp64=zoo_htonll(sessionId); buf.append((char*)&tmp64,sizeof(sessionId)); tmp=htonl(passwd_len); buf.append((char*)&tmp,sizeof(tmp)); buf.append(passwd,sizeof(passwd)); if (!omitReadOnly) { buf.append(&readOnly,sizeof(readOnly)); } // finally set the buffer length tmp=htonl(buf.size()); buf.insert(0,(char*)&tmp, sizeof(tmp)); return buf; } string ZooGetResponse::toString() const{ oarchive* oa=create_buffer_oarchive(); ReplyHeader h = {xid_,1,ZOK}; serialize_ReplyHeader(oa, "hdr", &h); GetDataResponse resp; char buf[1024]; assert("GetDataResponse is too long"&&data_.size()<=sizeof(buf)); resp.data.len=data_.size(); resp.data.buff=buf; data_.copy(resp.data.buff, data_.size()); resp.stat=stat_; serialize_GetDataResponse(oa, "reply", &resp); int32_t len=htonl(get_buffer_len(oa)); string res((char*)&len,sizeof(len)); res.append(get_buffer(oa),get_buffer_len(oa)); close_buffer_oarchive(&oa,1); return res; } string ZooStatResponse::toString() const{ oarchive* oa=create_buffer_oarchive(); ReplyHeader h = {xid_,1,rc_}; serialize_ReplyHeader(oa, "hdr", &h); SetDataResponse resp; resp.stat=stat_; serialize_SetDataResponse(oa, "reply", &resp); int32_t len=htonl(get_buffer_len(oa)); string res((char*)&len,sizeof(len)); res.append(get_buffer(oa),get_buffer_len(oa)); close_buffer_oarchive(&oa,1); return res; } string ZooGetChildrenResponse::toString() const{ oarchive* oa=create_buffer_oarchive(); ReplyHeader h = {xid_,1,rc_}; serialize_ReplyHeader(oa, "hdr", &h); GetChildrenResponse resp; // populate the string vector allocate_String_vector(&resp.children,strings_.size()); for(int i=0;i<(int)strings_.size();++i) resp.children.data[i]=strdup(strings_[i].c_str()); serialize_GetChildrenResponse(oa, "reply", &resp); deallocate_GetChildrenResponse(&resp); int32_t len=htonl(get_buffer_len(oa)); string res((char*)&len,sizeof(len)); res.append(get_buffer(oa),get_buffer_len(oa)); close_buffer_oarchive(&oa,1); return res; } string ZNodeEvent::toString() const{ oarchive* oa=create_buffer_oarchive(); struct WatcherEvent evt = {type_,0,(char*)path_.c_str()}; struct ReplyHeader h = {WATCHER_EVENT_XID,0,ZOK }; serialize_ReplyHeader(oa, "hdr", &h); serialize_WatcherEvent(oa, "event", &evt); int32_t len=htonl(get_buffer_len(oa)); string res((char*)&len,sizeof(len)); res.append(get_buffer(oa),get_buffer_len(oa)); close_buffer_oarchive(&oa,1); return res; } string PingResponse::toString() const{ oarchive* oa=create_buffer_oarchive(); ReplyHeader h = {PING_XID,1,ZOK}; serialize_ReplyHeader(oa, "hdr", &h); int32_t len=htonl(get_buffer_len(oa)); string res((char*)&len,sizeof(len)); res.append(get_buffer(oa),get_buffer_len(oa)); close_buffer_oarchive(&oa,1); return res; } //****************************************************************************** // Zookeeper server simulator // bool ZookeeperServer::hasMoreRecv() const{ return recvHasMore.get()!=0 || connectionLost; } ssize_t ZookeeperServer::callRecv(int s,void *buf,size_t len,int flags){ if(connectionLost){ recvReturnBuffer.erase(); return 0; } // done transmitting the current buffer? if(recvReturnBuffer.size()==0){ synchronized(recvQMx); if(recvQueue.empty()){ recvErrno=EAGAIN; return Mock_socket::callRecv(s,buf,len,flags); } --recvHasMore; Element& el=recvQueue.front(); if(el.first!=0){ recvReturnBuffer=el.first->toString(); delete el.first; } recvErrno=el.second; recvQueue.pop_front(); } return Mock_socket::callRecv(s,buf,len,flags); } void ZookeeperServer::onMessageReceived(const RequestHeader& rh, iarchive* ia){ // no-op by default } void ZookeeperServer::notifyBufferSent(const std::string& buffer){ if(HandshakeRequest::isValid(buffer)){ // could be a connect request unique_ptr req(HandshakeRequest::parse(buffer)); if(req.get()!=0){ // handle the handshake int64_t sessId=sessionExpired?req->sessionId+1:req->sessionId; sessionExpired=false; addRecvResponse(new HandshakeResponse(sessId)); return; } // not a connect request -- fall thru } // parse the buffer to extract the request type and its xid iarchive *ia=create_buffer_iarchive((char*)buffer.data(), buffer.size()); RequestHeader rh; deserialize_RequestHeader(ia,"hdr",&rh); // notify the "server" a client request has arrived if (rh.xid == -8) { Element e = Element(new ZooStatResponse,0); e.first->setXID(-8); addRecvResponse(e); close_buffer_iarchive(&ia); return; } else { onMessageReceived(rh,ia); } close_buffer_iarchive(&ia); if(rh.type==ZOO_CLOSE_OP){ ++closeSent; return; // no reply for close requests } // get the next response from the response queue and append it to the // receive list Element e; { synchronized(respQMx); if(respQueue.empty()) return; e=respQueue.front(); respQueue.pop_front(); } e.first->setXID(rh.xid); addRecvResponse(e); } void forceConnected(zhandle_t* zh, const struct timeval *last_recv_send){ // simulate connected state zh->state=ZOO_CONNECTED_STATE; // Simulate we're connected to the first host in our host list zh->fd->sock=ZookeeperServer::FD; assert(zh->addrs.count > 0); zh->addr_cur = zh->addrs.data[0]; zh->addrs.next++; zh->input_buffer=0; if (last_recv_send) { zh->last_recv = *last_recv_send; zh->last_send = *last_recv_send; } else { gettimeofday(&zh->last_recv,0); gettimeofday(&zh->last_send,0); } } void terminateZookeeperThreads(zhandle_t* zh){ // this will cause the zookeeper threads to terminate zh->close_requested=1; }