|
@@ -27,8 +27,7 @@ import java.util.AbstractQueue;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
-import java.util.concurrent.locks.ReentrantLock;
|
|
|
-import java.util.concurrent.locks.Condition;
|
|
|
+import java.util.concurrent.Semaphore;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
@@ -55,16 +54,15 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
|
|
|
/* The queues */
|
|
|
private final ArrayList<BlockingQueue<E>> queues;
|
|
|
|
|
|
- /* Read locks */
|
|
|
- private final ReentrantLock takeLock = new ReentrantLock();
|
|
|
- private final Condition notEmpty = takeLock.newCondition();
|
|
|
+ /* Track available permits for scheduled objects. All methods that will
|
|
|
+ * mutate a subqueue must acquire or release a permit on the semaphore.
|
|
|
+ * A semaphore is much faster than an exclusive lock because producers do
|
|
|
+ * not contend with consumers and consumers do not block other consumers
|
|
|
+ * while polling.
|
|
|
+ */
|
|
|
+ private final Semaphore semaphore = new Semaphore(0);
|
|
|
private void signalNotEmpty() {
|
|
|
- takeLock.lock();
|
|
|
- try {
|
|
|
- notEmpty.signal();
|
|
|
- } finally {
|
|
|
- takeLock.unlock();
|
|
|
- }
|
|
|
+ semaphore.release();
|
|
|
}
|
|
|
|
|
|
/* Multiplexer picks which queue to draw from */
|
|
@@ -112,28 +110,25 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Returns the first non-empty queue with equal to <i>startIdx</i>, or
|
|
|
- * or scans from highest to lowest priority queue.
|
|
|
+ * Returns an element first non-empty queue equal to the priority returned
|
|
|
+ * by the multiplexer or scans from highest to lowest priority queue.
|
|
|
+ *
|
|
|
+ * Caller must always acquire a semaphore permit before invoking.
|
|
|
*
|
|
|
- * @param startIdx the queue number to start searching at
|
|
|
* @return the first non-empty queue with less priority, or null if
|
|
|
* everything was empty
|
|
|
*/
|
|
|
- private BlockingQueue<E> getFirstNonEmptyQueue(int startIdx) {
|
|
|
- BlockingQueue<E> queue = this.queues.get(startIdx);
|
|
|
- if (queue.size() != 0) {
|
|
|
- return queue;
|
|
|
- }
|
|
|
- final int numQueues = this.queues.size();
|
|
|
- for(int i=0; i < numQueues; i++) {
|
|
|
- queue = this.queues.get(i);
|
|
|
- if (queue.size() != 0) {
|
|
|
- return queue;
|
|
|
+ private E removeNextElement() {
|
|
|
+ int priority = multiplexer.getAndAdvanceCurrentIndex();
|
|
|
+ E e = queues.get(priority).poll();
|
|
|
+ if (e == null) {
|
|
|
+ for (int idx = 0; e == null && idx < queues.size(); idx++) {
|
|
|
+ e = queues.get(idx).poll();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- // All queues were empty
|
|
|
- return null;
|
|
|
+ // guaranteed to find an element if caller acquired permit.
|
|
|
+ assert e != null : "consumer didn't acquire semaphore!";
|
|
|
+ return e;
|
|
|
}
|
|
|
|
|
|
/* AbstractQueue and BlockingQueue methods */
|
|
@@ -184,9 +179,9 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
|
|
|
int priorityLevel = e.getPriorityLevel();
|
|
|
BlockingQueue<E> q = this.queues.get(priorityLevel);
|
|
|
boolean ret = q.offer(e, timeout, unit);
|
|
|
-
|
|
|
- signalNotEmpty();
|
|
|
-
|
|
|
+ if (ret) {
|
|
|
+ signalNotEmpty();
|
|
|
+ }
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
@@ -195,72 +190,21 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
|
|
|
int priorityLevel = e.getPriorityLevel();
|
|
|
BlockingQueue<E> q = this.queues.get(priorityLevel);
|
|
|
boolean ret = q.offer(e);
|
|
|
-
|
|
|
- signalNotEmpty();
|
|
|
-
|
|
|
+ if (ret) {
|
|
|
+ signalNotEmpty();
|
|
|
+ }
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public E take() throws InterruptedException {
|
|
|
- int startIdx = this.multiplexer.getAndAdvanceCurrentIndex();
|
|
|
-
|
|
|
- takeLock.lockInterruptibly();
|
|
|
- try {
|
|
|
- // Wait while queue is empty
|
|
|
- for (;;) {
|
|
|
- BlockingQueue<E> q = this.getFirstNonEmptyQueue(startIdx);
|
|
|
- if (q != null) {
|
|
|
- // Got queue, so return if we can poll out an object
|
|
|
- E e = q.poll();
|
|
|
- if (e != null) {
|
|
|
- return e;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- notEmpty.await();
|
|
|
- }
|
|
|
- } finally {
|
|
|
- takeLock.unlock();
|
|
|
- }
|
|
|
+ semaphore.acquire();
|
|
|
+ return removeNextElement();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public E poll(long timeout, TimeUnit unit)
|
|
|
- throws InterruptedException {
|
|
|
-
|
|
|
- int startIdx = this.multiplexer.getAndAdvanceCurrentIndex();
|
|
|
-
|
|
|
- long nanos = unit.toNanos(timeout);
|
|
|
- takeLock.lockInterruptibly();
|
|
|
- try {
|
|
|
- for (;;) {
|
|
|
- BlockingQueue<E> q = this.getFirstNonEmptyQueue(startIdx);
|
|
|
- if (q != null) {
|
|
|
- E e = q.poll();
|
|
|
- if (e != null) {
|
|
|
- // Escape condition: there might be something available
|
|
|
- return e;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (nanos <= 0) {
|
|
|
- // Wait has elapsed
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- try {
|
|
|
- // Now wait on the condition for a bit. If we get
|
|
|
- // spuriously awoken we'll re-loop
|
|
|
- nanos = notEmpty.awaitNanos(nanos);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- notEmpty.signal(); // propagate to a non-interrupted thread
|
|
|
- throw ie;
|
|
|
- }
|
|
|
- }
|
|
|
- } finally {
|
|
|
- takeLock.unlock();
|
|
|
- }
|
|
|
+ public E poll(long timeout, TimeUnit unit) throws InterruptedException {
|
|
|
+ return semaphore.tryAcquire(timeout, unit) ? removeNextElement() : null;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -269,15 +213,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
|
|
|
*/
|
|
|
@Override
|
|
|
public E poll() {
|
|
|
- int startIdx = this.multiplexer.getAndAdvanceCurrentIndex();
|
|
|
-
|
|
|
- BlockingQueue<E> q = this.getFirstNonEmptyQueue(startIdx);
|
|
|
- if (q == null) {
|
|
|
- return null; // everything is empty
|
|
|
- }
|
|
|
-
|
|
|
- // Delegate to the sub-queue's poll, which could still return null
|
|
|
- return q.poll();
|
|
|
+ return semaphore.tryAcquire() ? removeNextElement() : null;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -285,12 +221,11 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
|
|
|
*/
|
|
|
@Override
|
|
|
public E peek() {
|
|
|
- BlockingQueue<E> q = this.getFirstNonEmptyQueue(0);
|
|
|
- if (q == null) {
|
|
|
- return null;
|
|
|
- } else {
|
|
|
- return q.peek();
|
|
|
+ E e = null;
|
|
|
+ for (int i=0; e == null && i < queues.size(); i++) {
|
|
|
+ e = queues.get(i).peek();
|
|
|
}
|
|
|
+ return e;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -301,11 +236,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
|
|
|
*/
|
|
|
@Override
|
|
|
public int size() {
|
|
|
- int size = 0;
|
|
|
- for (BlockingQueue<E> q : this.queues) {
|
|
|
- size += q.size();
|
|
|
- }
|
|
|
- return size;
|
|
|
+ return semaphore.availablePermits();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -324,20 +255,24 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
|
|
|
*/
|
|
|
@Override
|
|
|
public int drainTo(Collection<? super E> c, int maxElements) {
|
|
|
- int sum = 0;
|
|
|
- for (BlockingQueue<E> q : this.queues) {
|
|
|
- sum += q.drainTo(c, maxElements);
|
|
|
+ // initially take all permits to stop consumers from modifying queues
|
|
|
+ // while draining. will restore any excess when done draining.
|
|
|
+ final int permits = semaphore.drainPermits();
|
|
|
+ final int numElements = Math.min(maxElements, permits);
|
|
|
+ int numRemaining = numElements;
|
|
|
+ for (int i=0; numRemaining > 0 && i < queues.size(); i++) {
|
|
|
+ numRemaining -= queues.get(i).drainTo(c, numRemaining);
|
|
|
}
|
|
|
- return sum;
|
|
|
+ int drained = numElements - numRemaining;
|
|
|
+ if (permits > drained) { // restore unused permits.
|
|
|
+ semaphore.release(permits - drained);
|
|
|
+ }
|
|
|
+ return drained;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public int drainTo(Collection<? super E> c) {
|
|
|
- int sum = 0;
|
|
|
- for (BlockingQueue<E> q : this.queues) {
|
|
|
- sum += q.drainTo(c);
|
|
|
- }
|
|
|
- return sum;
|
|
|
+ return drainTo(c, Integer.MAX_VALUE);
|
|
|
}
|
|
|
|
|
|
/**
|