ThreadingUtil.h 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. /*
  2. * Copyright 2008, Yahoo! Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #ifndef THREADINGUTIL_H_
  17. #define THREADINGUTIL_H_
  18. #include <vector>
  19. #ifdef THREADED
  20. #include "pthread.h"
  21. #endif
  // *****************************************************************************
  // Threading primitives

  // Atomic post-increment: adds `incr` to `*operand` as one atomic step.
  // Returns the value `*operand` held before the addition.
  int32_t atomic_post_incr(volatile int32_t* operand, int32_t incr);

  // Atomic fetch-and-store: writes `value` into `*operand` as one atomic step.
  // Returns the value `*operand` held before the store.
  int32_t atomic_fetch_store(volatile int32_t* operand, int32_t value);
  28. // a partial implementation of an atomic integer type
  29. class AtomicInt{
  30. public:
  31. explicit AtomicInt(int32_t init=0):v_(init){}
  32. AtomicInt(const AtomicInt& other):v_(other){}
  33. // assigment
  34. AtomicInt& operator=(const AtomicInt& lhs){
  35. atomic_fetch_store(&v_,lhs);
  36. return *this;
  37. }
  38. AtomicInt& operator=(int32_t i){
  39. atomic_fetch_store(&v_,i);
  40. return *this;
  41. }
  42. // pre-increment
  43. AtomicInt& operator++() {
  44. atomic_post_incr(&v_,1);
  45. return *this;
  46. }
  47. // pre-decrement
  48. AtomicInt& operator--() {
  49. atomic_post_incr(&v_,-1);
  50. return *this;
  51. }
  52. // post-increment
  53. AtomicInt operator++(int){
  54. return AtomicInt(atomic_post_incr(&v_,1));
  55. }
  56. // post-decrement
  57. AtomicInt operator--(int){
  58. return AtomicInt(atomic_post_incr(&v_,-1));
  59. }
  60. operator int() const{
  61. return atomic_post_incr(&v_,0);
  62. }
  63. int get() const{
  64. return atomic_post_incr(&v_,0);
  65. }
  66. private:
  67. mutable int32_t v_;
  68. };
  69. #ifdef THREADED
  70. // ****************************************************************************
  // Invoke validateJobs() / validate() on the given object, passing the source
  // location of the macro invocation (__FILE__, __LINE__) so a failure can be
  // attributed to the calling test line.
  //
  // The argument is parenthesized so that any expression may be passed, e.g.
  // VALIDATE_JOB(*jobPtr); without the parentheses that would expand to
  // *jobPtr.validate(...), which binds the member access before the dereference.
  #define VALIDATE_JOBS(jm) ((jm).validateJobs(__FILE__,__LINE__))
  #define VALIDATE_JOB(j) ((j).validate(__FILE__,__LINE__))
  // Mutual-exclusion lock used by the threaded build.
  //
  // Only the interface is declared here; the state lives behind the opaque
  // Impl pointer and the member functions are defined outside this header.
  class Mutex {
  public:
      Mutex();
      ~Mutex();

      void acquire();
      void release();

  private:
      // Copy operations are private and have no body in this header, so a
      // Mutex cannot be copied by client code.
      Mutex(const Mutex&);
      Mutex& operator=(const Mutex&);

      struct Impl;
      Impl* impl_;
  };
  85. class MTLock{
  86. public:
  87. MTLock(Mutex& m):m_(m){m.acquire();}
  88. ~MTLock(){m_.release();}
  89. Mutex& m_;
  90. };
  91. #define synchronized(m) MTLock __lock(m)
  92. // ****************************************************************************
  // Abstract synchronization latch. Concrete behavior is supplied entirely by
  // the implementing class; this type only fixes the interface.
  class Latch {
  public:
      virtual ~Latch() {}

      // Wait on the latch.
      virtual void await() const = 0;

      // Signal the latch and then wait on it.
      virtual void signalAndWait() = 0;

      // Signal the latch without waiting.
      virtual void signal() = 0;
  };
  100. class CountDownLatch: public Latch {
  101. public:
  102. CountDownLatch(int count):count_(count) {
  103. pthread_cond_init(&cond_,0);
  104. pthread_mutex_init(&mut_,0);
  105. }
  106. virtual ~CountDownLatch() {
  107. pthread_mutex_lock(&mut_);
  108. if(count_!=0) {
  109. count_=0;
  110. pthread_cond_broadcast(&cond_);
  111. }
  112. pthread_mutex_unlock(&mut_);
  113. pthread_cond_destroy(&cond_);
  114. pthread_mutex_destroy(&mut_);
  115. }
  116. virtual void await() const {
  117. pthread_mutex_lock(&mut_);
  118. awaitImpl();
  119. pthread_mutex_unlock(&mut_);
  120. }
  121. virtual void signalAndWait() {
  122. pthread_mutex_lock(&mut_);
  123. signalImpl();
  124. awaitImpl();
  125. pthread_mutex_unlock(&mut_);
  126. }
  127. virtual void signal() {
  128. pthread_mutex_lock(&mut_);
  129. signalImpl();
  130. pthread_mutex_unlock(&mut_);
  131. }
  132. private:
  133. void awaitImpl() const{
  134. while(count_!=0)
  135. pthread_cond_wait(&cond_,&mut_);
  136. }
  137. void signalImpl() {
  138. if(count_>0) {
  139. count_--;
  140. pthread_cond_broadcast(&cond_);
  141. }
  142. }
  143. int count_;
  144. mutable pthread_mutex_t mut_;
  145. mutable pthread_cond_t cond_;
  146. };
  147. class TestJob {
  148. public:
  149. typedef long JobId;
  150. TestJob():hasRun_(false),startLatch_(0),endLatch_(0) {}
  151. virtual ~TestJob() {
  152. join();
  153. }
  154. virtual TestJob* clone() const =0;
  155. virtual void run() =0;
  156. virtual void validate(const char* file, int line) const =0;
  157. virtual void start(Latch* startLatch=0,Latch* endLatch=0) {
  158. startLatch_=startLatch;endLatch_=endLatch;
  159. hasRun_=true;
  160. pthread_create(&thread_, 0, thread, this);
  161. }
  162. virtual JobId getJobId() const {
  163. return (JobId)thread_;
  164. }
  165. virtual void join() {
  166. if(!hasRun_)
  167. return;
  168. if(!pthread_equal(thread_,pthread_self()))
  169. pthread_join(thread_,0);
  170. else
  171. pthread_detach(thread_);
  172. }
  173. private:
  174. void awaitStart() {
  175. if(startLatch_==0) return;
  176. startLatch_->signalAndWait();
  177. }
  178. void signalFinished() {
  179. if(endLatch_==0) return;
  180. endLatch_->signal();
  181. }
  182. static void* thread(void* p) {
  183. TestJob* j=(TestJob*)p;
  184. j->awaitStart(); // wait for the start command
  185. j->run();
  186. j->signalFinished();
  187. return 0;
  188. }
  189. bool hasRun_;
  190. Latch* startLatch_;
  191. Latch* endLatch_;
  192. pthread_t thread_;
  193. };
  194. class TestJobManager {
  195. typedef std::vector<TestJob*> JobList;
  196. public:
  197. TestJobManager(const TestJob& tj,int threadCount=1):
  198. startLatch_(threadCount),endLatch_(threadCount)
  199. {
  200. for(int i=0;i<threadCount;++i)
  201. jobs_.push_back(tj.clone());
  202. }
  203. virtual ~TestJobManager(){
  204. for(unsigned i=0;i<jobs_.size();++i)
  205. delete jobs_[i];
  206. }
  207. virtual void startAllJobs() {
  208. for(unsigned i=0;i<jobs_.size();++i)
  209. jobs_[i]->start(&startLatch_,&endLatch_);
  210. }
  211. virtual void startJobsImmediately() {
  212. for(unsigned i=0;i<jobs_.size();++i)
  213. jobs_[i]->start(0,&endLatch_);
  214. }
  215. virtual void wait() const {
  216. endLatch_.await();
  217. }
  218. virtual void validateJobs(const char* file, int line) const{
  219. for(unsigned i=0;i<jobs_.size();++i)
  220. jobs_[i]->validate(file,line);
  221. }
  222. private:
  223. JobList jobs_;
  224. CountDownLatch startLatch_;
  225. CountDownLatch endLatch_;
  226. };
  227. #else // THREADED
  // Single-threaded build: there is nothing to protect, so the lock is a
  // no-op with the same acquire()/release() interface as the threaded Mutex.
  class Mutex {
  public:
      void acquire() {}
      void release() {}
  };

  // Expands to nothing when THREADED is not defined.
  #define synchronized(m)
  235. #endif // THREADED
  236. #endif /*THREADINGUTIL_H_*/