Browse Source

AMBARI-6092. Ambari should return alerts for Flume (ncole)

Nate Cole 11 years ago
parent
commit
44ced4d5d8
17 changed files with 600 additions and 39 deletions
  1. 4 0
      ambari-agent/src/main/python/ambari_agent/ActionQueue.py
  2. 41 0
      ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
  3. 82 0
      ambari-server/src/main/java/org/apache/ambari/server/agent/AgentAlert.java
  4. 16 0
      ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatus.java
  5. 18 0
      ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
  6. 29 0
      ambari-server/src/main/java/org/apache/ambari/server/controller/nagios/NagiosAlert.java
  7. 18 3
      ambari-server/src/main/java/org/apache/ambari/server/controller/nagios/NagiosPropertyProvider.java
  8. 157 0
      ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java
  9. 41 0
      ambari-server/src/main/java/org/apache/ambari/server/state/AlertState.java
  10. 13 0
      ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
  11. 34 2
      ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
  12. 16 0
      ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py
  13. 1 7
      ambari-server/src/main/resources/stacks/HDP/2.0.6/services/NAGIOS/package/templates/hadoop-servicegroups.cfg.j2
  14. 0 15
      ambari-server/src/main/resources/stacks/HDP/2.0.6/services/NAGIOS/package/templates/hadoop-services.cfg.j2
  15. 39 4
      ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
  16. 86 4
      ambari-server/src/test/java/org/apache/ambari/server/controller/nagios/NagiosPropertyProviderTest.java
  17. 5 4
      ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py

+ 4 - 0
ambari-agent/src/main/python/ambari_agent/ActionQueue.py

@@ -247,6 +247,10 @@ class ActionQueue(threading.Thread):
       result = livestatus.build(forsed_component_status= component_status)
 
       if component_extra is not None and len(component_extra) != 0:
+        if component_extra.has_key('alerts'):
+          result['alerts'] = component_extra['alerts']
+          del component_extra['alerts']
+
         result['extra'] = component_extra
 
       logger.debug("Got live status for component " + component + \

+ 41 - 0
ambari-agent/src/test/python/ambari_agent/TestActionQueue.py

@@ -146,6 +146,15 @@ class TestActionQueue(TestCase):
     'hostLevelParams':{'custom_command': 'RESTART'}
   }
 
+  status_command_for_alerts = {
+    "serviceName" : 'FLUME',
+    "commandType" : "STATUS_COMMAND",
+    "clusterName" : "",
+    "componentName" : "FLUME_HANDLER",
+    'configurations':{},
+    'hostLevelParams': {}
+  }
+
   @patch.object(ActionQueue, "process_command")
   @patch.object(Queue, "get")
   @patch.object(CustomServiceOrchestrator, "__init__")
@@ -444,3 +453,35 @@ class TestActionQueue(TestCase):
     self.assertEqual(len(report['componentStatus']), 1)
     self.assertEqual(report['componentStatus'][0], expected)
     self.assertTrue(requestComponentStatus_mock.called)
+
+  @patch.object(ActionQueue, "status_update_callback")
+  @patch.object(StackVersionsFileHandler, "read_stack_version")
+  @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
+  @patch.object(ActionQueue, "execute_command")
+  @patch.object(LiveStatus, "build")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_execute_status_command_with_alerts(self, CustomServiceOrchestrator_mock,
+                                  build_mock, execute_command_mock,
+                                  requestComponentStatus_mock, read_stack_version_mock,
+                                  status_update_callback):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
+
+
+    requestComponentStatus_mock.reset_mock()
+    requestComponentStatus_mock.return_value = {
+      'exitcode': 0,
+      'stdout': 'out',
+      'stderr': 'err',
+      'structuredOut': {'alerts': [ {'name': 'flume_alert'} ] }
+    }
+    build_mock.return_value = {'somestatusresult': 'aresult'}
+
+    actionQueue.execute_status_command(self.status_command_for_alerts)
+
+    report = actionQueue.result()
+
+    self.assertTrue(requestComponentStatus_mock.called)
+    self.assertEqual(len(report['componentStatus']), 1)
+    self.assertTrue(report['componentStatus'][0].has_key('alerts'))

+ 82 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/AgentAlert.java

@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import org.apache.ambari.server.state.AlertState;
+
+/**
+ * Represents an alert that originates from an Agent.
+ */
+public class AgentAlert {
+
+  private String name = null;
+  private AlertState state = null;
+  private String instance = null;
+  private String label = null;
+  private String text = null;
+
+  /**
+   * Public constructor for use by JSON parsers.
+   */
+  public AgentAlert() {
+  }
+  
+  /**
+   * Constructor used for testing
+   */
+  AgentAlert(String alertName, AlertState alertState) {
+    name = alertName;
+    state = alertState;
+  }
+ 
+  /**
+   * @return the label
+   */
+  public String getLabel() {
+    return label;
+  }
+  
+  /**
+   * @return the text
+   */
+  public String getText() {
+    return text;
+  }
+ 
+  /**
+   * @return the state
+   */
+  public AlertState getState() {
+    return state;
+  }
+
+  /**
+   * @return the name
+   */
+  public String getName() {
+    return name;
+  }
+  
+  /**
+   * @return instance specific information
+   */
+  public String getInstance() {
+    return instance;
+  }
+  
+}

+ 16 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatus.java

@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.agent;
 
+import java.util.List;
 import java.util.Map;
 
 
@@ -32,6 +33,7 @@ public class ComponentStatus {
   private String stackVersion;
   private Map<String, Map<String, String>> configurationTags;
   private Map<String, Object> extra;
+  private List<AgentAlert> alerts;
 
   public String getComponentName() {
     return this.componentName;
@@ -117,6 +119,20 @@ public class ComponentStatus {
   public Map<String, Object> getExtra() {
     return extra;
   }
+  
+  /**
+   * Sets alert information from the agent
+   */
+  public void setAlerts(List<AgentAlert> alertInfo) {
+    alerts = alertInfo;
+  }
+  
+  /**
+   * Gets alert information from the agent
+   */
+  public List<AgentAlert> getAlerts() {
+    return alerts;
+  }
 
   @Override
   public String toString() {

+ 18 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java

@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.agent;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -36,6 +37,7 @@ import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.MaintenanceStateHelper;
 import org.apache.ambari.server.metadata.ActionMetadata;
 import org.apache.ambari.server.state.AgentVersion;
+import org.apache.ambari.server.state.Alert;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.ComponentInfo;
@@ -456,6 +458,22 @@ public class HeartBeatHandler {
                 }
               }
               
+              if (null != status.getAlerts()) {
+                List<Alert> clusterAlerts = new ArrayList<Alert>();
+                for (AgentAlert aa : status.getAlerts()) {
+                  Alert alert = new Alert(aa.getName(), aa.getInstance(),
+                      scHost.getServiceName(), scHost.getServiceComponentName(),
+                      scHost.getHostName(), aa.getState());
+                  alert.setLabel(aa.getLabel());
+                  alert.setText(aa.getText());
+                  
+                  clusterAlerts.add(alert);
+                }
+                
+               if (0 != clusterAlerts.size())
+                 cl.addAlerts(clusterAlerts);
+              }
+              
 
             } else {
               // TODO: What should be done otherwise?

+ 29 - 0
ambari-server/src/main/java/org/apache/ambari/server/controller/nagios/NagiosAlert.java

@@ -17,6 +17,8 @@
  */
 package org.apache.ambari.server.controller.nagios;
 
+import org.apache.ambari.server.state.Alert;
+
 /**
  * Represents a Nagios alert as represented by the JSON returning from the HTTP
  * call.
@@ -39,6 +41,33 @@ public class NagiosAlert {
   private String long_plugin_output = null;
 
   
+  /**
+   * Use a cluster alert as the basis for this alert.  This bridge can be
+   * removed when Nagios is not longer the Source For Alerts.
+   */
+  public NagiosAlert(Alert alert) {
+    service_type = alert.getService();
+    host_name = alert.getHost();
+    switch (alert.getState()) {
+    case CRITICAL:
+      current_state = "2";
+      break;
+    case OK:
+      current_state = "0";
+      break;
+    case WARNING:
+      current_state = "1";
+      break;
+    default:
+      current_state = "3";
+      break;
+    }
+    
+    service_description = alert.getLabel();
+    plugin_output = alert.getText();
+    
+  }
+  
   public NagiosAlert() {
   }
 

+ 18 - 3
ambari-server/src/main/java/org/apache/ambari/server/controller/nagios/NagiosPropertyProvider.java

@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -49,6 +50,7 @@ import org.apache.ambari.server.controller.spi.Request;
 import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.spi.SystemException;
 import org.apache.ambari.server.controller.utilities.StreamProvider;
+import org.apache.ambari.server.state.Alert;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Service;
@@ -360,6 +362,8 @@ public class NagiosPropertyProvider extends BaseProvider implements PropertyProv
     
     String nagiosHost = null;
     
+    List<NagiosAlert> results = new ArrayList<NagiosAlert>();
+    
     try {
       Cluster cluster = clusters.getCluster(clusterName);
       Service service = cluster.getService("NAGIOS");
@@ -368,12 +372,21 @@ public class NagiosPropertyProvider extends BaseProvider implements PropertyProv
       if (!hosts.isEmpty())
         nagiosHost = hosts.keySet().iterator().next();
       
+      // !!! use the cluster to retrieve alerts that are not from Nagios, but
+      // from agents themselves.
+      Collection<Alert> currentAlerts = cluster.getAlerts();
+      if (null != currentAlerts) {
+        for (Alert alert : currentAlerts) {
+          results.add(new NagiosAlert(alert));
+        }
+      }
+      
     } catch (AmbariException e) {
       LOG.debug("Cannot find a nagios service.  Skipping alerts.");
     }
     
     if (null == nagiosHost) {
-      return new ArrayList<NagiosAlert>();
+      return results;
     } else {
       String template = NAGIOS_TEMPLATE;
 
@@ -388,8 +401,10 @@ public class NagiosPropertyProvider extends BaseProvider implements PropertyProv
         in = urlStreamProvider.readFrom(url);
         
         NagiosAlerts alerts = new Gson().fromJson(IOUtils.toString(in, "UTF-8"), NagiosAlerts.class);
+
+        results.addAll(alerts.alerts);
         
-        Collections.sort(alerts.alerts, new Comparator<NagiosAlert>() {
+        Collections.sort(results, new Comparator<NagiosAlert>() {
           @Override
           public int compare(NagiosAlert o1, NagiosAlert o2) {
             if (o2.getStatus() != o1.getStatus())
@@ -400,7 +415,7 @@ public class NagiosPropertyProvider extends BaseProvider implements PropertyProv
           }
         });
         
-        return alerts.alerts;
+        return results;
       } catch (Exception e) {
         throw new SystemException("Error reading HTTP response for cluster " + clusterName +
             ", nagios=" + url + " (" + e.getMessage() + ")", e);

+ 157 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java

@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state;
+
+/**
+ * An alert represents a problem or notice for a cluster.
+ */
+public class Alert {
+  private String name = null;
+  private String instance = null;
+  private String service = null;
+  private String component = null;
+  private String host = null;
+  private AlertState state = AlertState.UNKNOWN;
+  private String label = null;
+  private String text = null;
+  
+ 
+  /**
+   * Constructor.
+   * @param alertName the name of the alert
+   * @param alertInstance instance specific information in the event that two alert
+   *    types can be run, ie Flume.
+   * @param serviceName the service
+   * @param componentName the component
+   * @param hostName the host
+   * @param alertState the state of the alertable event
+   */
+  public Alert(String alertName, String alertInstance, String serviceName,
+      String componentName,  String hostName, AlertState alertState) {
+    name = alertName;
+    instance = alertInstance;
+    service = serviceName;
+    component = componentName;
+    host = hostName;
+    state = alertState;
+  }
+ 
+  /**
+   * @return the name
+   */
+  public String getName() {
+    return name;
+  }
+ 
+  /**
+   * @return the service
+   */
+  public String getService() {
+    return service;
+  }
+  
+  /**
+   * @return the component
+   */
+  public String getComponent() {
+    return component;
+  }
+ 
+  /**
+   * @return the host
+   */
+  public String getHost() {
+    return host;
+  }
+ 
+  /**
+   * @return the state
+   */
+  public AlertState getState() {
+    return state;
+  }
+
+  /**
+   * @return a short descriptive label for the alert
+   */
+  public String getLabel() {
+    return label;
+  }
+
+  /**
+   * @param alertLabel a short descriptive label for the alert
+   */
+  public void setLabel(String alertLabel) {
+    label = alertLabel;
+  }
+ 
+  /**
+   * @return detail text about the alert
+   */
+  public String getText() {
+    return text;
+  }
+  
+  /**
+   * @param alertText detail text about the alert
+   */
+  public void setText(String alertText) {
+    text = alertText;
+  }
+  
+  @Override
+  public int hashCode() {
+    int result = 0;
+    
+    result += (null != name) ? name.hashCode() : 0;
+    result += 31 * result + (null != instance ? instance.hashCode() : 0);
+    result += 31 * result + (null != service ? service.hashCode() : 0);
+    result += 31 * result + (null != component ? component.hashCode() : 0);
+    result += 31 * result + (null != host ? host.hashCode() : 0);
+    
+    return result;
+  }
+
+  /**
+   * An alert's uniqueness comes from a combination of name, instance, service,
+   * component and host.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (null == o || !Alert.class.isInstance(o))
+      return false;
+    
+    return hashCode() == o.hashCode();
+  }
+  
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append('{');
+    sb.append("state=").append(state).append(", ");
+    sb.append("name=").append(name).append(", ");
+    sb.append("service=").append(service).append(", ");
+    sb.append("component=").append(component).append(", ");
+    sb.append("host=").append(host).append(", ");
+    sb.append("instance=").append(instance);
+    sb.append('}');
+    return sb.toString();
+  }
+  
+
+}

+ 41 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/AlertState.java

@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state;
+
+/**
+ * Represents the state of an alert.
+ */
+public enum AlertState {
+  /**
+   * Alert does not need to be distributed.  Normal Operation.
+   */
+  OK,
+  /**
+   * Alert indicates there may be an issue.  The component may be operating
+   * normally but may be in danger of becoming <code>CRITICAL</code>.
+   */
+  WARNING,
+  /**
+   * Indicates there is a critical situation that needs to be addressed.
+   */
+  CRITICAL,
+  /**
+   * The state of the alert is not known.
+   */
+  UNKNOWN
+}

+ 13 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java

@@ -304,4 +304,17 @@ public interface Cluster {
    * @return list of failed events
    */
   List<ServiceComponentHostEvent> processServiceComponentHostEvents(ListMultimap<String, ServiceComponentHostEvent> eventMap);
+
+  
+  /**
+   * Adds alerts for the current cluster.
+   */
+  public void addAlerts(Collection<Alert> alerts);
+  
+  /**
+   * @return a collection of current known alerts.
+   */
+  public Collection<Alert> getAlerts();
+
+
 }

+ 34 - 2
ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java

@@ -36,7 +36,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.persistence.RollbackException;
 
-import com.google.common.collect.ListMultimap;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.ServiceComponentHostNotFoundException;
 import org.apache.ambari.server.ServiceNotFoundException;
@@ -55,7 +54,9 @@ import org.apache.ambari.server.orm.entities.ClusterServiceEntity;
 import org.apache.ambari.server.orm.entities.ClusterStateEntity;
 import org.apache.ambari.server.orm.entities.ConfigGroupEntity;
 import org.apache.ambari.server.orm.entities.RequestScheduleEntity;
+import org.apache.ambari.server.state.Alert;
 import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.ClusterHealthReport;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
 import org.apache.ambari.server.state.ConfigFactory;
@@ -70,7 +71,6 @@ import org.apache.ambari.server.state.ServiceComponentHostEvent;
 import org.apache.ambari.server.state.ServiceFactory;
 import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.State;
-import org.apache.ambari.server.state.ClusterHealthReport;
 import org.apache.ambari.server.state.configgroup.ConfigGroup;
 import org.apache.ambari.server.state.configgroup.ConfigGroupFactory;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
@@ -79,6 +79,7 @@ import org.apache.ambari.server.state.scheduler.RequestExecutionFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.ListMultimap;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
@@ -131,6 +132,8 @@ public class ClusterImpl implements Cluster {
   private final ReadWriteLock clusterGlobalLock = new ReentrantReadWriteLock();
 
   private ClusterEntity clusterEntity;
+  
+  private Set<Alert> clusterAlerts = new HashSet<Alert>(); 
 
   @Inject
   private ClusterDAO clusterDAO;
@@ -1551,4 +1554,33 @@ public class ClusterImpl implements Cluster {
 
     return chr;
   }
+  
+  @Override
+  public void addAlerts(Collection<Alert> alerts) {
+    try {
+      writeLock.lock();
+      if (LOG.isDebugEnabled()) {
+        for (Alert alert : alerts) {
+          LOG.debug("Adding alert for name={} service={}, on host={}",
+              alert.getName(), alert.getService(), alert.getHost());
+        }
+      }
+      clusterAlerts.removeAll(alerts);
+      clusterAlerts.addAll(alerts);
+
+    } finally {
+      writeLock.unlock();
+    }
+  }
+  
+  @Override
+  public Collection<Alert> getAlerts() {
+    try {
+      readLock.lock();
+      
+      return Collections.unmodifiableSet(clusterAlerts);
+    } finally {
+      readLock.unlock();
+    }
+  }
 }

+ 16 - 0
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py

@@ -59,6 +59,22 @@ class FlumeHandler(Script):
 
     json = {}
     json['processes'] = processes
+    json['alerts'] = []
+
+    for proc in processes:
+      alert = {}
+      alert['name'] = 'flume_agent'
+      alert['instance'] = proc['name']
+      alert['label'] = 'Flume Agent process'
+
+      if not proc.has_key('status') or proc['status'] == 'NOT_RUNNING':
+        alert['state'] = 'CRITICAL'
+        alert['text'] = 'Flume agent {0} not running'.format(proc['name'])
+      else:
+        alert['state'] = 'OK'
+        alert['text'] = 'Flume agent {0} is running'.format(proc['name'])
+
+      json['alerts'].append(alert)
 
     self.put_structured_out(json)
 

+ 1 - 7
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/NAGIOS/package/templates/hadoop-servicegroups.cfg.j2

@@ -39,12 +39,6 @@ define servicegroup {
   alias  YARN Checks
 }
 {% endif %}
-{%if hostgroup_defs['flume-servers'] %}
-define servicegroup {
-  servicegroup_name  FLUME
-  alias  FLUME Checks
-}
-{% endif %}
 {%if hostgroup_defs['hbasemasters'] %}
 define servicegroup {
   servicegroup_name  HBASE
@@ -112,4 +106,4 @@ define servicegroup {
   servicegroup_name  FALCON
   alias  FALCON Checks
 }
-{% endif %}
+{% endif %}

+ 0 - 15
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/NAGIOS/package/templates/hadoop-services.cfg.j2

@@ -618,21 +618,6 @@ define service {
 
 {% endif %}
 
-{% if hostgroup_defs['flume-servers'] %}
-# FLUME Checks
-define service {
-        hostgroup_name          flume-servers
-        use                     hadoop-service
-        service_description     FLUME::Flume Agent process
-        servicegroups           FLUME
-        check_command           check_tcp_wrapper!{{ flume_port }}!-w 1 -c 1
-        normal_check_interval   1
-        retry_check_interval    0.5
-        max_check_attempts      3
-}
-{% endif %}
-
-
 {% if hostgroup_defs['zookeeper-servers'] %}
 # ZOOKEEPER Checks
 define service {

+ 39 - 4
ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java

@@ -19,6 +19,7 @@ package org.apache.ambari.server.agent;
 
 import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DATANODE;
 import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyCluster;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyCurrentPingPort;
 import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyHostStatus;
 import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyHostname1;
 import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyOSRelease;
@@ -31,7 +32,6 @@ import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HDFS;
 import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HDFS_CLIENT;
 import static org.apache.ambari.server.agent.DummyHeartbeatConstants.NAMENODE;
 import static org.apache.ambari.server.agent.DummyHeartbeatConstants.SECONDARY_NAMENODE;
-import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyCurrentPingPort;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -52,20 +52,26 @@ import java.util.Set;
 
 import javax.xml.bind.JAXBException;
 
-import com.google.inject.persist.UnitOfWork;
-
 import junit.framework.Assert;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
-import org.apache.ambari.server.actionmanager.*;
+import org.apache.ambari.server.actionmanager.ActionDBAccessor;
+import org.apache.ambari.server.actionmanager.ActionDBAccessorImpl;
+import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.actionmanager.Request;
+import org.apache.ambari.server.actionmanager.RequestFactory;
+import org.apache.ambari.server.actionmanager.Stage;
 import org.apache.ambari.server.agent.HostStatus.Status;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.state.Alert;
+import org.apache.ambari.server.state.AlertState;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Host;
@@ -92,6 +98,7 @@ import com.google.inject.Guice;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.persist.PersistService;
+import com.google.inject.persist.UnitOfWork;
 
 public class TestHeartbeatHandler {
 
@@ -1960,6 +1967,34 @@ public class TestHeartbeatHandler {
     ServiceComponentHost sch = hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1); 
 
     Assert.assertEquals(Integer.valueOf(2), Integer.valueOf(sch.getProcesses().size()));
+    
+    hb = new HeartBeat();
+    hb.setTimestamp(System.currentTimeMillis());
+    hb.setResponseId(1);
+    hb.setHostname(DummyHostname1);
+    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
+    hb.setReports(new ArrayList<CommandReport>());
+    
+    componentStatus1 = new ComponentStatus();
+    componentStatus1.setClusterName(DummyCluster);
+    componentStatus1.setServiceName(HDFS);
+    componentStatus1.setMessage(DummyHostStatus);
+    componentStatus1.setStatus(State.STARTED.name());
+    componentStatus1.setComponentName(DATANODE);
+    componentStatus1.setAlerts(Collections.singletonList(
+        new AgentAlert("xyz", AlertState.CRITICAL)));
+    hb.setComponentStatus(Collections.singletonList(componentStatus1));
+    
+    handler.handleHeartBeat(hb);
+    
+    Assert.assertNotNull(hdfs.getCluster().getAlerts());
+    Assert.assertEquals(1, hdfs.getCluster().getAlerts().size());
+    Alert clusterAlert = hdfs.getCluster().getAlerts().iterator().next();
+    Assert.assertNotNull(clusterAlert);
+    Assert.assertEquals(HDFS, clusterAlert.getService());
+    Assert.assertEquals(DATANODE, clusterAlert.getComponent());
+    Assert.assertEquals(DummyHostname1, clusterAlert.getHost());
+    Assert.assertEquals(AlertState.CRITICAL, clusterAlert.getState());
   }
   
   @Test

+ 86 - 4
ambari-server/src/test/java/org/apache/ambari/server/controller/nagios/NagiosPropertyProviderTest.java

@@ -37,6 +37,8 @@ import org.apache.ambari.server.controller.spi.Request;
 import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.spi.TemporalInfo;
 import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.state.Alert;
+import org.apache.ambari.server.state.AlertState;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Service;
@@ -64,15 +66,17 @@ public class NagiosPropertyProviderTest {
 
   @Before
   public void setup() throws Exception {
-
     module = new GuiceModule();
     injector = Guice.createInjector(module);
     NagiosPropertyProvider.init(injector);
     
-    
     clusters = injector.getInstance(Clusters.class);
     Cluster cluster = createMock(Cluster.class);
+    
+    expect(cluster.getAlerts()).andReturn(Collections.<Alert>emptySet()).anyTimes();
+    
     expect(clusters.getCluster("c1")).andReturn(cluster).anyTimes();
+    
 
     Service nagiosService = createMock(Service.class);
     expect(cluster.getService("NAGIOS")).andReturn(nagiosService).anyTimes();
@@ -190,7 +194,7 @@ public class NagiosPropertyProviderTest {
   }
 
   @Test
-  public void testNoNagiosServerCompoonent() throws Exception {
+  public void testNoNagiosServerComponent() throws Exception {
 
     Cluster cluster = clusters.getCluster("c1");
     reset(cluster);
@@ -443,6 +447,7 @@ public class NagiosPropertyProviderTest {
     
     Clusters clusters = inj.getInstance(Clusters.class);
     Cluster cluster = createMock(Cluster.class);
+    expect(cluster.getAlerts()).andReturn(Collections.<Alert>emptySet()).anyTimes();
     expect(clusters.getCluster("c1")).andReturn(cluster);
 
     Service nagiosService = createMock(Service.class);
@@ -572,5 +577,82 @@ public class NagiosPropertyProviderTest {
      binder.bind(Configuration.class).toInstance(new Configuration(properties));
     }
   }
-  
+
+  @Test
+  public void testNagiosServiceAlertsWithAgentAlerts() throws Exception {
+    Injector inj = Guice.createInjector(new GuiceModule());
+    
+    Clusters clusters = inj.getInstance(Clusters.class);
+    Cluster cluster = createMock(Cluster.class);
+    
+    Alert alert = new Alert("ganglia_madeup", null, "GANGLIA", "GANGLIA_MYSTERY",
+        "h1", AlertState.CRITICAL);
+    
+    expect(cluster.getAlerts()).andReturn(Collections.singleton(alert)).anyTimes();
+    expect(clusters.getCluster("c1")).andReturn(cluster);
+
+    Service nagiosService = createMock(Service.class);
+    expect(cluster.getService("NAGIOS")).andReturn(nagiosService);
+    
+    ServiceComponent nagiosServiceComponent = createMock(ServiceComponent.class);
+    expect(nagiosService.getServiceComponent("NAGIOS_SERVER")).andReturn(nagiosServiceComponent);
+    
+    ServiceComponentHost nagiosScHost = createMock(ServiceComponentHost.class);
+    Map<String, ServiceComponentHost> map1 = new HashMap<String, ServiceComponentHost>();
+    map1.put(HOST, nagiosScHost);
+    expect(nagiosServiceComponent.getServiceComponentHosts()).andReturn(map1);
+    
+    replay(clusters, cluster, nagiosService, nagiosServiceComponent);
+
+    
+    TestStreamProvider streamProvider = new TestStreamProvider("nagios_alerts.txt");
+
+    NagiosPropertyProvider npp = new NagiosPropertyProvider(Resource.Type.Service,
+        streamProvider,
+        "ServiceInfo/cluster_name",
+        "ServiceInfo/service_name");
+    npp.forceReset();
+    NagiosPropertyProvider.init(inj);
+    
+    Resource resource = new ResourceImpl(Resource.Type.Service);
+    resource.setProperty("ServiceInfo/cluster_name", "c1");
+    resource.setProperty("ServiceInfo/service_name", "GANGLIA");
+    
+    // request with an empty set should get all supported properties
+    Request request = PropertyHelper.getReadRequest(Collections.<String>emptySet(), new HashMap<String, TemporalInfo>());
+
+    Set<Resource> set = npp.populateResources(Collections.singleton(resource), request, null);
+    Assert.assertEquals(1, set.size());
+    
+    Resource res = set.iterator().next();
+    
+    Map<String, Map<String, Object>> values = res.getPropertiesMap();
+    
+    Assert.assertTrue(values.containsKey("alerts"));
+    Assert.assertTrue(values.containsKey("alerts/summary"));
+    Assert.assertTrue(values.get("alerts").containsKey("detail"));
+    Assert.assertTrue(List.class.isInstance(values.get("alerts").get("detail")));
+    
+    List<?> list = (List<?>) values.get("alerts").get("detail");
+    // removed an additional one
+    Assert.assertEquals(Integer.valueOf(5), Integer.valueOf(list.size()));
+    for (Object o : list) {
+      Assert.assertTrue(Map.class.isInstance(o));
+      Map<?, ?> map = (Map<?, ?>) o;
+      Assert.assertTrue(map.containsKey("service_name"));
+      String serviceName = map.get("service_name").toString();
+      Assert.assertEquals(serviceName, "GANGLIA");
+    }
+    
+    Map<String, Object> summary = values.get("alerts/summary");
+    Assert.assertTrue(summary.containsKey("OK"));
+    Assert.assertTrue(summary.containsKey("WARNING"));
+    Assert.assertTrue(summary.containsKey("CRITICAL"));
+    Assert.assertTrue(summary.containsKey("PASSIVE"));
+    
+    Assert.assertEquals(Integer.valueOf(3), summary.get("OK"));
+    Assert.assertEquals(Integer.valueOf(0), summary.get("WARNING"));
+    Assert.assertEquals(Integer.valueOf(1), summary.get("CRITICAL"));
+    Assert.assertEquals(Integer.valueOf(1), summary.get("PASSIVE"));
+  }  
 }

+ 5 - 4
ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py

@@ -97,7 +97,7 @@ class TestFlumeHandler(RMFTestCase):
     
     # test that the method was called with empty processes
     self.assertTrue(structured_out_mock.called)
-    structured_out_mock.assert_called_with({'processes': []})
+    structured_out_mock.assert_called_with({'processes': [], 'alerts': []})
 
     self.assertNoMoreResources()
 
@@ -118,9 +118,10 @@ class TestFlumeHandler(RMFTestCase):
     
     self.assertTrue(structured_out_mock.called)
 
-    structured_out_mock.assert_called_with({'processes':
-      [{'status': 'NOT_RUNNING', 'channels_count': 0, 'sinks_count': 0,
-        'name': 'a1', 'sources_count': 0}]})
+    # call_args[0] is a tuple, whose first element is the actual call argument
+    struct_out = structured_out_mock.call_args[0][0]
+    self.assertTrue(struct_out.has_key('processes'))
+    self.assertTrue(struct_out.has_key('alerts'))
 
     self.assertNoMoreResources()