|
@@ -18,11 +18,7 @@
|
|
|
package org.apache.hadoop.hdds.server.events;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
-
|
|
|
-import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
-
|
|
|
-import com.google.common.base.Preconditions;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -46,8 +42,6 @@ public class EventQueue implements EventPublisher, AutoCloseable {
|
|
|
private static final Logger LOG =
|
|
|
LoggerFactory.getLogger(EventQueue.class);
|
|
|
|
|
|
- private static final String EXECUTOR_NAME_SEPARATOR = "For";
|
|
|
-
|
|
|
private final Map<Event, Map<EventExecutor, List<EventHandler>>> executors =
|
|
|
new HashMap<>();
|
|
|
|
|
@@ -57,73 +51,37 @@ public class EventQueue implements EventPublisher, AutoCloseable {
|
|
|
|
|
|
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
|
|
|
EVENT_TYPE event, EventHandler<PAYLOAD> handler) {
|
|
|
- this.addHandler(event, handler, generateHandlerName(handler));
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Add new handler to the event queue.
|
|
|
- * <p>
|
|
|
- * By default a separated single thread executor will be dedicated to
|
|
|
- * deliver the events to the registered event handler.
|
|
|
- *
|
|
|
- * @param event Triggering event.
|
|
|
- * @param handler Handler of event (will be called from a separated
|
|
|
- * thread)
|
|
|
- * @param handlerName The name of handler (should be unique together with
|
|
|
- * the event name)
|
|
|
- * @param <PAYLOAD> The type of the event payload.
|
|
|
- * @param <EVENT_TYPE> The type of the event identifier.
|
|
|
- */
|
|
|
- public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
|
|
|
- EVENT_TYPE event, EventHandler<PAYLOAD> handler, String handlerName) {
|
|
|
- validateEvent(event);
|
|
|
- Preconditions.checkNotNull(handler, "Handler name should not be null.");
|
|
|
- String executorName =
|
|
|
- StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR
|
|
|
- + handlerName;
|
|
|
- this.addHandler(event, new SingleThreadExecutor<>(executorName), handler);
|
|
|
- }
|
|
|
-
|
|
|
- private <EVENT_TYPE extends Event<?>> void validateEvent(EVENT_TYPE event) {
|
|
|
- Preconditions
|
|
|
- .checkArgument(!event.getName().contains(EXECUTOR_NAME_SEPARATOR),
|
|
|
- "Event name should not contain " + EXECUTOR_NAME_SEPARATOR
|
|
|
- + " string.");
|
|
|
|
|
|
+ this.addHandler(event, new SingleThreadExecutor<>(
|
|
|
+ event.getName()), handler);
|
|
|
}
|
|
|
|
|
|
- private <PAYLOAD> String generateHandlerName(EventHandler<PAYLOAD> handler) {
|
|
|
- if (!"".equals(handler.getClass().getSimpleName())) {
|
|
|
- return handler.getClass().getSimpleName();
|
|
|
- } else {
|
|
|
- return handler.getClass().getName();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Add event handler with custom executor.
|
|
|
- *
|
|
|
- * @param event Triggering event.
|
|
|
- * @param executor The executor imlementation to deliver events from a
|
|
|
- * separated threads. Please keep in your mind that
|
|
|
- * registering metrics is the responsibility of the
|
|
|
- * caller.
|
|
|
- * @param handler Handler of event (will be called from a separated
|
|
|
- * thread)
|
|
|
- * @param <PAYLOAD> The type of the event payload.
|
|
|
- * @param <EVENT_TYPE> The type of the event identifier.
|
|
|
- */
|
|
|
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
|
|
|
- EVENT_TYPE event, EventExecutor<PAYLOAD> executor,
|
|
|
+ EVENT_TYPE event,
|
|
|
+ EventExecutor<PAYLOAD> executor,
|
|
|
EventHandler<PAYLOAD> handler) {
|
|
|
- validateEvent(event);
|
|
|
+
|
|
|
executors.putIfAbsent(event, new HashMap<>());
|
|
|
executors.get(event).putIfAbsent(executor, new ArrayList<>());
|
|
|
|
|
|
- executors.get(event).get(executor).add(handler);
|
|
|
+ executors.get(event)
|
|
|
+ .get(executor)
|
|
|
+ .add(handler);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Creates one executor with multiple event handlers.
|
|
|
+ */
|
|
|
+ public void addHandlerGroup(String name, HandlerForEvent<?>...
|
|
|
+ eventsAndHandlers) {
|
|
|
+ SingleThreadExecutor sharedExecutor =
|
|
|
+ new SingleThreadExecutor(name);
|
|
|
+ for (HandlerForEvent handlerForEvent : eventsAndHandlers) {
|
|
|
+ addHandler(handlerForEvent.event, sharedExecutor,
|
|
|
+ handlerForEvent.handler);
|
|
|
+ }
|
|
|
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Route an event with payload to the right listener(s).
|
|
@@ -225,5 +183,31 @@ public class EventQueue implements EventPublisher, AutoCloseable {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Event identifier together with the handler.
|
|
|
+ *
|
|
|
+ * @param <PAYLOAD>
|
|
|
+ */
|
|
|
+ public static class HandlerForEvent<PAYLOAD> {
|
|
|
+
|
|
|
+ private final Event<PAYLOAD> event;
|
|
|
+
|
|
|
+ private final EventHandler<PAYLOAD> handler;
|
|
|
+
|
|
|
+ public HandlerForEvent(
|
|
|
+ Event<PAYLOAD> event,
|
|
|
+ EventHandler<PAYLOAD> handler) {
|
|
|
+ this.event = event;
|
|
|
+ this.handler = handler;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Event<PAYLOAD> getEvent() {
|
|
|
+ return event;
|
|
|
+ }
|
|
|
+
|
|
|
+ public EventHandler<PAYLOAD> getHandler() {
|
|
|
+ return handler;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
}
|