|
@@ -56,6 +56,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
|
|
// Indicates all the remaining dispatcher's events on stop have been drained
|
|
|
// and processed.
|
|
|
private volatile boolean drained = true;
|
|
|
+ private Object waitForDrained = new Object();
|
|
|
|
|
|
// For drainEventsOnStop enabled only, block newly coming events into the
|
|
|
// queue while stopping.
|
|
@@ -82,6 +83,16 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
|
|
public void run() {
|
|
|
while (!stopped && !Thread.currentThread().isInterrupted()) {
|
|
|
drained = eventQueue.isEmpty();
|
|
|
+ // blockNewEvents is only set when dispatcher is draining to stop,
|
|
|
+ // adding this check is to avoid the overhead of acquiring the lock
|
|
|
+ // and calling notify every time in the normal run of the loop.
|
|
|
+ if (blockNewEvents) {
|
|
|
+ synchronized (waitForDrained) {
|
|
|
+ if (drained) {
|
|
|
+ waitForDrained.notify();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
Event event;
|
|
|
try {
|
|
|
event = eventQueue.take();
|
|
@@ -125,8 +136,11 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
|
|
if (drainEventsOnStop) {
|
|
|
blockNewEvents = true;
|
|
|
LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
|
|
|
- while(!drained) {
|
|
|
- Thread.yield();
|
|
|
+ synchronized (waitForDrained) {
|
|
|
+ while (!drained && eventHandlingThread.isAlive()) {
|
|
|
+ waitForDrained.wait(1000);
|
|
|
+ LOG.info("Waiting for AsyncDispatcher to drain.");
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
stopped = true;
|