|
@@ -18,7 +18,11 @@
|
|
|
package org.apache.hadoop.hdds.server.events;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
+
|
|
|
+import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
+
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -42,6 +46,8 @@ public class EventQueue implements EventPublisher, AutoCloseable {
|
|
|
private static final Logger LOG =
|
|
|
LoggerFactory.getLogger(EventQueue.class);
|
|
|
|
|
|
+ private static final String EXECUTOR_NAME_SEPARATOR = "For";
|
|
|
+
|
|
|
private final Map<Event, Map<EventExecutor, List<EventHandler>>> executors =
|
|
|
new HashMap<>();
|
|
|
|
|
@@ -51,38 +57,74 @@ public class EventQueue implements EventPublisher, AutoCloseable {
|
|
|
|
|
|
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
|
|
|
EVENT_TYPE event, EventHandler<PAYLOAD> handler) {
|
|
|
-
|
|
|
- this.addHandler(event, new SingleThreadExecutor<>(
|
|
|
- event.getName()), handler);
|
|
|
+ this.addHandler(event, handler, generateHandlerName(handler));
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Add new handler to the event queue.
|
|
|
+ * <p>
|
|
|
+ * By default a separated single thread executor will be dedicated to
|
|
|
+ * deliver the events to the registered event handler.
|
|
|
+ *
|
|
|
+ * @param event Triggering event.
|
|
|
+ * @param handler Handler of event (will be called from a separated
|
|
|
+ * thread)
|
|
|
+ * @param handlerName The name of handler (should be unique together with
|
|
|
+ * the event name)
|
|
|
+ * @param <PAYLOAD> The type of the event payload.
|
|
|
+ * @param <EVENT_TYPE> The type of the event identifier.
|
|
|
+ */
|
|
|
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
|
|
|
- EVENT_TYPE event,
|
|
|
- EventExecutor<PAYLOAD> executor,
|
|
|
- EventHandler<PAYLOAD> handler) {
|
|
|
+ EVENT_TYPE event, EventHandler<PAYLOAD> handler, String handlerName) {
|
|
|
+ validateEvent(event);
|
|
|
+ Preconditions.checkNotNull(handler, "Handler name should not be null.");
|
|
|
+ String executorName =
|
|
|
+ StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR
|
|
|
+ + handlerName;
|
|
|
+ this.addHandler(event, new SingleThreadExecutor<>(executorName), handler);
|
|
|
+ }
|
|
|
|
|
|
- executors.putIfAbsent(event, new HashMap<>());
|
|
|
- executors.get(event).putIfAbsent(executor, new ArrayList<>());
|
|
|
+ private <EVENT_TYPE extends Event<?>> void validateEvent(EVENT_TYPE event) {
|
|
|
+ Preconditions
|
|
|
+ .checkArgument(!event.getName().contains(EXECUTOR_NAME_SEPARATOR),
|
|
|
+ "Event name should not contain " + EXECUTOR_NAME_SEPARATOR
|
|
|
+ + " string.");
|
|
|
|
|
|
- executors.get(event)
|
|
|
- .get(executor)
|
|
|
- .add(handler);
|
|
|
+ }
|
|
|
+
|
|
|
+ private <PAYLOAD> String generateHandlerName(EventHandler<PAYLOAD> handler) {
|
|
|
+ if (!"".equals(handler.getClass().getSimpleName())) {
|
|
|
+ return handler.getClass().getSimpleName();
|
|
|
+ } else {
|
|
|
+ return handler.getClass().getName();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Creates one executor with multiple event handlers.
|
|
|
+ * Add event handler with custom executor.
|
|
|
+ *
|
|
|
+ * @param event Triggering event.
|
|
|
+ * @param executor The executor imlementation to deliver events from a
|
|
|
+ * separated threads. Please keep in your mind that
|
|
|
+ * registering metrics is the responsibility of the
|
|
|
+ * caller.
|
|
|
+ * @param handler Handler of event (will be called from a separated
|
|
|
+ * thread)
|
|
|
+ * @param <PAYLOAD> The type of the event payload.
|
|
|
+ * @param <EVENT_TYPE> The type of the event identifier.
|
|
|
*/
|
|
|
- public void addHandlerGroup(String name, HandlerForEvent<?>...
|
|
|
- eventsAndHandlers) {
|
|
|
- SingleThreadExecutor sharedExecutor =
|
|
|
- new SingleThreadExecutor(name);
|
|
|
- for (HandlerForEvent handlerForEvent : eventsAndHandlers) {
|
|
|
- addHandler(handlerForEvent.event, sharedExecutor,
|
|
|
- handlerForEvent.handler);
|
|
|
- }
|
|
|
+ public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
|
|
|
+ EVENT_TYPE event, EventExecutor<PAYLOAD> executor,
|
|
|
+ EventHandler<PAYLOAD> handler) {
|
|
|
+ validateEvent(event);
|
|
|
+ executors.putIfAbsent(event, new HashMap<>());
|
|
|
+ executors.get(event).putIfAbsent(executor, new ArrayList<>());
|
|
|
|
|
|
+ executors.get(event).get(executor).add(handler);
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* Route an event with payload to the right listener(s).
|
|
|
*
|
|
@@ -183,31 +225,5 @@ public class EventQueue implements EventPublisher, AutoCloseable {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Event identifier together with the handler.
|
|
|
- *
|
|
|
- * @param <PAYLOAD>
|
|
|
- */
|
|
|
- public static class HandlerForEvent<PAYLOAD> {
|
|
|
-
|
|
|
- private final Event<PAYLOAD> event;
|
|
|
-
|
|
|
- private final EventHandler<PAYLOAD> handler;
|
|
|
-
|
|
|
- public HandlerForEvent(
|
|
|
- Event<PAYLOAD> event,
|
|
|
- EventHandler<PAYLOAD> handler) {
|
|
|
- this.event = event;
|
|
|
- this.handler = handler;
|
|
|
- }
|
|
|
-
|
|
|
- public Event<PAYLOAD> getEvent() {
|
|
|
- return event;
|
|
|
- }
|
|
|
-
|
|
|
- public EventHandler<PAYLOAD> getHandler() {
|
|
|
- return handler;
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
}
|