|
|
@@ -17,9 +17,11 @@
|
|
|
*/
|
|
|
package org.apache.ambari.server.agent;
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.regex.Pattern;
|
|
|
|
|
|
@@ -29,8 +31,12 @@ import org.apache.ambari.server.actionmanager.ActionManager;
|
|
|
import org.apache.ambari.server.agent.stomp.dto.HostStatusReport;
|
|
|
import org.apache.ambari.server.api.services.AmbariMetaInfo;
|
|
|
import org.apache.ambari.server.configuration.Configuration;
|
|
|
+import org.apache.ambari.server.events.AlertReceivedEvent;
|
|
|
+import org.apache.ambari.server.events.publishers.AlertEventPublisher;
|
|
|
import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher;
|
|
|
import org.apache.ambari.server.state.AgentVersion;
|
|
|
+import org.apache.ambari.server.state.Alert;
|
|
|
+import org.apache.ambari.server.state.AlertState;
|
|
|
import org.apache.ambari.server.state.Cluster;
|
|
|
import org.apache.ambari.server.state.Clusters;
|
|
|
import org.apache.ambari.server.state.ComponentInfo;
|
|
|
@@ -40,6 +46,7 @@ import org.apache.ambari.server.state.HostState;
|
|
|
import org.apache.ambari.server.state.ServiceComponent;
|
|
|
import org.apache.ambari.server.state.ServiceComponentHost;
|
|
|
import org.apache.ambari.server.state.StackId;
|
|
|
+import org.apache.ambari.server.state.alert.AlertDefinition;
|
|
|
import org.apache.ambari.server.state.alert.AlertDefinitionHash;
|
|
|
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
|
|
|
import org.apache.ambari.server.state.host.HostHealthyHeartbeatEvent;
|
|
|
@@ -92,6 +99,9 @@ public class HeartBeatHandler {
|
|
|
@Inject
|
|
|
private AgentSessionManager agentSessionManager;
|
|
|
|
|
|
+ @Inject
|
|
|
+ private AlertEventPublisher alertEventPublisher;
|
|
|
+
|
|
|
private Map<String, Long> hostResponseIds = new ConcurrentHashMap<>();
|
|
|
|
|
|
private Map<String, HeartBeatResponse> hostResponses = new ConcurrentHashMap<>();
|
|
|
@@ -215,6 +225,8 @@ public class HeartBeatHandler {
|
|
|
annotateResponse(hostname, response);
|
|
|
}
|
|
|
|
|
|
+ updateAlertLastStateTimestamp(hostname);
|
|
|
+
|
|
|
return response;
|
|
|
}
|
|
|
|
|
|
@@ -396,6 +408,40 @@ public class HeartBeatHandler {
|
|
|
return response;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Heartbeat receiving indicates agent-server connection is in order. This mean that all alerts statuses are actual
|
|
|
+ * and last state timestamp should be updated.
|
|
|
+ * @param hostName hostName for heartbeat
|
|
|
+ * @throws AmbariException
|
|
|
+ */
|
|
|
+ private void updateAlertLastStateTimestamp(String hostName) throws AmbariException {
|
|
|
+ Set<Cluster> hostClusters = clusterFsm.getClustersForHost(hostName);
|
|
|
+ if (null == hostClusters || hostClusters.size() == 0) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ List<Alert> alerts = new ArrayList<>();
|
|
|
+ // for every cluster this host is a member of, build the command
|
|
|
+ for (Cluster cluster : hostClusters) {
|
|
|
+ String clusterName = cluster.getClusterName();
|
|
|
+ alertDefinitionHash.invalidate(clusterName, hostName);
|
|
|
+
|
|
|
+ List<AlertDefinition> definitions = alertDefinitionHash.getAlertDefinitions(
|
|
|
+ clusterName, hostName);
|
|
|
+ for (AlertDefinition alertDefinition : definitions) {
|
|
|
+ Alert toUpdate = new Alert();
|
|
|
+ toUpdate.setHostName(hostName);
|
|
|
+ toUpdate.setClusterId(alertDefinition.getClusterId());
|
|
|
+ toUpdate.setTimestamp(System.currentTimeMillis());
|
|
|
+ toUpdate.setName(alertDefinition.getName());
|
|
|
+ toUpdate.setState(AlertState.SKIPPED);
|
|
|
+ alerts.add(toUpdate);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ AlertReceivedEvent alertReceivedEvents = new AlertReceivedEvent(alerts);
|
|
|
+ alertEventPublisher.publish(alertReceivedEvents);
|
|
|
+ }
|
|
|
+
|
|
|
public void stop() {
|
|
|
heartbeatMonitor.shutdown();
|
|
|
heartbeatProcessor.stopAsync();
|