|
|
@@ -18,9 +18,9 @@
|
|
|
|
|
|
package org.apache.ambari.server.events.publishers;
|
|
|
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
-import java.util.concurrent.Executors;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
|
|
|
import org.apache.ambari.server.controller.internal.CalculatedStatus;
|
|
|
import org.apache.ambari.server.events.RequestUpdateEvent;
|
|
|
@@ -35,11 +35,7 @@ import com.google.inject.Inject;
|
|
|
import com.google.inject.Singleton;
|
|
|
|
|
|
@Singleton
|
|
|
-public class RequestUpdateEventPublisher {
|
|
|
-
|
|
|
- private final Long TIMEOUT = 1000L;
|
|
|
- private ConcurrentHashMap<Long, Long> previousTime = new ConcurrentHashMap<>();
|
|
|
- private ConcurrentHashMap<Long, RequestUpdateEvent> buffer = new ConcurrentHashMap<>();
|
|
|
+public class RequestUpdateEventPublisher extends BufferedUpdateEventPublisher<RequestUpdateEvent> {
|
|
|
|
|
|
@Inject
|
|
|
private HostRoleCommandDAO hostRoleCommandDAO;
|
|
|
@@ -53,27 +49,25 @@ public class RequestUpdateEventPublisher {
|
|
|
@Inject
|
|
|
private ClusterDAO clusterDAO;
|
|
|
|
|
|
- public void publish(RequestUpdateEvent event, EventBus m_eventBus) {
|
|
|
- Long eventTime = System.currentTimeMillis();
|
|
|
- Long requestId = event.getRequestId();
|
|
|
- if (!previousTime.containsKey(requestId)) {
|
|
|
- previousTime.put(requestId, 0L);
|
|
|
+ @Override
|
|
|
+ public void mergeBufferAndPost(List<RequestUpdateEvent> events, EventBus m_eventBus) {
|
|
|
+ Map<Long, RequestUpdateEvent> filteredRequests = new HashMap<>();
|
|
|
+ for (RequestUpdateEvent event : events) {
|
|
|
+ Long requestId = event.getRequestId();
|
|
|
+ if (filteredRequests.containsKey(requestId)) {
|
|
|
+ RequestUpdateEvent filteredRequest = filteredRequests.get(requestId);
|
|
|
+ filteredRequest.setEndTime(event.getEndTime());
|
|
|
+ filteredRequest.setRequestStatus(event.getRequestStatus());
|
|
|
+ filteredRequest.setRequestContext(event.getRequestContext());
|
|
|
+ filteredRequest.getHostRoleCommands().removeAll(event.getHostRoleCommands());
|
|
|
+ filteredRequest.getHostRoleCommands().addAll(event.getHostRoleCommands());
|
|
|
+ } else {
|
|
|
+ filteredRequests.put(requestId, event);
|
|
|
+ }
|
|
|
}
|
|
|
- if (eventTime - previousTime.get(requestId) <= TIMEOUT && !buffer.containsKey(requestId)) {
|
|
|
- buffer.put(event.getRequestId(), event);
|
|
|
- Executors.newScheduledThreadPool(1).schedule(new RequestEventRunnable(requestId, m_eventBus),
|
|
|
- TIMEOUT, TimeUnit.MILLISECONDS);
|
|
|
- } else if (buffer.containsKey(requestId)) {
|
|
|
- //merge available buffer content with arrived
|
|
|
- buffer.get(requestId).setEndTime(event.getEndTime());
|
|
|
- buffer.get(requestId).setRequestStatus(event.getRequestStatus());
|
|
|
- buffer.get(requestId).setRequestContext(event.getRequestContext());
|
|
|
- buffer.get(requestId).getHostRoleCommands().removeAll(event.getHostRoleCommands());
|
|
|
- buffer.get(requestId).getHostRoleCommands().addAll(event.getHostRoleCommands());
|
|
|
- } else {
|
|
|
- previousTime.put(requestId, eventTime);
|
|
|
- //TODO add logging and metrics posting
|
|
|
- m_eventBus.post(fillRequest(event));
|
|
|
+ for (RequestUpdateEvent requestUpdateEvent : filteredRequests.values()) {
|
|
|
+ RequestUpdateEvent filled = fillRequest(requestUpdateEvent);
|
|
|
+ m_eventBus.post(filled);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -94,24 +88,4 @@ public class RequestUpdateEventPublisher {
|
|
|
}
|
|
|
return event;
|
|
|
}
|
|
|
-
|
|
|
- private class RequestEventRunnable implements Runnable {
|
|
|
-
|
|
|
- private final long requestId;
|
|
|
- private final EventBus eventBus;
|
|
|
-
|
|
|
- public RequestEventRunnable(long requestId, EventBus eventBus) {
|
|
|
- this.requestId = requestId;
|
|
|
- this.eventBus = eventBus;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- RequestUpdateEvent resultEvent = buffer.get(requestId);
|
|
|
- //TODO add logging and metrics posting
|
|
|
- eventBus.post(fillRequest(resultEvent));
|
|
|
- buffer.remove(requestId);
|
|
|
- previousTime.remove(requestId);
|
|
|
- }
|
|
|
- }
|
|
|
}
|