|
@@ -55,6 +55,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
|
// stop functionality.
|
|
// stop functionality.
|
|
private volatile boolean drainEventsOnStop = false;
|
|
private volatile boolean drainEventsOnStop = false;
|
|
|
|
|
|
|
|
+ // Indicates all the remaining dispatcher's events on stop have been drained
|
|
|
|
+ // and processed.
|
|
|
|
+ private volatile boolean drained = true;
|
|
private Object waitForDrained = new Object();
|
|
private Object waitForDrained = new Object();
|
|
|
|
|
|
// For drainEventsOnStop enabled only, block newly coming events into the
|
|
// For drainEventsOnStop enabled only, block newly coming events into the
|
|
@@ -81,12 +84,13 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
|
@Override
|
|
@Override
|
|
public void run() {
|
|
public void run() {
|
|
while (!stopped && !Thread.currentThread().isInterrupted()) {
|
|
while (!stopped && !Thread.currentThread().isInterrupted()) {
|
|
|
|
+ drained = eventQueue.isEmpty();
|
|
// blockNewEvents is only set when dispatcher is draining to stop,
|
|
// blockNewEvents is only set when dispatcher is draining to stop,
|
|
// adding this check is to avoid the overhead of acquiring the lock
|
|
// adding this check is to avoid the overhead of acquiring the lock
|
|
// and calling notify every time in the normal run of the loop.
|
|
// and calling notify every time in the normal run of the loop.
|
|
if (blockNewEvents) {
|
|
if (blockNewEvents) {
|
|
synchronized (waitForDrained) {
|
|
synchronized (waitForDrained) {
|
|
- if (eventQueue.isEmpty()) {
|
|
|
|
|
|
+ if (drained) {
|
|
waitForDrained.notify();
|
|
waitForDrained.notify();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -135,7 +139,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
|
blockNewEvents = true;
|
|
blockNewEvents = true;
|
|
LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
|
|
LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
|
|
synchronized (waitForDrained) {
|
|
synchronized (waitForDrained) {
|
|
- while (!eventQueue.isEmpty() && eventHandlingThread.isAlive()) {
|
|
|
|
|
|
+ while (!drained && eventHandlingThread.isAlive()) {
|
|
waitForDrained.wait(1000);
|
|
waitForDrained.wait(1000);
|
|
LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
|
|
LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
|
|
eventHandlingThread.getState());
|
|
eventHandlingThread.getState());
|
|
@@ -219,21 +223,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
|
return handlerInstance;
|
|
return handlerInstance;
|
|
}
|
|
}
|
|
|
|
|
|
- @VisibleForTesting
|
|
|
|
- protected boolean hasPendingEvents() {
|
|
|
|
- return !eventQueue.isEmpty();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @VisibleForTesting
|
|
|
|
- protected boolean isEventThreadWaiting() {
|
|
|
|
- return eventHandlingThread.getState() == Thread.State.WAITING;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
class GenericEventHandler implements EventHandler<Event> {
|
|
class GenericEventHandler implements EventHandler<Event> {
|
|
public void handle(Event event) {
|
|
public void handle(Event event) {
|
|
if (blockNewEvents) {
|
|
if (blockNewEvents) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
+ drained = false;
|
|
|
|
|
|
/* all this method does is enqueue all the events onto the queue */
|
|
/* all this method does is enqueue all the events onto the queue */
|
|
int qSize = eventQueue.size();
|
|
int qSize = eventQueue.size();
|
|
@@ -290,4 +285,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
|
}
|
|
}
|
|
};
|
|
};
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ protected boolean isDrained() {
|
|
|
|
+ return this.drained;
|
|
|
|
+ }
|
|
}
|
|
}
|