/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef WATCH_UTIL_H_ #define WATCH_UTIL_H_ #include #include #include using namespace std; #include "CollectionUtil.h" #include "ThreadingUtil.h" using namespace Util; #ifdef THREADED static void yield(zhandle_t *zh, int i) { sleep(i); } #else static void yield(zhandle_t *zh, int seconds) { int fd; int interest; int events; struct timeval tv; time_t expires = time(0) + seconds; time_t timeLeft = seconds; fd_set rfds, wfds, efds; FD_ZERO(&rfds); FD_ZERO(&wfds); FD_ZERO(&efds); while(timeLeft >= 0) { zookeeper_interest(zh, &fd, &interest, &tv); if (fd != -1) { if (interest&ZOOKEEPER_READ) { FD_SET(fd, &rfds); } else { FD_CLR(fd, &rfds); } if (interest&ZOOKEEPER_WRITE) { FD_SET(fd, &wfds); } else { FD_CLR(fd, &wfds); } } else { fd = 0; } FD_SET(0, &rfds); if (tv.tv_sec > timeLeft) { tv.tv_sec = timeLeft; } select(fd+1, &rfds, &wfds, &efds, &tv); timeLeft = expires - time(0); events = 0; if (FD_ISSET(fd, &rfds)) { events |= ZOOKEEPER_READ; } if (FD_ISSET(fd, &wfds)) { events |= ZOOKEEPER_WRITE; } zookeeper_process(zh, events); } } #endif typedef struct evt { string path; int type; } evt_t; typedef struct watchCtx { private: list events; watchCtx(const watchCtx&); watchCtx& operator=(const watchCtx&); public: bool connected; zhandle_t *zh; Mutex mutex; watchCtx() { connected = false; zh = 0; } ~watchCtx() { if (zh) { zookeeper_close(zh); zh = 0; } } evt_t getEvent() { evt_t evt; mutex.acquire(); CPPUNIT_ASSERT( events.size() > 0); evt = events.front(); events.pop_front(); mutex.release(); return evt; } int countEvents() { int count; mutex.acquire(); count = events.size(); mutex.release(); return count; } void putEvent(evt_t evt) { mutex.acquire(); events.push_back(evt); mutex.release(); } bool waitForConnected(zhandle_t *zh) { time_t expires = time(0) + 10; while(!connected && time(0) < expires) { yield(zh, 1); } return connected; } bool waitForDisconnected(zhandle_t *zh) { time_t expires = time(0) + 15; while(connected && time(0) < expires) { yield(zh, 1); } return !connected; } } watchctx_t; #endif /*WATCH_UTIL_H_*/