فهرست منبع

ZOOKEEPER-3340: Introduce CircularBlockingQueue in QuorumCnxManager.java

There are many caveats and comments regarding Exception thrown 'which is safe to ignore'.  Added a new data structure that removes all of these comments and will perform without generating exceptions while more clearly implementing the desired behavior without the caveats.

Author: Beluga Behr <dam6923@gmail.com>
Author: David Mollitor <dam6923@gmail.com>
Author: David Mollitor <dmollitor@apache.org>

Reviewers: eolivelli@apache.org, andor@apache.org

Closes #880 from belugabehr/ZOOKEEPER-3340 and squashes the following commits:

7b74dac27 [David Mollitor] Changed comment from conrete ArrayBlockingQueue to BlockingQueue
38d7e3f05 [David Mollitor] Shutdown ExecutorService in test
f5ea2b6b1 [David Mollitor] Updated to fix checkstyle ImportOrder: Extra separation
b3ac3cc44 [David Mollitor] Rebased to master
c63ab852c [Beluga Behr] Added updates from code review
d8877803b [Beluga Behr] Introduce new class CircularBlockingQueue
2793db2f1 [Beluga Behr] Added additional logging
ce96b7071 [Beluga Behr] ZOOKEEPER-3340: Improve Queue Usage in QuorumCnxManager.java
Beluga Behr 5 سال پیش
والد
کامیت
cd46594794

+ 46 - 90
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

@@ -36,9 +36,8 @@ import java.util.Collections;
 import java.util.Enumeration;
 import java.util.Enumeration;
 import java.util.HashSet;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadFactory;
@@ -55,6 +54,7 @@ import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
 import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
 import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.ConfigUtils;
 import org.apache.zookeeper.server.util.ConfigUtils;
+import org.apache.zookeeper.util.CircularBlockingQueue;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
@@ -137,17 +137,13 @@ public class QuorumCnxManager {
      * Mapping from Peer to Thread number
      * Mapping from Peer to Thread number
      */
      */
     final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
     final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
-    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
+    final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;
     final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
     final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
 
 
     /*
     /*
      * Reception queue
      * Reception queue
      */
      */
-    public final ArrayBlockingQueue<Message> recvQueue;
-    /*
-     * Object to synchronize access to recvQueue
-     */
-    private final Object recvQLock = new Object();
+    public final BlockingQueue<Message> recvQueue;
 
 
     /*
     /*
      * Shutdown flag
      * Shutdown flag
@@ -253,10 +249,10 @@ public class QuorumCnxManager {
     }
     }
 
 
     public QuorumCnxManager(QuorumPeer self, final long mySid, Map<Long, QuorumPeer.QuorumServer> view, QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs, int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) {
     public QuorumCnxManager(QuorumPeer self, final long mySid, Map<Long, QuorumPeer.QuorumServer> view, QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs, int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) {
-        this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
-        this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
-        this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
-        this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
+        this.recvQueue = new CircularBlockingQueue<>(RECV_CAPACITY);
+        this.queueSendMap = new ConcurrentHashMap<>();
+        this.senderWorkerMap = new ConcurrentHashMap<>();
+        this.lastMessageSent = new ConcurrentHashMap<>();
 
 
         String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
         String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
         if (cnxToValue != null) {
         if (cnxToValue != null) {
@@ -438,7 +434,8 @@ public class QuorumCnxManager {
             }
             }
 
 
             senderWorkerMap.put(sid, sw);
             senderWorkerMap.put(sid, sw);
-            queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
+
+            queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
 
 
             sw.start();
             sw.start();
             rw.start();
             rw.start();
@@ -573,7 +570,7 @@ public class QuorumCnxManager {
 
 
             senderWorkerMap.put(sid, sw);
             senderWorkerMap.put(sid, sw);
 
 
-            queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
+            queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
 
 
             sw.start();
             sw.start();
             rw.start();
             rw.start();
@@ -598,10 +595,9 @@ public class QuorumCnxManager {
             /*
             /*
              * Start a new connection if doesn't have one already.
              * Start a new connection if doesn't have one already.
              */
              */
-            ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new ArrayBlockingQueue<>(SEND_CAPACITY));
+            BlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
             addToSendQueue(bq, b);
             addToSendQueue(bq, b);
             connectOne(sid);
             connectOne(sid);
-
         }
         }
     }
     }
 
 
@@ -724,9 +720,10 @@ public class QuorumCnxManager {
      * Check if all queues are empty, indicating that all messages have been delivered.
      * Check if all queues are empty, indicating that all messages have been delivered.
      */
      */
     boolean haveDelivered() {
     boolean haveDelivered() {
-        for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
-            LOG.debug("Queue size: {}", queue.size());
-            if (queue.size() == 0) {
+        for (BlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
+            final int queueSize = queue.size();
+            LOG.debug("Queue size: {}", queueSize);
+            if (queueSize == 0) {
                 return true;
                 return true;
             }
             }
         }
         }
@@ -1085,7 +1082,7 @@ public class QuorumCnxManager {
                  * message than that stored in lastMessage. To avoid sending
                  * message than that stored in lastMessage. To avoid sending
                  * stale message, we should send the message in the send queue.
                  * stale message, we should send the message in the send queue.
                  */
                  */
-                ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
+                BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                 if (bq == null || isSendQueueEmpty(bq)) {
                 if (bq == null || isSendQueueEmpty(bq)) {
                     ByteBuffer b = lastMessageSent.get(sid);
                     ByteBuffer b = lastMessageSent.get(sid);
                     if (b != null) {
                     if (b != null) {
@@ -1103,7 +1100,7 @@ public class QuorumCnxManager {
 
 
                     ByteBuffer b = null;
                     ByteBuffer b = null;
                     try {
                     try {
-                        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
+                        BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                         if (bq != null) {
                         if (bq != null) {
                             b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                             b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                         } else {
                         } else {
@@ -1216,37 +1213,19 @@ public class QuorumCnxManager {
     }
     }
 
 
     /**
     /**
-     * Inserts an element in the specified queue. If the Queue is full, this
-     * method removes an element from the head of the Queue and then inserts
-     * the element at the tail. It can happen that an element is removed
-     * by another thread in {@link SendWorker#run() }
-     * method before this method attempts to remove an element from the queue.
-     * This will cause {@link ArrayBlockingQueue#remove() remove} to throw an
-     * exception, which is safe to ignore.
-     *
-     * Unlike {@link #addToRecvQueue(Message) addToRecvQueue} this method does
-     * not need to be synchronized since there is only one thread that inserts
-     * an element in the queue and another thread that reads from the queue.
+     * Inserts an element in the provided {@link BlockingQueue}. This method
+     * assumes that if the Queue is full, an element from the head of the Queue is
+     * removed and the new item is inserted at the tail of the queue. This is done
+     * to prevent a thread from blocking while inserting an element in the queue.
      *
      *
-     * @param queue
-     *          Reference to the Queue
-     * @param buffer
-     *          Reference to the buffer to be inserted in the queue
+     * @param queue Reference to the Queue
+     * @param buffer Reference to the buffer to be inserted in the queue
      */
      */
-    private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue, ByteBuffer buffer) {
-        if (queue.remainingCapacity() == 0) {
-            try {
-                queue.remove();
-            } catch (NoSuchElementException ne) {
-                // element could be removed by poll()
-                LOG.debug("Trying to remove from an empty Queue. Ignoring exception.", ne);
-            }
-        }
-        try {
-            queue.add(buffer);
-        } catch (IllegalStateException ie) {
-            // This should never happen
-            LOG.error("Unable to insert an element in the queue ", ie);
+    private void addToSendQueue(final BlockingQueue<ByteBuffer> queue,
+        final ByteBuffer buffer) {
+        final boolean success = queue.offer(buffer);
+        if (!success) {
+          throw new RuntimeException("Could not insert into receive queue");
         }
         }
     }
     }
 
 
@@ -1257,7 +1236,7 @@ public class QuorumCnxManager {
      * @return
      * @return
      *      true if the specified queue is empty
      *      true if the specified queue is empty
      */
      */
-    private boolean isSendQueueEmpty(ArrayBlockingQueue<ByteBuffer> queue) {
+    private boolean isSendQueueEmpty(final BlockingQueue<ByteBuffer> queue) {
         return queue.isEmpty();
         return queue.isEmpty();
     }
     }
 
 
@@ -1266,49 +1245,25 @@ public class QuorumCnxManager {
      * waiting up to the specified wait time if necessary for an element to
      * waiting up to the specified wait time if necessary for an element to
      * become available.
      * become available.
      *
      *
-     * {@link ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
+     * {@link BlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
      */
      */
-    private ByteBuffer pollSendQueue(ArrayBlockingQueue<ByteBuffer> queue, long timeout, TimeUnit unit) throws InterruptedException {
-        return queue.poll(timeout, unit);
+    private ByteBuffer pollSendQueue(final BlockingQueue<ByteBuffer> queue,
+          final long timeout, final TimeUnit unit) throws InterruptedException {
+       return queue.poll(timeout, unit);
     }
     }
 
 
     /**
     /**
      * Inserts an element in the {@link #recvQueue}. If the Queue is full, this
      * Inserts an element in the {@link #recvQueue}. If the Queue is full, this
-     * methods removes an element from the head of the Queue and then inserts
-     * the element at the tail of the queue.
-     *
-     * This method is synchronized to achieve fairness between two threads that
-     * are trying to insert an element in the queue. Each thread checks if the
-     * queue is full, then removes the element at the head of the queue, and
-     * then inserts an element at the tail. This three-step process is done to
-     * prevent a thread from blocking while inserting an element in the queue.
-     * If we do not synchronize the call to this method, then a thread can grab
-     * a slot in the queue created by the second thread. This can cause the call
-     * to insert by the second thread to fail.
-     * Note that synchronizing this method does not block another thread
-     * from polling the queue since that synchronization is provided by the
-     * queue itself.
+     * methods removes an element from the head of the Queue and then inserts the
+     * element at the tail of the queue.
      *
      *
-     * @param msg
-     *          Reference to the message to be inserted in the queue
+     * @param msg Reference to the message to be inserted in the queue
      */
      */
-    public void addToRecvQueue(Message msg) {
-        synchronized (recvQLock) {
-            if (recvQueue.remainingCapacity() == 0) {
-                try {
-                    recvQueue.remove();
-                } catch (NoSuchElementException ne) {
-                    // element could be removed by poll()
-                    LOG.debug("Trying to remove from an empty recvQueue. Ignoring exception.", ne);
-                }
-            }
-            try {
-                recvQueue.add(msg);
-            } catch (IllegalStateException ie) {
-                // This should never happen
-                LOG.error("Unable to insert element in the recvQueue ", ie);
-            }
-        }
+    public void addToRecvQueue(final Message msg) {
+      final boolean success = this.recvQueue.offer(msg);
+      if (!success) {
+          throw new RuntimeException("Could not insert into receive queue");
+      }
     }
     }
 
 
     /**
     /**
@@ -1316,10 +1271,11 @@ public class QuorumCnxManager {
      * waiting up to the specified wait time if necessary for an element to
      * waiting up to the specified wait time if necessary for an element to
      * become available.
      * become available.
      *
      *
-     * {@link ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
+     * {@link BlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
      */
      */
-    public Message pollRecvQueue(long timeout, TimeUnit unit) throws InterruptedException {
-        return recvQueue.poll(timeout, unit);
+    public Message pollRecvQueue(final long timeout, final TimeUnit unit)
+       throws InterruptedException {
+       return this.recvQueue.poll(timeout, unit);
     }
     }
 
 
     public boolean connectedToPeer(long peerSid) {
     public boolean connectedToPeer(long peerSid) {

+ 277 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/util/CircularBlockingQueue.java

@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.util;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A bounded blocking queue backed by an array. This queue orders elements FIFO
+ * (first-in-first-out). The head of the queue is that element that has been on
+ * the queue the longest time. The tail of the queue is that element that has
+ * been on the queue the shortest time. New elements are inserted at the tail of
+ * the queue, and the queue retrieval operations obtain elements at the head of
+ * the queue. If the queue is full, the head of the queue (the oldest element)
+ * will be removed to make room for the newest element.
+ */
+public class CircularBlockingQueue<E> implements BlockingQueue<E> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CircularBlockingQueue.class);
+
+  /** Main lock guarding all access */
+  private final ReentrantLock lock;
+
+  /** Condition for waiting takes */
+  private final Condition notEmpty;
+
+  /** The array-backed queue */
+  private final ArrayDeque<E> queue;
+
+  private final int maxSize;
+
+  private long droppedCount;
+
+  public CircularBlockingQueue(int queueSize) {
+    this.queue = new ArrayDeque<>(queueSize);
+    this.maxSize = queueSize;
+
+    this.lock =  new ReentrantLock();
+    this.notEmpty = this.lock.newCondition();
+    this.droppedCount = 0L;
+  }
+
+  /**
+   * This method differs from {@link BlockingQueue#offer(Object)} in that it
+   * will remove the oldest queued element (the element at the front of the
+   * queue) in order to make room for any new elements if the queue is full.
+   *
+   * @param e the element to add
+   * @return true since it will make room for any new elements if required
+   */
+  @Override
+  public boolean offer(E e) {
+      Objects.requireNonNull(e);
+      final ReentrantLock lock = this.lock;
+      lock.lock();
+      try {
+          if (this.queue.size() == this.maxSize) {
+              final E discard = this.queue.remove();
+              this.droppedCount++;
+              LOG.debug("Queue is full. Discarding oldest element [count={}]: {}",
+                  this.droppedCount, discard);
+          }
+          this.queue.add(e);
+          this.notEmpty.signal();
+      } finally {
+          lock.unlock();
+      }
+      return true;
+  }
+
+  @Override
+  public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+      long nanos = unit.toNanos(timeout);
+      final ReentrantLock lock = this.lock;
+      lock.lockInterruptibly();
+      try {
+          while (this.queue.isEmpty()) {
+              if (nanos <= 0) {
+                  return null;
+              }
+              nanos = this.notEmpty.awaitNanos(nanos);
+          }
+          return this.queue.poll();
+      } finally {
+          lock.unlock();
+      }
+  }
+
+  @Override
+  public E take() throws InterruptedException {
+      final ReentrantLock lock = this.lock;
+      lock.lockInterruptibly();
+      try {
+          while (this.queue.isEmpty()) {
+            this.notEmpty.await();
+          }
+          return this.queue.poll();
+      } finally {
+          lock.unlock();
+      }
+  }
+
+  @Override
+  public boolean isEmpty() {
+    final ReentrantLock lock = this.lock;
+    lock.lock();
+    try {
+      return this.queue.isEmpty();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public int size() {
+    final ReentrantLock lock = this.lock;
+    lock.lock();
+    try {
+      return this.queue.size();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns the number of elements that were dropped from the queue because the
+   * queue was full when a new element was offered.
+   *
+   * @return The number of elements dropped (lost) from the queue
+   */
+  public long getDroppedCount() {
+    return this.droppedCount;
+  }
+
+  /**
+   * For testing purposes only.
+   *
+   * @return True if a thread is blocked waiting for a new element to be offered
+   *         to the queue
+   */
+  boolean isConsumerThreadBlocked() {
+    final ReentrantLock lock = this.lock;
+    lock.lock();
+    try {
+      return lock.getWaitQueueLength(this.notEmpty) > 0;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public int drainTo(Collection<? super E> c) {
+    throw new UnsupportedOperationException();
+  }
+
+
+  @Override
+  public E poll() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public E element() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public E peek() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public E remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean addAll(Collection<? extends E> arg0) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> arg0) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Iterator<E> iterator() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> arg0) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> arg0) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Object[] toArray() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> T[] toArray(T[] arg0) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean add(E e) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean contains(Object o) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int drainTo(Collection<? super E> c, int maxElements) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean offer(E e, long timeout, TimeUnit unit)
+      throws InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void put(E e) throws InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int remainingCapacity() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean remove(Object o) {
+    throw new UnsupportedOperationException();
+  }
+
+}

+ 76 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/util/TestCircularBlockingQueue.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.util;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCircularBlockingQueue {
+
+  @Test
+  public void testCircularBlockingQueue() throws InterruptedException {
+    final CircularBlockingQueue<Integer> testQueue =
+        new CircularBlockingQueue<>(2);
+
+    testQueue.offer(1);
+    testQueue.offer(2);
+    testQueue.offer(3);
+
+    Assert.assertEquals(2, testQueue.size());
+
+    Assert.assertEquals(2, testQueue.take().intValue());
+    Assert.assertEquals(3, testQueue.take().intValue());
+
+    Assert.assertEquals(1L, testQueue.getDroppedCount());
+    Assert.assertEquals(0, testQueue.size());
+    Assert.assertEquals(true, testQueue.isEmpty());
+  }
+
+  @Test(timeout = 10000L)
+  public void testCircularBlockingQueueTakeBlock()
+      throws InterruptedException, ExecutionException {
+
+    final CircularBlockingQueue<Integer> testQueue = new CircularBlockingQueue<>(2);
+
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    try {
+      Future<Integer> testTake = executor.submit(() -> {
+        return testQueue.take();
+      });
+
+      // Allow the other thread to get into position; waiting for item to be
+      // inserted
+      while (!testQueue.isConsumerThreadBlocked()) {
+        Thread.sleep(50L);
+      }
+
+      testQueue.offer(10);
+
+      Integer result = testTake.get();
+      Assert.assertEquals(10, result.intValue());
+    } finally {
+      executor.shutdown();
+    }
+  }
+
+}