blockingqueue.h 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #ifndef __BLOCKINGQUEUE_H__
  19. #define __BLOCKINGQUEUE_H__
  20. #include <deque>
  21. #include "mutex.h"
  22. using namespace std;
  23. USING_ZKFUSE_NAMESPACE
  24. namespace zk {
  25. /**
  26. * \brief An unbounded blocking queue of elements of type E.
  27. *
  28. * <p>
  29. * This class is thread safe.
  30. */
  31. template <class E>
  32. class BlockingQueue {
  33. public:
  34. /**
  35. * \brief Adds the specified element to this queue, waiting if necessary
  36. * \brief for space to become available.
  37. *
  38. * @param e the element to be added
  39. */
  40. void put(E e);
  41. /**
  42. * \brief Retrieves and removes the head of this queue, waiting if
  43. * \brief no elements are present in this queue.
  44. *
  45. * @param timeout how long to wait until an element becomes availabe,
  46. * in milliseconds; if <code>0</code> then wait forever
  47. * @param timedOut if not NULL then set to true whether this function timed out
  48. * @return the element from the queue
  49. */
  50. E take(int32_t timeout = 0, bool *timedOut = NULL);
  51. /**
  52. * Returns the current size of this blocking queue.
  53. *
  54. * @return the number of elements in this queue
  55. */
  56. int size() const;
  57. /**
  58. * \brief Returns whether this queue is empty or not.
  59. *
  60. * @return true if this queue has no elements; false otherwise
  61. */
  62. bool empty() const;
  63. private:
  64. /**
  65. * The queue of elements. Deque is used to provide O(1) time
  66. * for head elements removal.
  67. */
  68. deque<E> m_queue;
  69. /**
  70. * The mutex used for queue synchronization.
  71. */
  72. mutable zkfuse::Mutex m_mutex;
  73. /**
  74. * The conditionial variable associated with the mutex above.
  75. */
  76. mutable Cond m_cond;
  77. };
  78. template<class E>
  79. int BlockingQueue<E>::size() const {
  80. int size;
  81. m_mutex.Acquire();
  82. size = m_queue.size();
  83. m_mutex.Release();
  84. return size;
  85. }
  86. template<class E>
  87. bool BlockingQueue<E>::empty() const {
  88. bool isEmpty;
  89. m_mutex.Acquire();
  90. isEmpty = m_queue.empty();
  91. m_mutex.Release();
  92. return isEmpty;
  93. }
  94. template<class E>
  95. void BlockingQueue<E>::put(E e) {
  96. m_mutex.Acquire();
  97. m_queue.push_back( e );
  98. m_cond.Signal();
  99. m_mutex.Release();
  100. }
  101. template<class E>
  102. E BlockingQueue<E>::take(int32_t timeout, bool *timedOut) {
  103. m_mutex.Acquire();
  104. bool hasResult = true;
  105. while (m_queue.empty()) {
  106. if (timeout <= 0) {
  107. m_cond.Wait( m_mutex );
  108. } else {
  109. if (!m_cond.Wait( m_mutex, timeout )) {
  110. hasResult = false;
  111. break;
  112. }
  113. }
  114. }
  115. if (hasResult) {
  116. E e = m_queue.front();
  117. m_queue.pop_front();
  118. m_mutex.Release();
  119. if (timedOut) {
  120. *timedOut = false;
  121. }
  122. return e;
  123. } else {
  124. m_mutex.Release();
  125. if (timedOut) {
  126. *timedOut = true;
  127. }
  128. return E();
  129. }
  130. }
  131. }
  132. #endif /* __BLOCKINGQUEUE_H__ */