浏览代码

AMBARI-8430 - Current Alerts Should Be Cleaned Up With Ambari Cluster/Service/Component/Host Changes (jonathanhurley)

Jonathan Hurley 10 年之前
父节点
当前提交
024a301b16
共有 45 个文件被更改,包括 1755 次插入255 次删除
  1. 0 4
      ambari-agent/src/main/python/ambari_agent/Heartbeat.py
  2. 0 31
      ambari-agent/src/main/python/ambari_agent/HostInfo_linux.py
  3. 0 4
      ambari-agent/src/main/python/ambari_agent/HostInfo_win.py
  4. 41 19
      ambari-agent/src/main/python/ambari_agent/alerts/collector.py
  5. 47 19
      ambari-agent/src/test/python/ambari_agent/TestAlerts.py
  6. 2 5
      ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
  7. 0 15
      ambari-agent/src/test/python/ambari_agent/TestHostInfo.py
  8. 2 1
      ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
  9. 54 44
      ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java
  10. 54 0
      ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionDisabledEvent.java
  11. 58 0
      ambari-server/src/main/java/org/apache/ambari/server/events/AlertHashInvalidationEvent.java
  12. 35 0
      ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
  13. 49 0
      ambari-server/src/main/java/org/apache/ambari/server/events/HostEvent.java
  14. 44 0
      ambari-server/src/main/java/org/apache/ambari/server/events/HostRemovedEvent.java
  15. 64 0
      ambari-server/src/main/java/org/apache/ambari/server/events/ServiceComponentInstalledEvent.java
  16. 79 0
      ambari-server/src/main/java/org/apache/ambari/server/events/ServiceComponentUninstalledEvent.java
  17. 52 0
      ambari-server/src/main/java/org/apache/ambari/server/events/ServiceRemovedEvent.java
  18. 6 6
      ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java
  19. 62 0
      ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertDefinitionDisabledListener.java
  20. 120 0
      ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertHashInvalidationListener.java
  21. 75 0
      ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertHostListener.java
  22. 12 3
      ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertLifecycleListener.java
  23. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertMaintenanceModeListener.java
  24. 140 3
      ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
  25. 67 0
      ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertServiceComponentHostListener.java
  26. 36 3
      ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertServiceStateListener.java
  27. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertStateChangedListener.java
  28. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java
  29. 81 0
      ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
  30. 5 1
      ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java
  31. 6 0
      ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java
  32. 9 0
      ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java
  33. 51 56
      ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java
  34. 29 12
      ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
  35. 34 7
      ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
  36. 10 3
      ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
  37. 0 2
      ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProviderTest.java
  38. 44 2
      ambari-server/src/test/java/org/apache/ambari/server/events/EventsTest.java
  39. 1 1
      ambari-server/src/test/java/org/apache/ambari/server/events/MockEventListener.java
  40. 69 2
      ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
  41. 1 1
      ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionHashTest.java
  42. 18 3
      ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java
  43. 290 0
      ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertReceivedListenerTest.java
  44. 2 2
      ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java
  45. 3 3
      ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java

+ 0 - 4
ambari-agent/src/main/python/ambari_agent/Heartbeat.py

@@ -49,8 +49,6 @@ class Heartbeat:
 
     nodeStatus = { "status" : "HEALTHY",
                    "cause" : "NONE" }
-    nodeStatus["alerts"] = []
-
 
     heartbeat = { 'responseId'        : int(id),
                   'timestamp'         : timestamp,
@@ -94,8 +92,6 @@ class Heartbeat:
         logger.debug("agentEnv: %s", str(nodeInfo))
         logger.debug("mounts: %s", str(mounts))
 
-    nodeStatus["alerts"] = hostInfo.createAlerts(nodeStatus["alerts"])
-    
     if self.collector is not None:
       heartbeat['alerts'] = self.collector.alerts()
     

+ 0 - 31
ambari-agent/src/main/python/ambari_agent/HostInfo_linux.py

@@ -215,37 +215,6 @@ class HostInfo:
       pass
     return diskInfo
 
-  def createAlerts(self, alerts):
-    existingUsers = []
-    self.checkUsers(self.DEFAULT_USERS, existingUsers)
-    dirs = []
-    self.checkFolders(self.DEFAULT_DIRS, self.DEFAULT_PROJECT_NAMES, existingUsers, dirs)
-    alert = {
-      'name': 'host_alert',
-      'instance': None,
-      'service': 'AMBARI',
-      'component': 'host',
-      'host': hostname.hostname(self.config),
-      'state': 'OK',
-      'label': 'Disk space',
-      'text': 'Used disk space less than 80%'}
-    message = ""
-    mountinfoSet = []
-    for dir in dirs:
-      if dir["type"] == 'directory':
-        mountinfo = self.osdiskAvailableSpace(dir['name'])
-        if int(mountinfo["percent"].strip('%')) >= 80:
-          if not mountinfo in mountinfoSet:
-            mountinfoSet.append(mountinfo)
-          message += str(dir['name']) + ";\n"
-
-    if message != "":
-      message = "These discs have low space:\n" + str(mountinfoSet) + "\n They include following critical directories:\n" + message
-      alert['state'] = 'WARNING'
-      alert['text'] = message
-    alerts.append(alert)
-    return alerts
-
   def checkFolders(self, basePaths, projectNames, existingUsers, dirs):
     foldersToIgnore = []
     for user in existingUsers:

+ 0 - 4
ambari-agent/src/main/python/ambari_agent/HostInfo_win.py

@@ -119,10 +119,6 @@ class HostInfo:
         result['status'] = "Available"
         results.append(result)
 
-  def createAlerts(self, alerts):
-    #TODO AMBARI-7849 Implement createAlerts for Windows
-    return alerts
-
   def javaProcs(self, list):
     try:
       runner = shellRunner()

+ 41 - 19
ambari-agent/src/main/python/ambari_agent/alerts/collector.py

@@ -19,6 +19,7 @@ limitations under the License.
 """
 
 import logging
+import threading
 
 logger = logging.getLogger()
 
@@ -28,43 +29,64 @@ class AlertCollector():
   """  
   def __init__(self):
     self.__buckets = {}
+    self.__lock = threading.RLock()
 
 
   def put(self, cluster, alert):
-    if not cluster in self.__buckets:
-      self.__buckets[cluster] = {}
-      
-    self.__buckets[cluster][alert['name']] = alert
+    self.__lock.acquire()
+    try:
+      if not cluster in self.__buckets:
+        self.__buckets[cluster] = {}
+        
+      self.__buckets[cluster][alert['name']] = alert
+    finally:
+      self.__lock.release()
 
 
   def remove(self, cluster, alert_name):
     """
     Removes the alert with the specified name if it exists in the dictionary
     """
-    if not cluster in self.__buckets:
-      return
-    
-    del self.__buckets[cluster][alert_name]
+    self.__lock.acquire()
+    try:
+      if not cluster in self.__buckets:
+        return
+      
+      del self.__buckets[cluster][alert_name]
+    finally:
+      self.__lock.release()
 
 
   def remove_by_uuid(self, alert_uuid):
     """
     Removes the alert with the specified uuid if it exists in the dictionary
     """
-    for cluster,alert_map in self.__buckets.iteritems():
-      for alert_name in alert_map.keys():
-        alert = alert_map[alert_name]
-        if alert['uuid'] == alert_uuid:
-          self.remove(cluster, alert_name)
+    self.__lock.acquire()
+    try:
+      for cluster,alert_map in self.__buckets.iteritems():
+        for alert_name in alert_map.keys():
+          alert = alert_map[alert_name]
+          if alert['uuid'] == alert_uuid:
+            self.remove(cluster, alert_name)
+    finally:
+      self.__lock.release()
 
 
   def alerts(self):
-    alerts = []
-    for clustermap in self.__buckets.values()[:]:
-      alerts.extend(clustermap.values())
-      
-    return alerts
-      
+    '''
+    Gets all of the alerts collected since the last time this method was
+    called. This method will clear the collected alerts.
+    '''
+    self.__lock.acquire()
+    try:
+      alerts = []
+      for clustermap in self.__buckets.values()[:]:
+        alerts.extend(clustermap.values())
+
+      self.__buckets.clear()
+      return alerts
+    finally:
+      self.__lock.release()
     
 
     

+ 47 - 19
ambari-agent/src/test/python/ambari_agent/TestAlerts.py

@@ -126,8 +126,12 @@ class TestAlerts(TestCase):
     self.assertEquals(6, pa.interval())
 
     pa.collect()
-    self.assertEquals('OK', collector.alerts()[0]['state'])
-    self.assertTrue('response time on port 2181' in collector.alerts()[0]['text'])
+    
+    alerts = collector.alerts()
+    self.assertEquals(0, len(collector.alerts()))
+    
+    self.assertEquals('OK', alerts[0]['state'])
+    self.assertTrue('response time on port 2181' in alerts[0]['text'])
 
 
   def test_port_alert_no_sub(self):
@@ -190,8 +194,11 @@ class TestAlerts(TestCase):
 
     sa.collect()
 
-    self.assertEquals('WARNING', collector.alerts()[0]['state'])
-    self.assertEquals('bar is rendered-bar, baz is rendered-baz', collector.alerts()[0]['text'])
+    alerts = collector.alerts()
+    self.assertEquals(0, len(collector.alerts()))
+
+    self.assertEquals('WARNING', alerts[0]['state'])
+    self.assertEquals('bar is rendered-bar, baz is rendered-baz', alerts[0]['text'])
 
 
   @patch.object(MetricAlert, "_load_jmx")
@@ -239,9 +246,12 @@ class TestAlerts(TestCase):
     ma = MetricAlert(json, json['source'])
     ma.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'})
     ma.collect()
-
-    self.assertEquals('CRITICAL', collector.alerts()[0]['state'])
-    self.assertEquals('crit_arr: 1 3 223', collector.alerts()[0]['text'])
+    
+    alerts = collector.alerts()
+    self.assertEquals(0, len(collector.alerts()))
+    
+    self.assertEquals('CRITICAL', alerts[0]['state'])
+    self.assertEquals('crit_arr: 1 3 223', alerts[0]['text'])
 
     del json['source']['jmx']['value']
     collector = AlertCollector()
@@ -249,8 +259,11 @@ class TestAlerts(TestCase):
     ma.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'})
     ma.collect()
 
-    self.assertEquals('OK', collector.alerts()[0]['state'])
-    self.assertEquals('ok_arr: 1 3 None', collector.alerts()[0]['text'])
+    alerts = collector.alerts()
+    self.assertEquals(0, len(collector.alerts()))
+
+    self.assertEquals('OK', alerts[0]['state'])
+    self.assertEquals('ok_arr: 1 3 None', alerts[0]['text'])
 
 
   @patch.object(MetricAlert, "_load_jmx")
@@ -385,8 +398,11 @@ class TestAlerts(TestCase):
     alert.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'})
     alert.collect()
 
-    self.assertEquals('OK', collector.alerts()[0]['state'])
-    self.assertEquals('ok: 200', collector.alerts()[0]['text'])
+    alerts = collector.alerts()
+    self.assertEquals(0, len(collector.alerts()))
+
+    self.assertEquals('OK', alerts[0]['state'])
+    self.assertEquals('ok: 200', alerts[0]['text'])
 
     # run the alert and check HTTP 500
     wa_make_web_request_mock.return_value = WebResponse(500,1.234)
@@ -395,8 +411,11 @@ class TestAlerts(TestCase):
     alert.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'})
     alert.collect()
     
-    self.assertEquals('WARNING', collector.alerts()[0]['state'])
-    self.assertEquals('warning: 500', collector.alerts()[0]['text'])
+    alerts = collector.alerts()
+    self.assertEquals(0, len(collector.alerts()))
+    
+    self.assertEquals('WARNING', alerts[0]['state'])
+    self.assertEquals('warning: 500', alerts[0]['text'])
 
     # run the alert and check critical
     wa_make_web_request_mock.return_value = WebResponse(0,0)
@@ -406,9 +425,12 @@ class TestAlerts(TestCase):
     alert.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'})
     alert.collect()
     
+    alerts = collector.alerts()
+    self.assertEquals(0, len(collector.alerts()))    
+    
     # http assertion indicating that we properly determined non-SSL
-    self.assertEquals('CRITICAL', collector.alerts()[0]['state'])
-    self.assertEquals('critical: http://1.2.3.4:80', collector.alerts()[0]['text'])
+    self.assertEquals('CRITICAL', alerts[0]['state'])
+    self.assertEquals('critical: http://1.2.3.4:80', alerts[0]['text'])
      
     collector = AlertCollector()
     alert = WebAlert(json, json['source'])
@@ -419,9 +441,12 @@ class TestAlerts(TestCase):
 
     alert.collect()
     
+    alerts = collector.alerts()
+    self.assertEquals(0, len(collector.alerts()))    
+    
     # SSL assertion
-    self.assertEquals('CRITICAL', collector.alerts()[0]['state'])
-    self.assertEquals('critical: https://1.2.3.4:8443', collector.alerts()[0]['text'])
+    self.assertEquals('CRITICAL', alerts[0]['state'])
+    self.assertEquals('critical: https://1.2.3.4:8443', alerts[0]['text'])
 
   def test_reschedule(self):
     test_file_path = os.path.join('ambari_agent', 'dummy_files')
@@ -468,8 +493,11 @@ class TestAlerts(TestCase):
 
     res = pa.collect()
 
-    self.assertTrue(collector.alerts()[0] is not None)
-    self.assertEquals('CRITICAL', collector.alerts()[0]['state'])
+    alerts = collector.alerts()
+    self.assertEquals(0, len(collector.alerts()))
+
+    self.assertTrue(alerts[0] is not None)
+    self.assertEquals('CRITICAL', alerts[0]['state'])
 
     collector.remove_by_uuid('c1f73191-4481-4435-8dae-fd380e4c0be1')
     self.assertEquals(0,len(collector.alerts()))

+ 2 - 5
ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py

@@ -65,7 +65,7 @@ class TestHeartbeat(TestCase):
     self.assertEquals(result['componentStatus'] is not None, True, "Heartbeat should contain componentStatus")
     self.assertEquals(result['reports'] is not None, True, "Heartbeat should contain reports")
     self.assertEquals(result['timestamp'] >= 1353679373880L, True)
-    self.assertEquals(len(result['nodeStatus']), 3)
+    self.assertEquals(len(result['nodeStatus']), 2)
     self.assertEquals(result['nodeStatus']['cause'], "NONE")
     self.assertEquals(result['nodeStatus']['status'], "HEALTHY")
     # result may or may NOT have an agentEnv structure in it
@@ -103,10 +103,8 @@ class TestHeartbeat(TestCase):
     hb = heartbeat.build(id = 0, state_interval=1, componentsMapped=True)
     self.assertEqual(register_mock.call_args_list[0][0][1], False)
 
-  @patch.object(HostInfo, "createAlerts")
   @patch.object(ActionQueue, "result")
-  def test_build_long_result(self, result_mock, createAlerts_mock):
-    createAlerts_mock.return_value = []
+  def test_build_long_result(self, result_mock):
     config = AmbariConfig.AmbariConfig().getConfig()
     config.set('agent', 'prefix', 'tmp')
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
@@ -171,7 +169,6 @@ class TestHeartbeat(TestCase):
     hb['timestamp'] = 'timestamp'
     expected = {'nodeStatus':
                   {'status': 'HEALTHY',
-                   'alerts': [],
                    'cause': 'NONE'},
                 'timestamp': 'timestamp', 'hostname': 'hostname',
                 'responseId': 10, 'reports': [

+ 0 - 15
ambari-agent/src/test/python/ambari_agent/TestHostInfo.py

@@ -529,21 +529,6 @@ class TestHostInfo(TestCase):
     self.assertTrue(Firewall().getFirewallObject().check_iptables())
 
 
-  @patch.object(HostInfo, "osdiskAvailableSpace")
-  def test_createAlerts(self, osdiskAvailableSpace_mock):
-    hostInfo = HostInfo()
-    osdiskAvailableSpace_mock.return_value = {
-      'size': '100',
-      'used': '50',
-      'available': '50',
-      'percent': '50%',
-      'mountpoint': '/testmount',
-      'type': 'ext4',
-      'device': 'device'}
-    result = hostInfo.createAlerts([])
-    self.assertEquals(1, len(result))
-
-
   @patch.object(socket, "getfqdn")
   @patch.object(socket, "gethostbyname")
   @patch.object(socket, "gethostname")

+ 2 - 1
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java

@@ -215,7 +215,7 @@ public class HeartBeatHandler {
       return createRegisterCommand();
     }
 
-    //Examine heartbeat for command reports
+    // Examine heartbeat for command reports
     processCommandReports(heartbeat, hostname, clusterFsm, now);
 
     // Examine heartbeart for component live status reports
@@ -225,6 +225,7 @@ public class HeartBeatHandler {
     // NOTE: This step must be after processing command/status reports
     processHostStatus(heartbeat, hostname);
 
+    // Example heartbeat for alerts from the host or its components
     processAlerts(heartbeat, hostname);
 
     // Send commands if node is active

+ 54 - 44
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java

@@ -44,6 +44,9 @@ import org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException;
 import org.apache.ambari.server.controller.spi.SystemException;
 import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
 import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.events.AlertDefinitionDisabledEvent;
+import org.apache.ambari.server.events.AlertHashInvalidationEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
 import org.apache.ambari.server.state.Cluster;
@@ -110,6 +113,16 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
   @Inject
   private static ActionQueue actionQueue;
 
+  /**
+   * Publishes the following events:
+   * <ul>
+   * <li>{@link AlertDefinitionDisabledEvent} when an alert definition is
+   * disabled</li>
+   * </ul>
+   */
+  @Inject
+  private static AmbariEventPublisher eventPublisher;
+
   /**
    * The property ids for an alert defintion resource.
    */
@@ -187,16 +200,17 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
       }
     }
 
-    Set<String> invalidatedHosts = new HashSet<String>();
-
-    // !!! TODO multi-create in a transaction
     for (AlertDefinitionEntity entity : entities) {
       alertDefinitionDAO.create(entity);
-      invalidatedHosts.addAll(alertDefinitionHash.invalidateHosts(entity));
-    }
+      long clusterId = entity.getClusterId();
 
-    // build alert definition commands for all agent hosts affected
-    alertDefinitionHash.enqueueAgentCommands(clusterName, invalidatedHosts);
+      // invalidate the hash and publish the event
+      final Set<String> invalidatedHosts = alertDefinitionHash.invalidateHosts(entity);
+      AlertHashInvalidationEvent event = new AlertHashInvalidationEvent(
+          clusterId, invalidatedHosts);
+
+      eventPublisher.publish(event);
+    }
   }
 
   @Override
@@ -246,9 +260,6 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
       throws SystemException, UnsupportedPropertyException,
       NoSuchResourceException, NoSuchParentResourceException {
 
-    String clusterName = null;
-    Clusters clusters = getManagementController().getClusters();
-
     // check the predicate to see if there is a reques to run
     // the alert definition immediately
     if( null != predicate ){
@@ -263,8 +274,7 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
       }
     }
 
-    // if an AlertDefinition property body was specified, perform the update\
-    Set<String> invalidatedHosts = new HashSet<String>();
+    // if an AlertDefinition property body was specified, perform the update
     for (Map<String, Object> requestPropMap : request.getProperties()) {
       for (Map<String, Object> propertyMap : getPropertyMaps(requestPropMap, predicate)) {
         String stringId = (String) propertyMap.get(ALERT_DEF_ID);
@@ -275,36 +285,35 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
           continue;
         }
 
-        if (null == clusterName) {
-          try {
-            Cluster cluster = clusters.getClusterById(entity.getClusterId());
-            if (null != cluster) {
-              clusterName = cluster.getClusterName();
-            }
-          } catch (AmbariException ae) {
-            throw new IllegalArgumentException("Invalid cluster ID", ae);
-          }
-        }
+        // capture the state of the old entity
+        boolean oldEnabled = entity.getEnabled();
 
-        try{
+        try {
           populateEntity(entity, propertyMap);
           alertDefinitionDAO.merge(entity);
-          invalidatedHosts.addAll(alertDefinitionHash.invalidateHosts(entity));
-        }
-        catch( AmbariException ae ){
+
+          // invalidate and publish the definition hash
+          Set<String> invalidatedHosts = alertDefinitionHash.invalidateHosts(entity);
+          AlertHashInvalidationEvent event = new AlertHashInvalidationEvent(
+              entity.getClusterId(), invalidatedHosts);
+
+          eventPublisher.publish(event);
+        } catch (AmbariException ae) {
           LOG.error("Unable to find cluster when updating alert definition", ae);
         }
-      }
-    }
 
-    // build alert definition commands for all agent hosts affected; only
-    // update agents and broadcast update event if there are actual invalidated
-    // hosts
-    if (invalidatedHosts.size() > 0) {
-      alertDefinitionHash.enqueueAgentCommands(clusterName, invalidatedHosts);
-      notifyUpdate(Resource.Type.AlertDefinition, request, predicate);
+        // if the old state was enabled and the new state is not, trigger
+        // a disabled event
+        if (oldEnabled && !entity.getEnabled()) {
+          AlertDefinitionDisabledEvent event = new AlertDefinitionDisabledEvent(
+              entity.getClusterId(), entity.getDefinitionId());
+
+          eventPublisher.publish(event);
+        }
+      }
     }
 
+    notifyUpdate(Resource.Type.AlertDefinition, request, predicate);
     return getRequestStatus(null);
   }
 
@@ -318,17 +327,11 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
 
     Set<Long> definitionIds = new HashSet<Long>();
 
-    String clusterName = null;
     for (final Resource resource : resources) {
       Long id = (Long) resource.getPropertyValue(ALERT_DEF_ID);
       definitionIds.add(id);
-
-      if (null == clusterName) {
-        clusterName = (String) resource.getPropertyValue(ALERT_DEF_CLUSTER_NAME);
-      }
     }
 
-    final Set<String> invalidatedHosts = new HashSet<String>();
     for (Long definitionId : definitionIds) {
       LOG.info("Deleting alert definition {}", definitionId);
 
@@ -337,16 +340,23 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
       modifyResources(new Command<Void>() {
         @Override
         public Void invoke() throws AmbariException {
+          long clusterId = entity.getClusterId();
+
+          // remove the entity
           alertDefinitionDAO.remove(entity);
-          invalidatedHosts.addAll(alertDefinitionHash.invalidateHosts(entity));
+
+          // publish the hash invalidation
+          final Set<String> invalidatedHosts = alertDefinitionHash.invalidateHosts(entity);
+          AlertHashInvalidationEvent event = new AlertHashInvalidationEvent(
+              clusterId, invalidatedHosts);
+
+          eventPublisher.publish(event);
+
           return null;
         }
       });
     }
 
-    // build alert definition commands for all agent hosts affected
-    alertDefinitionHash.enqueueAgentCommands(clusterName, invalidatedHosts);
-
     notifyDelete(Resource.Type.AlertDefinition, predicate);
     return getRequestStatus(null);
   }

+ 54 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionDisabledEvent.java

@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+import org.apache.ambari.server.state.alert.AlertDefinition;
+
+/**
+ * The {@link AlertDefinitionDisabledEvent} is used to represent that an
+ * {@link AlertDefinition} has been disabled.
+ */
+public class AlertDefinitionDisabledEvent extends ClusterEvent {
+
+  /**
+   * The alert definition ID.
+   */
+  private final long m_definitionId;
+
+  /**
+   * Constructor.
+   *
+   * @param clusterId
+   *          the ID of the cluster that the definition is in.
+   * @param definition
+   *          the alert definition being registered.
+   */
+  public AlertDefinitionDisabledEvent(long clusterId, long definitionId) {
+    super(AmbariEventType.ALERT_DEFINITION_REMOVAL, clusterId);
+    m_definitionId = definitionId;
+  }
+
+  /**
+   * Gets the definition ID.
+   *
+   * @return the definitionId
+   */
+  public long getDefinitionId() {
+    return m_definitionId;
+  }
+}

+ 58 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/AlertHashInvalidationEvent.java

@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+import java.util.Collection;
+
+/**
+ * The {@link AlertHashInvalidationEvent} is fired when one of the
+ * following conditions is met and the alerts running on an agent need to be
+ * recalculated:
+ * <ul>
+ * <li>An alert definition changes.</li>
+ * <li>A host is removed from the cluster.</li>
+ * <li>A service host component is added or removed.</li>
+ * </ul>
+ */
+public class AlertHashInvalidationEvent extends ClusterEvent {
+  /**
+   * The hosts that need to be invalidated.
+   */
+  private final Collection<String> m_hosts;
+
+  /**
+   * Constructor.
+   *
+   * @param clusterId
+   *          the ID of the cluster that the definition is in.
+   * @param hosts
+   *          the hosts that were invalidated.
+   */
+  public AlertHashInvalidationEvent(long clusterId,
+      Collection<String> hosts) {
+    super(AmbariEventType.ALERT_DEFINITION_HASH_INVALIDATION, clusterId);
+    m_hosts = hosts;
+  }
+
+  /**
+   * @return the hosts
+   */
+  public Collection<String> getHosts() {
+    return m_hosts;
+  }
+}

+ 35 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java

@@ -32,6 +32,21 @@ public abstract class AmbariEvent {
      */
     SERVICE_INSTALL_SUCCESS,
 
+    /**
+     * A service was successfully removed.
+     */
+    SERVICE_REMOVED_SUCCESS,
+
+    /**
+     * A service component was successfully installed.
+     */
+    SERVICE_COMPONENT_INSTALL_SUCCESS,
+
+    /**
+     * A service component was successfully uninstalled.
+     */
+    SERVICE_COMPONENT_UNINSTALLED_SUCCESS,
+
     /**
      * An alert definition is registered with the system.
      */
@@ -42,6 +57,26 @@ public abstract class AmbariEvent {
      */
     ALERT_DEFINITION_REMOVAL,
 
+    /**
+     * The alert definition has was invalidated.
+     */
+    ALERT_DEFINITION_HASH_INVALIDATION,
+
+    /**
+     * The alert definition was disabled.
+     */
+    ALERT_DEFINITION_DISABLED,
+
+    /**
+     * A host was added to the cluster.
+     */
+    HOST_ADDED,
+
+    /**
+     * A host was removed from the cluster.
+     */
+    HOST_REMOVED,
+
     /**
      * A host/service/component has had a maintenance mode change.
      */

+ 49 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/HostEvent.java

@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+/**
+ * The {@link HostEvent} represents all events in Ambari that occur directly on
+ * a host. This excludes events on the host's services and components.
+ */
+public abstract class HostEvent extends AmbariEvent {
+
+  /**
+   * The host's name.
+   */
+  protected final String m_hostName;
+
+  /**
+   * Constructor.
+   *
+   * @param eventType
+   */
+  public HostEvent(AmbariEventType eventType, String hostName) {
+    super(eventType);
+    m_hostName = hostName;
+  }
+
+  /**
+   * Gets the host's name that the event belongs to.
+   *
+   * @return the hostName
+   */
+  public String getHostName() {
+    return m_hostName;
+  }
+}

+ 44 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/HostRemovedEvent.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+/**
+ * The {@link HostRemovedEvent} class is fired when a host is removed from the
+ * cluster.
+ */
+public class HostRemovedEvent extends HostEvent {
+  /**
+   * Constructor.
+   *
+   * @param hostName
+   */
+  public HostRemovedEvent(String hostName) {
+    super(AmbariEventType.HOST_ADDED, hostName);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder("HostRemovedEvent{ ");
+    buffer.append("hostName=").append(m_hostName);
+    buffer.append("}");
+    return buffer.toString();
+  }
+}

+ 64 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/ServiceComponentInstalledEvent.java

@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+/**
+ * The {@link ServiceComponentInstalledEvent} class is fired when a service
+ * component is successfully installed.
+ */
+public class ServiceComponentInstalledEvent extends ServiceEvent {
+  private final String m_componentName;
+  private final String m_hostName;
+
+  /**
+   * Constructor.
+   *
+   * @param clusterId
+   * @param stackName
+   * @param stackVersion
+   * @param serviceName
+   * @param componentName
+   * @param hostName
+   */
+  public ServiceComponentInstalledEvent(long clusterId, String stackName,
+      String stackVersion, String serviceName, String componentName,
+      String hostName) {
+    super(AmbariEventType.SERVICE_COMPONENT_INSTALL_SUCCESS, clusterId,
+        stackName,
+        stackVersion, serviceName);
+
+    m_componentName = componentName;
+    m_hostName = hostName;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder("ServiceComponentInstalledEvent{ ");
+    buffer.append("cluserId=").append(m_clusterId);
+    buffer.append(", stackName=").append(m_stackName);
+    buffer.append(", stackVersion=").append(m_stackVersion);
+    buffer.append(", serviceName=").append(m_serviceName);
+    buffer.append(", componentName=").append(m_componentName);
+    buffer.append(", hostName=").append(m_hostName);
+    buffer.append("}");
+    return buffer.toString();
+  }
+}

+ 79 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/ServiceComponentUninstalledEvent.java

@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+/**
+ * The {@link ServiceComponentUninstalledEvent} class is fired when a service
+ * component is successfully uninstalled.
+ */
+public class ServiceComponentUninstalledEvent extends ServiceEvent {
+  private final String m_componentName;
+  private final String m_hostName;
+
+  /**
+   * Constructor.
+   *
+   * @param clusterId
+   * @param stackName
+   * @param stackVersion
+   * @param serviceName
+   * @param componentName
+   * @param hostName
+   */
+  public ServiceComponentUninstalledEvent(long clusterId, String stackName,
+      String stackVersion, String serviceName, String componentName,
+      String hostName) {
+    super(AmbariEventType.SERVICE_COMPONENT_UNINSTALLED_SUCCESS, clusterId,
+        stackName,
+        stackVersion, serviceName);
+
+    m_componentName = componentName;
+    m_hostName = hostName;
+  }
+
+  /**
+   * @return the componentName
+   */
+  public String getComponentName() {
+    return m_componentName;
+  }
+
+  /**
+   * @return the hostName
+   */
+  public String getHostName() {
+    return m_hostName;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder(
+        "ServiceComponentUninstalledEvent{ ");
+    buffer.append("cluserId=").append(m_clusterId);
+    buffer.append(", stackName=").append(m_stackName);
+    buffer.append(", stackVersion=").append(m_stackVersion);
+    buffer.append(", serviceName=").append(m_serviceName);
+    buffer.append(", componentName=").append(m_componentName);
+    buffer.append(", hostName=").append(m_hostName);
+    buffer.append("}");
+    return buffer.toString();
+  }
+}

+ 52 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/ServiceRemovedEvent.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+/**
+ * The {@link ServiceRemovedEvent} class is fired when a service is successfully
+ * removed.
+ */
+public class ServiceRemovedEvent extends ServiceEvent {
+  /**
+   * Constructor.
+   *
+   * @param clusterId
+   * @param stackName
+   * @param stackVersion
+   * @param serviceName
+   */
+  public ServiceRemovedEvent(long clusterId, String stackName,
+      String stackVersion, String serviceName) {
+    super(AmbariEventType.SERVICE_REMOVED_SUCCESS, clusterId, stackName,
+        stackVersion, serviceName);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder("ServiceRemovedEvent{ ");
+    buffer.append("cluserId=").append(m_clusterId);
+    buffer.append(", stackName=").append(m_stackName);
+    buffer.append(", stackVersion=").append(m_stackVersion);
+    buffer.append(", serviceName=").append(m_serviceName);
+    buffer.append("}");
+    return buffer.toString();
+  }
+}

+ 6 - 6
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertAggregateListener.java → ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ambari.server.events.listeners;
+package org.apache.ambari.server.events.listeners.alerts;
 
 import java.text.MessageFormat;
 
@@ -100,20 +100,20 @@ public class AlertAggregateListener {
       int denominator = summary.getOkCount();
       double value = (double)(numerator) / denominator;
 
-      if (value > reporting.getCritical().getValue().doubleValue()) {
+      if (value > reporting.getCritical().getValue()) {
         alert.setState(AlertState.CRITICAL);
         alert.setText(MessageFormat.format(reporting.getCritical().getText(),
-            Integer.valueOf(denominator), Integer.valueOf(numerator)));
+            denominator, numerator));
 
-      } else if (value > reporting.getWarning().getValue().doubleValue()) {
+      } else if (value > reporting.getWarning().getValue()) {
         alert.setState(AlertState.WARNING);
         alert.setText(MessageFormat.format(reporting.getWarning().getText(),
-            Integer.valueOf(denominator), Integer.valueOf(numerator)));
+            denominator, numerator));
 
       } else {
         alert.setState(AlertState.OK);
         alert.setText(MessageFormat.format(reporting.getOk().getText(),
-            Integer.valueOf(denominator), Integer.valueOf(numerator)));
+            denominator, numerator));
       }
 
     }

+ 62 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertDefinitionDisabledListener.java

@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.alerts;
+
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.events.AlertDefinitionDisabledEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.orm.dao.AlertsDAO;
+import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+
+/**
+ * The {@link AlertDefinitionDisabledListener} handles event relating to the
+ * disabling of an alert definition.
+ */
+@EagerSingleton
+public class AlertDefinitionDisabledListener {
+  /**
+   * Used for deleting the alert notices when a definition is disabled.
+   */
+  @Inject
+  private AlertsDAO m_alertsDao = null;
+
+  /**
+   * Constructor.
+   *
+   * @param publisher
+   *          the publisher to register this listener with (not {@code null}).
+   */
+  @Inject
+  public AlertDefinitionDisabledListener(AmbariEventPublisher publisher) {
+    publisher.register(this);
+  }
+
+  /**
+   * Removes any {@link AlertCurrentEntity} instance associated with the
+   * specified alert definition.
+   */
+  @Subscribe
+  @AllowConcurrentEvents
+  public void onEvent(AlertDefinitionDisabledEvent event) {
+    m_alertsDao.removeCurrentDisabledAlerts();
+  }
+}

+ 120 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertHashInvalidationListener.java

@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.alerts;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.agent.AlertDefinitionCommand;
+import org.apache.ambari.server.agent.HeartBeatResponse;
+import org.apache.ambari.server.events.AlertHashInvalidationEvent;
+import org.apache.ambari.server.events.ServiceComponentUninstalledEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.state.alert.AlertDefinitionHash;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link AlertHashInvalidationListener} is used to respond to
+ * {@link AlertHashInvalidationEvent} instances and ensure that the
+ * {@link AlertDefinitionCommand}s are enqueued for the
+ * {@link HeartBeatResponse}.
+ */
+@Singleton
+@EagerSingleton
+public class AlertHashInvalidationListener {
+  /**
+   * Logger.
+   */
+  private static Logger LOG = LoggerFactory.getLogger(AlertHashInvalidationListener.class);
+
+  /**
+   * Invalidates hosts so that they can receive updated alert definition
+   * commands.
+   */
+  @Inject
+  private Provider<AlertDefinitionHash> m_alertDefinitionHash;
+
+  /**
+   * Constructor.
+   *
+   * @param publisher
+   */
+  @Inject
+  public AlertHashInvalidationListener(AmbariEventPublisher publisher) {
+    publisher.register(this);
+  }
+
+  /**
+   * Handles {@link AlertHashInvalidationEvent} by performing the following
+   * tasks:
+   * <ul>
+   * <li>Enqueuing {@link AlertDefinitionCommand}</li>
+   * </ul>
+   *
+   * @param event
+   *          the event being handled.
+   */
+  @Subscribe
+  @AllowConcurrentEvents
+  public void onAmbariEvent(AlertHashInvalidationEvent event) {
+    LOG.debug("An alert definition hash invalidation event was received: {}",
+        event);
+
+    Collection<String> hosts = event.getHosts();
+    long clusterId = event.getClusterId();
+
+    // no hosts, nothing to do
+    if (null == hosts || hosts.isEmpty()) {
+      return;
+    }
+
+    m_alertDefinitionHash.get().enqueueAgentCommands(clusterId, hosts);
+  }
+
+  /**
+   * Handles {@link AlertHashInvalidationEvent} by performing the following
+   * tasks:
+   * <ul>
+   * <li>Alert has invalidation</li>
+   * <li>Enqueuing {@link AlertDefinitionCommand}</li>
+   * </ul>
+   */
+  @Subscribe
+  @AllowConcurrentEvents
+  public void onEvent(ServiceComponentUninstalledEvent event) {
+    long clusterId = event.getClusterId();
+    String hostName = event.getHostName();
+
+    if (null == hostName) {
+      return;
+    }
+
+    // invalidate hash and enqueue commands
+    m_alertDefinitionHash.get().invalidate(hostName);
+    m_alertDefinitionHash.get().enqueueAgentCommands(clusterId,
+        Collections.singletonList(hostName));
+  }
+}

+ 75 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertHostListener.java

@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.alerts;
+
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.events.HostRemovedEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.orm.dao.AlertsDAO;
+import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link AlertHostListener} class handles {@link HostRemovedEvent} and
+ * ensures that {@link AlertCurrentEntity} instances are properly cleaned up
+ */
+@Singleton
+@EagerSingleton
+public class AlertHostListener {
+  /**
+   * Logger.
+   */
+  private static Log LOG = LogFactory.getLog(AlertHostListener.class);
+
+  /**
+   * Used for removing current alerts when a service is removed.
+   */
+  @Inject
+  private AlertsDAO m_alertsDao;
+
+  /**
+   * Constructor.
+   *
+   * @param publisher
+   */
+  @Inject
+  public AlertHostListener(AmbariEventPublisher publisher) {
+    publisher.register(this);
+  }
+
+  /**
+   * Removes any current alerts associated with the specified host.
+   *
+   * @param event
+   *          the published event being handled (not {@code null}).
+   */
+  @Subscribe
+  @AllowConcurrentEvents
+  public void onAmbariEvent(HostRemovedEvent event) {
+    LOG.debug(event);
+
+    // remove any current alerts for the removed host
+    m_alertsDao.removeCurrentByHost(event.getHostName());
+  }
+}

+ 12 - 3
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertLifecycleListener.java → ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertLifecycleListener.java

@@ -15,12 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ambari.server.events.listeners;
+package org.apache.ambari.server.events.listeners.alerts;
 
 import java.util.Set;
 
 import org.apache.ambari.server.EagerSingleton;
 import org.apache.ambari.server.events.AlertDefinitionDeleteEvent;
+import org.apache.ambari.server.events.AlertHashInvalidationEvent;
 import org.apache.ambari.server.events.AlertDefinitionRegistrationEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.state.alert.AggregateDefinitionMapping;
@@ -61,6 +62,12 @@ public class AlertLifecycleListener {
   @Inject
   private Provider<AlertDefinitionHash> m_alertDefinitionHash;
 
+  /**
+   * Used to publish events when an alert definition has a lifecycle event.
+   */
+  @Inject
+  private AmbariEventPublisher m_eventPublisher;
+
   /**
    * Constructor.
    *
@@ -118,10 +125,12 @@ public class AlertLifecycleListener {
     m_aggregateMapping.removeAssociatedAggregate(event.getClusterId(),
         definition.getName());
 
+    // invalidate and publish
     AlertDefinitionHash hashHelper = m_alertDefinitionHash.get();
     Set<String> invalidatedHosts = hashHelper.invalidateHosts(definition);
+    AlertHashInvalidationEvent hashInvalidationEvent = new AlertHashInvalidationEvent(
+        definition.getClusterId(), invalidatedHosts);
 
-    hashHelper.enqueueAgentCommands(definition.getClusterId(),
-        invalidatedHosts);
+    m_eventPublisher.publish(hashInvalidationEvent);
   }
 }

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertMaintenanceModeListener.java → ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertMaintenanceModeListener.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ambari.server.events.listeners;
+package org.apache.ambari.server.events.listeners.alerts;
 
 import java.util.List;
 

+ 140 - 3
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java → ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java

@@ -15,9 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ambari.server.events.listeners;
+package org.apache.ambari.server.events.listeners.alerts;
 
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.controller.RootServiceResponseFactory.Services;
 import org.apache.ambari.server.events.AlertEvent;
 import org.apache.ambari.server.events.AlertReceivedEvent;
 import org.apache.ambari.server.events.AlertStateChangeEvent;
@@ -29,13 +34,19 @@ import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
 import org.apache.ambari.server.orm.entities.AlertHistoryEntity;
 import org.apache.ambari.server.state.Alert;
 import org.apache.ambari.server.state.AlertState;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponentHost;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
+import com.google.inject.Provider;
 import com.google.inject.Singleton;
 
 /**
@@ -57,6 +68,12 @@ public class AlertReceivedListener {
   @Inject
   private AlertDefinitionDAO m_definitionDao;
 
+  /**
+   * Used for looking up whether an alert has a valid service/component/host
+   */
+  @Inject
+  private Provider<Clusters> m_clusters;
+
   /**
    * Receives and publishes {@link AlertEvent} instances.
    */
@@ -100,6 +117,22 @@ public class AlertReceivedListener {
       return;
     }
 
+    // it's possible that a definition which is disabled will still have a
+    // running alert returned; this will ensure we don't record it
+    if (!definition.getEnabled()) {
+      LOG.debug(
+          "Received an alert for {} which is disabled. No more alerts should be received for this definition.",
+          alert.getName());
+
+      return;
+    }
+
+    // jobs that were running when a service/component/host was changed
+    // which invalidate the alert should not be reported
+    if (!isValid(alert)) {
+      return;
+    }
+
     AlertCurrentEntity current = null;
 
     if (null == alert.getHost() || definition.isHostIgnored()) {
@@ -115,13 +148,13 @@ public class AlertReceivedListener {
       current = new AlertCurrentEntity();
       current.setMaintenanceState(MaintenanceState.OFF);
       current.setAlertHistory(history);
-      current.setLatestTimestamp(Long.valueOf(alert.getTimestamp()));
+      current.setLatestTimestamp(alert.getTimestamp());
       current.setOriginalTimestamp(Long.valueOf(alert.getTimestamp()));
 
       m_alertsDao.create(current);
 
     } else if (alert.getState() == current.getAlertHistory().getAlertState()) {
-      current.setLatestTimestamp(Long.valueOf(alert.getTimestamp()));
+      current.setLatestTimestamp(alert.getTimestamp());
       current.setLatestText(alert.getText());
 
       current = m_alertsDao.merge(current);
@@ -164,6 +197,110 @@ public class AlertReceivedListener {
     }
   }
 
+  /**
+   * Gets whether the specified alert is valid for its reported cluster,
+   * service, component, and host. This method is necessary for the case where a
+   * component has been removed from a host, but the alert data is going to be
+   * returned before the agent alert job can be unscheduled.
+   *
+   * @param alert
+   *          the alert.
+   * @return {@code true} if the alert is for a valid combination of
+   *         cluster/service/component/host.
+   */
+  private boolean isValid(Alert alert) {
+    String clusterName = alert.getCluster();
+    String serviceName = alert.getService();
+    String componentName = alert.getComponent();
+    String hostName = alert.getHost();
+
+    // if the alert is not bound to a cluster, then it's most likely a
+    // host alert and is always valid
+    if( null == clusterName ){
+      return true;
+    }
+
+    // AMBARI is always a valid service
+    String ambariServiceName = Services.AMBARI.name();
+    if (ambariServiceName.equals(serviceName)) {
+      return true;
+    }
+
+    final Cluster cluster;
+    try {
+      cluster = m_clusters.get().getCluster(clusterName);
+      if (null == cluster) {
+        LOG.error("Unable to process alert {} for an invalid cluster named {}",
+            alert.getName(), clusterName);
+
+        return false;
+      }
+    } catch (AmbariException ambariException) {
+      LOG.error("Unable to process alert {} for an invalid cluster named {}",
+          alert.getName(), clusterName, ambariException);
+
+      return false;
+    }
+
+    Map<String, Service> services = cluster.getServices();
+    Service service = services.get(serviceName);
+    if (null == service) {
+      LOG.error("Unable to process alert {} for an invalid service named {}",
+          alert.getName(), serviceName);
+
+      return false;
+    }
+
+    if (null != hostName) {
+      List<Host> hosts = m_clusters.get().getHosts();
+      if (null == hosts) {
+        LOG.error("Unable to process alert {} for an invalid host named {}",
+            alert.getName(), hostName);
+
+        return false;
+      }
+
+      boolean validHost = false;
+      for (Host host : hosts) {
+        if (hostName.equals(host.getHostName())) {
+          validHost = true;
+          break;
+        }
+      }
+
+      if (!validHost) {
+        LOG.error("Unable to process alert {} for an invalid host named {}",
+            alert.getName(), hostName);
+
+        return false;
+      }
+    }
+
+    // if the alert is for a host/component then verify that the component
+    // is actually installed on that host
+    if (null != hostName && null != componentName) {
+      boolean validServiceComponentHost = false;
+      List<ServiceComponentHost> serviceComponentHosts = cluster.getServiceComponentHosts(hostName);
+
+      for (ServiceComponentHost serviceComponentHost : serviceComponentHosts) {
+        if (componentName.equals(serviceComponentHost.getServiceComponentName())) {
+          validServiceComponentHost = true;
+          break;
+        }
+      }
+
+      if (!validServiceComponentHost) {
+        LOG.warn(
+            "Unable to process alert {} for an invalid service {} and component {} on host {}",
+            alert.getName(), serviceName, componentName, hostName);
+
+        return false;
+      }
+    }
+
+    return true;
+  }
+
   /**
    * Convenience to create a new alert.
    *

+ 67 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertServiceComponentHostListener.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.alerts;
+
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.events.ServiceComponentUninstalledEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.orm.dao.AlertsDAO;
+import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+
+/**
+ * The {@link AlertServiceComponentHostListener} handles event relating to the
+ * disabling of an alert definition.
+ */
+@EagerSingleton
+public class AlertServiceComponentHostListener {
+
+  /**
+   * Used for deleting the alert notices when a definition is disabled.
+   */
+  @Inject
+  private AlertsDAO m_alertsDao = null;
+
+  /**
+   * Constructor.
+   *
+   * @param publisher
+   */
+  @Inject
+  public AlertServiceComponentHostListener(AmbariEventPublisher publisher) {
+    publisher.register(this);
+  }
+
+  /**
+   * Removes any {@link AlertCurrentEntity} for the given service, component and
+   * host.
+   */
+  @Subscribe
+  @AllowConcurrentEvents
+  public void onEvent(ServiceComponentUninstalledEvent event) {
+    String serviceName = event.getServiceName();
+    String componentName = event.getComponentName();
+    String hostName = event.getHostName();
+
+    m_alertsDao.removeCurrentByServiceComponentHost(serviceName, componentName,
+        hostName);
+  }
+}

+ 36 - 3
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertServiceStateListener.java → ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertServiceStateListener.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ambari.server.events.listeners;
+package org.apache.ambari.server.events.listeners.alerts;
 
 import java.text.MessageFormat;
 import java.util.Set;
@@ -26,10 +26,12 @@ import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.controller.AmbariServer;
 import org.apache.ambari.server.controller.ControllerModule;
 import org.apache.ambari.server.events.ServiceInstalledEvent;
+import org.apache.ambari.server.events.ServiceRemovedEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
 import org.apache.ambari.server.orm.dao.AlertDispatchDAO;
+import org.apache.ambari.server.orm.dao.AlertsDAO;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
 import org.apache.ambari.server.orm.entities.AlertGroupEntity;
 import org.apache.ambari.server.state.alert.AlertDefinition;
@@ -45,8 +47,9 @@ import com.google.inject.Singleton;
 
 /**
  * The {@link AlertServiceStateListener} class handles
- * {@link ServiceInstalledEvent} and ensures that {@link AlertDefinitionEntity}
- * and {@link AlertGroupEntity} instances are correctly populated.
+ * {@link ServiceInstalledEvent} and {@link ServiceRemovedEvent} and ensures
+ * that {@link AlertDefinitionEntity} and {@link AlertGroupEntity} instances are
+ * correctly populated or cleaned up.
  */
 @Singleton
 @EagerSingleton
@@ -86,6 +89,12 @@ public class AlertServiceStateListener {
   @Inject
   private AlertDefinitionDAO m_definitionDao;
 
+  /**
+   * Used for removing current alerts when a service is removed.
+   */
+  @Inject
+  private AlertsDAO m_alertsDao;
+
   /**
    * Constructor.
    *
@@ -147,4 +156,28 @@ public class AlertServiceStateListener {
       LOG.error(message, ae);
     }
   }
+
+  /**
+   * Removes any current alerts associated with the specified service and the
+   * service's default alert group.
+   *
+   * @param event
+   *          the published event being handled (not {@code null}).
+   */
+  @Subscribe
+  @AllowConcurrentEvents
+  public void onAmbariEvent(ServiceRemovedEvent event) {
+    LOG.debug(event);
+
+    // remove any current alerts
+    m_alertsDao.removeCurrentByService(event.getServiceName());
+
+    // remove the default group for the service
+    AlertGroupEntity group = m_alertDispatchDao.findGroupByName(
+        event.getClusterId(), event.getServiceName());
+
+    if (null != group && group.isDefault()) {
+      m_alertDispatchDao.remove(group);
+    }
+  }
 }

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertStateChangedListener.java → ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertStateChangedListener.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ambari.server.events.listeners;
+package org.apache.ambari.server.events.listeners.alerts;
 
 import java.util.List;
 import java.util.Set;

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java

@@ -29,7 +29,7 @@ import com.google.inject.Singleton;
  * single-threaded, serial {@link EventBus}.
  */
 @Singleton
-public final class AmbariEventPublisher {
+public class AmbariEventPublisher {
 
   /**
    * A single threaded event bus for processing Ambari events in serial.

+ 81 - 0
ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java

@@ -435,6 +435,87 @@ public class AlertsDAO {
     return query.executeUpdate();
   }
 
+  /**
+   * Remove all current alerts that are disabled.
+   *
+   * @return the number of alerts removed.
+   */
+  @Transactional
+  public int removeCurrentDisabledAlerts() {
+    TypedQuery<AlertCurrentEntity> query = entityManagerProvider.get().createNamedQuery(
+        "AlertCurrentEntity.removeDisabled", AlertCurrentEntity.class);
+
+    return query.executeUpdate();
+  }
+
+  /**
+   * Remove the current alert that matches the given service. This is used in
+   * cases where the service was removed from the cluster.
+   *
+   * @param serviceName
+   *          the name of the service that the current alerts are being removed
+   *          for (not {@code null}).
+   * @return the number of alerts removed.
+   */
+  @Transactional
+  public int removeCurrentByService(String serviceName) {
+
+    TypedQuery<AlertCurrentEntity> query = entityManagerProvider.get().createNamedQuery(
+        "AlertCurrentEntity.removeByService", AlertCurrentEntity.class);
+
+    query.setParameter("serviceName", serviceName);
+    return query.executeUpdate();
+  }
+
+  /**
+   * Remove the current alert that matches the given host. This is used in cases
+   * where the host was removed from the cluster.
+   *
+   * @param hostName
+   *          the name of the host that the current alerts are being removed for
+   *          (not {@code null}).
+   * @return the number of alerts removed.
+   */
+  @Transactional
+  public int removeCurrentByHost(String hostName) {
+
+    TypedQuery<AlertCurrentEntity> query = entityManagerProvider.get().createNamedQuery(
+        "AlertCurrentEntity.removeByHost", AlertCurrentEntity.class);
+
+    query.setParameter("hostName", hostName);
+    return query.executeUpdate();
+  }
+
+  /**
+   * Remove the current alert that matches the given service, component and
+   * host. This is used in cases where the component was removed from the host.
+   *
+   * @param serviceName
+   *          the name of the service that the current alerts are being removed
+   *          for (not {@code null}).
+   * @param componentName
+   *          the name of the component that the current alerts are being
+   *          removed for (not {@code null}).
+   * @param hostName
+   *          the name of the host that the current alerts are being removed for
+   *          (not {@code null}).
+   * @return the number of alerts removed.
+   */
+  @Transactional
+  public int removeCurrentByServiceComponentHost(String serviceName,
+      String componentName,
+      String hostName) {
+
+    TypedQuery<AlertCurrentEntity> query = entityManagerProvider.get().createNamedQuery(
+        "AlertCurrentEntity.removeByHostComponent", AlertCurrentEntity.class);
+
+    query.setParameter("serviceName", serviceName);
+    query.setParameter("componentName", componentName);
+    query.setParameter("hostName", hostName);
+
+    return query.executeUpdate();
+  }
+
   /**
    * Persists a new alert.
    *

+ 5 - 1
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java

@@ -53,7 +53,11 @@ import org.apache.ambari.server.state.MaintenanceState;
     @NamedQuery(name = "AlertCurrentEntity.findByHostAndName", query = "SELECT alert FROM AlertCurrentEntity alert WHERE alert.alertHistory.clusterId = :clusterId AND alert.alertHistory.alertDefinition.definitionName = :definitionName AND alert.alertHistory.hostName = :hostName"),
     @NamedQuery(name = "AlertCurrentEntity.findByNameAndNoHost", query = "SELECT alert FROM AlertCurrentEntity alert WHERE alert.alertHistory.clusterId = :clusterId AND alert.alertHistory.alertDefinition.definitionName = :definitionName AND alert.alertHistory.hostName IS NULL"),
     @NamedQuery(name = "AlertCurrentEntity.removeByHistoryId", query = "DELETE FROM AlertCurrentEntity alert WHERE alert.alertHistory.alertId = :historyId"),
-    @NamedQuery(name = "AlertCurrentEntity.removeByDefinitionId", query = "DELETE FROM AlertCurrentEntity alert WHERE alert.alertDefinition.definitionId = :definitionId") })
+    @NamedQuery(name = "AlertCurrentEntity.removeByDefinitionId", query = "DELETE FROM AlertCurrentEntity alert WHERE alert.alertDefinition.definitionId = :definitionId"),
+    @NamedQuery(name = "AlertCurrentEntity.removeDisabled", query = "DELETE FROM AlertCurrentEntity alert WHERE alert.alertDefinition.enabled = 0"),
+    @NamedQuery(name = "AlertCurrentEntity.removeByService", query = "DELETE FROM AlertCurrentEntity alert WHERE alert.alertHistory.serviceName = :serviceName"),
+    @NamedQuery(name = "AlertCurrentEntity.removeByHost", query = "DELETE FROM AlertCurrentEntity alert WHERE alert.alertHistory.hostName = :hostName"),
+    @NamedQuery(name = "AlertCurrentEntity.removeByHostComponent", query = "DELETE FROM AlertCurrentEntity alert WHERE alert.alertHistory.serviceName = :serviceName AND alert.alertHistory.componentName = :componentName AND alert.alertHistory.hostName = :hostName") })
 public class AlertCurrentEntity {
 
   @Id

+ 6 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java

@@ -178,10 +178,16 @@ public class Alert {
   /**
    * @return
    */
+  @JsonProperty("cluster")
   public String getCluster() {
     return cluster;
   }
 
+  @JsonProperty("cluster")
+  public void setCluster(String cluster){
+    this.cluster = cluster;
+  }
+
   @Override
   public int hashCode() {
     int result = alertHashCode();

+ 9 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java

@@ -29,6 +29,7 @@ import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.controller.ServiceResponse;
 import org.apache.ambari.server.events.MaintenanceModeEvent;
 import org.apache.ambari.server.events.ServiceInstalledEvent;
+import org.apache.ambari.server.events.ServiceRemovedEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
 import org.apache.ambari.server.orm.dao.ClusterServiceDAO;
@@ -699,6 +700,14 @@ public class ServiceImpl implements Service {
         if (persisted) {
           removeEntities();
           persisted = false;
+
+          // publish the service removed event
+          StackId stackId = cluster.getDesiredStackVersion();
+
+          ServiceRemovedEvent event = new ServiceRemovedEvent(getClusterId(),
+              stackId.getStackName(), stackId.getStackVersion(), getName());
+
+          eventPublisher.publish(event);
         }
       } finally {
         readWriteLock.writeLock().unlock();

+ 51 - 56
ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java

@@ -20,6 +20,7 @@ package org.apache.ambari.server.state.alert;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,8 +30,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.ActionQueue;
@@ -80,6 +80,9 @@ public class AlertDefinitionHash {
   @Inject
   private AlertDefinitionDAO m_definitionDao;
 
+  /**
+   * Used to coerce {@link AlertDefinitionEntity} into {@link AlertDefinition}.
+   */
   @Inject
   private AlertDefinitionFactory m_factory;
 
@@ -89,6 +92,10 @@ public class AlertDefinitionHash {
   @Inject
   private Provider<Clusters> m_clusters;
 
+  /**
+   * Used to enqueue {@link AlertDefinitionCommand} on the heartbeat response
+   * queue.
+   */
   @Inject
   private ActionQueue m_actionQueue;
 
@@ -101,9 +108,11 @@ public class AlertDefinitionHash {
   private Provider<ConfigHelper> m_configHelper;
 
   /**
-   * !!! TODO: this class needs some thoughts on locking
+   * Due to the nature of the asynchronous events for alerts and Ambari, this
+   * lock will ensure that only a single writer is writing to the
+   * {@link ActionQueue}.
    */
-  private ReadWriteLock m_lock = new ReentrantReadWriteLock();
+  private ReentrantLock m_actionQueueLock = new ReentrantLock();
 
   /**
    * The hashes for all hosts for any cluster. The key is the hostname and the
@@ -145,33 +154,6 @@ public class AlertDefinitionHash {
     return hash;
   }
 
-  /**
-   * Gets a mapping between cluster and alert definition hashes for all of the
-   * clusters that the given host belongs to.
-   *
-   * @param hostName
-   *          the host name (not {@code null}).
-   * @return a mapping between cluster and alert definition hash or an empty map
-   *         (never @code null).
-   * @see #getHash(String, String)
-   * @throws AmbariException
-   */
-  public Map<String, String> getHashes(String hostName) throws AmbariException {
-    Set<Cluster> clusters = m_clusters.get().getClustersForHost(hostName);
-    if (null == clusters || clusters.size() == 0) {
-      return Collections.emptyMap();
-    }
-
-    Map<String, String> hashes = new HashMap<String, String>();
-    for (Cluster cluster : clusters) {
-      String clusterName = cluster.getClusterName();
-      String hash = getHash(clusterName, hostName);
-      hashes.put(clusterName, hash);
-    }
-
-    return hashes;
-  }
-
   /**
    * Invalidate all cached hashes causing subsequent lookups to recalculate.
    */
@@ -207,7 +189,7 @@ public class AlertDefinitionHash {
   }
 
   /**
-   * Gets whether the alert definition has for the specified host has been
+   * Gets whether the alert definition hash for the specified host has been
    * calculated and cached.
    *
    * @param hostName
@@ -314,7 +296,7 @@ public class AlertDefinitionHash {
    * @return the hosts that were invalidated, or an empty set (never
    *         {@code null}).
    */
-  public Set<String> invalidateHosts(long clusterId,
+  private Set<String> invalidateHosts(long clusterId,
       SourceType definitionSourceType, String definitionName,
       String definitionServiceName, String definitionComponentName) {
 
@@ -388,9 +370,12 @@ public class AlertDefinitionHash {
       return affectedHosts;
     }
 
+    String ambariServiceName = Services.AMBARI.name();
+    String agentComponentName = Components.AMBARI_AGENT.name();
+
     // intercept host agent alerts; they affect all hosts
-    if (Services.AMBARI.equals(definitionServiceName)
-        && Components.AMBARI_AGENT.equals(definitionComponentName)) {
+    if (ambariServiceName.equals(definitionServiceName)
+        && agentComponentName.equals(definitionComponentName)) {
       affectedHosts.addAll(hosts.keySet());
       return affectedHosts;
     }
@@ -456,7 +441,7 @@ public class AlertDefinitionHash {
    * @param hosts
    *          the hosts to push {@link AlertDefinitionCommand}s for.
    */
-  public void enqueueAgentCommands(long clusterId, Set<String> hosts) {
+  public void enqueueAgentCommands(long clusterId, Collection<String> hosts) {
     String clusterName = null;
 
     try {
@@ -483,7 +468,7 @@ public class AlertDefinitionHash {
    * @param hosts
    *          the hosts to push {@link AlertDefinitionCommand}s for.
    */
-  public void enqueueAgentCommands(String clusterName, Set<String> hosts) {
+  private void enqueueAgentCommands(String clusterName, Collection<String> hosts) {
     if (null == clusterName) {
       LOG.warn("Unable to create alert definition agent commands because of a null cluster name");
       return;
@@ -493,29 +478,39 @@ public class AlertDefinitionHash {
       return;
     }
 
-    for (String hostName : hosts) {
-      List<AlertDefinition> definitions = getAlertDefinitions(clusterName,
-          hostName);
+    try {
+      m_actionQueueLock.lock();
+      for (String hostName : hosts) {
+        List<AlertDefinition> definitions = getAlertDefinitions(clusterName,
+            hostName);
 
-      String hash = getHash(clusterName, hostName);
+        String hash = getHash(clusterName, hostName);
 
-      AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName,
-          hostName, hash, definitions);
+        AlertDefinitionCommand command = new AlertDefinitionCommand(
+            clusterName, hostName, hash, definitions);
 
-      try {
-        Cluster cluster = m_clusters.get().getCluster(clusterName);
-        command.addConfigs(m_configHelper.get(), cluster);
-      } catch (AmbariException ae) {
-        LOG.warn("Unable to add configurations to alert definition command", ae);
-      }
+        try {
+          Cluster cluster = m_clusters.get().getCluster(clusterName);
+          command.addConfigs(m_configHelper.get(), cluster);
+        } catch (AmbariException ae) {
+          LOG.warn("Unable to add configurations to alert definition command",
+              ae);
+        }
+
+        // unlike other commands, the alert definitions commands are really
+        // designed to be 1:1 per change; if multiple invalidations happened
+        // before the next heartbeat, there would be several commands that would
+        // force the agents to reschedule their alerts more than once
+        m_actionQueue.dequeue(hostName,
+            AgentCommandType.ALERT_DEFINITION_COMMAND);
 
-      // unlike other commands, the alert definitions commands are really
-      // designed to be 1:1 per change; if multiple invalidations happened
-      // before the next heartbeat, there would be several commands that would
-      // force the agents to reschedule their alerts more than once
-      m_actionQueue.dequeue(hostName, AgentCommandType.ALERT_DEFINITION_COMMAND);
-      m_actionQueue.dequeue(hostName, AgentCommandType.ALERT_EXECUTION_COMMAND);
-      m_actionQueue.enqueue(hostName, command);
+        m_actionQueue.dequeue(hostName,
+            AgentCommandType.ALERT_EXECUTION_COMMAND);
+
+        m_actionQueue.enqueue(hostName, command);
+      }
+    } finally {
+      m_actionQueueLock.unlock();
     }
   }
 

+ 29 - 12
ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java

@@ -39,6 +39,8 @@ import org.apache.ambari.server.HostNotFoundException;
 import org.apache.ambari.server.agent.DiskInfo;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.events.HostRemovedEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
 import org.apache.ambari.server.orm.dao.ClusterVersionDAO;
 import org.apache.ambari.server.orm.dao.ConfigGroupHostMappingDAO;
@@ -123,6 +125,12 @@ public class ClustersImpl implements Clusters {
   @Inject
   private SecurityHelper securityHelper;
 
+  /**
+   * Used to publish events relating to cluster CRUD operations.
+   */
+  @Inject
+  private AmbariEventPublisher eventPublisher;
+
   @Inject
   public ClustersImpl() {
     clusters = new ConcurrentHashMap<String, Cluster>();
@@ -417,10 +425,11 @@ public class ClustersImpl implements Clusters {
     r.lock();
     try {
       for (String host : hostSet) {
-        if (!hosts.containsKey(host))
+        if (!hosts.containsKey(host)) {
           throw new HostNotFoundException(host);
-        else
+        } else {
           hostMap.put(host, hosts.get(host));
+        }
       }
     } finally {
       r.unlock();
@@ -437,10 +446,11 @@ public class ClustersImpl implements Clusters {
     try {
       for (String c : clusterSet) {
         if (c != null) {
-          if (!clusters.containsKey(c))
+          if (!clusters.containsKey(c)) {
             throw new ClusterNotFoundException(c);
-          else
+          } else {
             clusterMap.put(c, clusters.get(c));
+          }
         }
       }
     } finally {
@@ -574,6 +584,7 @@ public class ClustersImpl implements Clusters {
   }
 
 
+  @Override
   public void debugDump(StringBuilder sb) {
     r.lock();
     try {
@@ -645,13 +656,13 @@ public class ClustersImpl implements Clusters {
       w.unlock();
     }
   }
-  
+
   @Override
   public void unmapHostFromCluster(String hostname, String clusterName)
       throws AmbariException {
 
     checkLoaded();
-    
+
     w.lock();
 
     try {
@@ -679,9 +690,9 @@ public class ClustersImpl implements Clusters {
     } finally {
       w.unlock();
     }
-    
+
   }
-  
+
   @Transactional
   private void unmapHostClusterEntities(String hostName, long clusterId) {
     HostEntity hostEntity = hostDAO.findByName(hostName);
@@ -703,14 +714,15 @@ public class ClustersImpl implements Clusters {
       }
     }
   }
-  
+
   @Override
   public void deleteHost(String hostname) throws AmbariException {
     checkLoaded();
 
-    if (!hosts.containsKey(hostname))
+    if (!hosts.containsKey(hostname)) {
       return;
-    
+    }
+
     w.lock();
 
     try {
@@ -725,12 +737,17 @@ public class ClustersImpl implements Clusters {
       hostDAO.refresh(entity);
       hostDAO.remove(entity);
       hosts.remove(hostname);
+
+      // publish the event
+      HostRemovedEvent event = new HostRemovedEvent(hostname);
+      eventPublisher.publish(event);
+
     } catch (Exception e) {
       throw new AmbariException("Could not remove host", e);
     } finally {
       w.unlock();
     }
-    
+
   }
 
   @Override

+ 34 - 7
ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java

@@ -31,7 +31,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.AlertDefinitionCommand;
 import org.apache.ambari.server.controller.ServiceComponentHostResponse;
+import org.apache.ambari.server.events.AlertHashInvalidationEvent;
 import org.apache.ambari.server.events.MaintenanceModeEvent;
+import org.apache.ambari.server.events.ServiceComponentUninstalledEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.dao.HostComponentDesiredStateDAO;
 import org.apache.ambari.server.orm.dao.HostComponentStateDAO;
@@ -46,13 +48,13 @@ import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntity;
 import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntityPK;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
-import org.apache.ambari.server.state.SecurityState;
 import org.apache.ambari.server.state.ConfigHelper;
 import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.HostComponentAdminState;
 import org.apache.ambari.server.state.HostConfig;
 import org.apache.ambari.server.state.HostState;
 import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.SecurityState;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentHost;
 import org.apache.ambari.server.state.ServiceComponentHostEvent;
@@ -561,12 +563,17 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
         return;
       }
 
+      // invalidate the host
       String hostName = impl.getHostName();
       impl.alertDefinitionHash.invalidate(impl.getClusterName(), hostName);
-      impl.alertDefinitionHash.enqueueAgentCommands(impl.getClusterName(),
-          Collections.singleton(hostName));
 
+      // publish the event
+      AlertHashInvalidationEvent hashInvalidationEvent = new AlertHashInvalidationEvent(
+          impl.getClusterId(), Collections.singletonList(hostName));
+
+      impl.eventPublisher.publish(hashInvalidationEvent);
       impl.updateLastOpInfo(event.getType(), event.getOpTimestamp());
+
     }
   }
 
@@ -819,8 +826,9 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
 
   @Override
   public void setDesiredSecurityState(SecurityState securityState) throws AmbariException {
-    if(!securityState.isEndpoint())
+    if(!securityState.isEndpoint()) {
       throw new AmbariException("The security state must be an endpoint state");
+    }
 
     clusterGlobalLock.readLock().lock();
     try {
@@ -1402,6 +1410,8 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
 
   @Override
   public void delete() {
+    boolean fireRemovalEvent = false;
+
     clusterGlobalLock.writeLock().lock();
     try {
       writeLock.lock();
@@ -1409,12 +1419,12 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
         if (persisted) {
           removeEntities();
           persisted = false;
+          fireRemovalEvent = true;
         }
+
         clusters.getCluster(getClusterName()).removeServiceComponentHost(this);
       } catch (AmbariException ex) {
-        if (LOG.isDebugEnabled()) {
-          LOG.error(ex.getMessage());
-        }
+        LOG.error("Unable to remove a service component from a host", ex);
       } finally {
         writeLock.unlock();
       }
@@ -1422,6 +1432,23 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
       clusterGlobalLock.writeLock().unlock();
     }
 
+    // publish event for the removal of the SCH after the removal is
+    // completed, but only if it was persisted
+    if (fireRemovalEvent) {
+      long clusterId = getClusterId();
+      StackId stackId = getStackVersion();
+      String stackVersion = stackId.getStackVersion();
+      String stackName = stackId.getStackName();
+      String serviceName = getServiceName();
+      String componentName = getServiceComponentName();
+      String hostName = getHostName();
+
+      ServiceComponentUninstalledEvent event = new ServiceComponentUninstalledEvent(
+          clusterId, stackName, stackVersion, serviceName, componentName,
+          hostName);
+
+      eventPublisher.publish(event);
+    }
   }
 
   @Transactional

+ 10 - 3
ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java

@@ -83,6 +83,7 @@ import org.apache.ambari.server.controller.internal.RequestResourceFilter;
 import org.apache.ambari.server.controller.internal.ServiceResourceProviderTest;
 import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.customactions.ActionDefinition;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.metadata.ActionMetadata;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -93,7 +94,6 @@ import org.apache.ambari.server.security.authorization.Users;
 import org.apache.ambari.server.serveraction.ServerAction;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
-import org.apache.ambari.server.state.RepositoryVersionState;
 import org.apache.ambari.server.state.Config;
 import org.apache.ambari.server.state.ConfigFactory;
 import org.apache.ambari.server.state.ConfigHelper;
@@ -103,6 +103,7 @@ import org.apache.ambari.server.state.HostComponentAdminState;
 import org.apache.ambari.server.state.HostState;
 import org.apache.ambari.server.state.MaintenanceState;
 import org.apache.ambari.server.state.RepositoryInfo;
+import org.apache.ambari.server.state.RepositoryVersionState;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentFactory;
@@ -123,6 +124,7 @@ import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStoppedEve
 import org.apache.ambari.server.utils.StageUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.easymock.Capture;
+import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -8383,8 +8385,8 @@ public class AmbariManagementControllerTest {
     controller.deleteHostComponents(schRequests);
     ServiceComponentHost sch = sc1.getServiceComponentHost(host1);
     assertTrue(sch.isRestartRequired());
-  }  
-  
+  }
+
   @Test
   public void testDeleteHost() throws Exception {
     String clusterName = "foo1";
@@ -8878,8 +8880,13 @@ public class AmbariManagementControllerTest {
         properties.setProperty(Configuration.OS_VERSION_KEY,
             "centos5");
         properties.setProperty(Configuration.SHARED_RESOURCES_DIR_KEY, "src/test/resources/");
+
         try {
           install(new ControllerModule(properties));
+
+          // ambari events interfere with the workflow of this test
+          bind(AmbariEventPublisher.class).toInstance(
+              EasyMock.createMock(AmbariEventPublisher.class));
         } catch (Exception e) {
           throw new RuntimeException(e);
         }

+ 0 - 2
ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProviderTest.java

@@ -410,9 +410,7 @@ public class AlertDefinitionResourceProviderTest {
     Cluster cluster = createMock(Cluster.class);
     expect(amc.getClusters()).andReturn(clusters).atLeastOnce();
     expect(clusters.getCluster((String) anyObject())).andReturn(cluster).atLeastOnce();
-    expect(clusters.getClusterById(EasyMock.anyInt())).andReturn(cluster).atLeastOnce();
     expect(cluster.getClusterId()).andReturn(Long.valueOf(1)).atLeastOnce();
-    expect(cluster.getClusterName()).andReturn("c1").atLeastOnce();
 
     Capture<AlertDefinitionEntity> entityCapture = new Capture<AlertDefinitionEntity>();
     dao.create(capture(entityCapture));

+ 44 - 2
ambari-server/src/test/java/org/apache/ambari/server/events/EventsTest.java

@@ -24,16 +24,15 @@ import java.util.Map;
 
 import junit.framework.Assert;
 
-import org.apache.ambari.server.events.MaintenanceModeEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.state.Cluster;
-import org.apache.ambari.server.state.RepositoryVersionState;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.HostState;
 import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.RepositoryVersionState;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentFactory;
@@ -124,6 +123,49 @@ public class EventsTest {
     m_injector = null;
   }
 
+  /**
+   * Tests that {@link ServiceInstalledEvent}s are fired correctly.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testServiceInstalledEvent() throws Exception {
+    Class<?> eventClass = ServiceInstalledEvent.class;
+    Assert.assertFalse(m_listener.isEventReceived(eventClass));
+    installHdfsService();
+    Assert.assertTrue(m_listener.isEventReceived(eventClass));
+  }
+
+  /**
+   * Tests that {@link ServiceRemovedEvent}s are fired correctly.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testServiceRemovedEvent() throws Exception {
+    Class<?> eventClass = ServiceRemovedEvent.class;
+    Assert.assertFalse(m_listener.isEventReceived(eventClass));
+    installHdfsService();
+    m_cluster.deleteAllServices();
+    Assert.assertTrue(m_listener.isEventReceived(eventClass));
+  }
+
+  /**
+   * Tests that {@link ServiceComponentUninstalledEvent}s are fired correctly.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testServiceComponentUninstalledEvent() throws Exception {
+    Class<?> eventClass = ServiceComponentUninstalledEvent.class;
+    installHdfsService();
+
+    Assert.assertFalse(m_listener.isEventReceived(eventClass));
+    m_cluster.getServiceComponentHosts(HOSTNAME).get(0).delete();
+
+    Assert.assertTrue(m_listener.isEventReceived(eventClass));
+  }
+
   /**
    * Tests that {@link MaintenanceModeEvent}s are fired correctly.
    *

+ 1 - 1
ambari-server/src/test/java/org/apache/ambari/server/events/MockEventListener.java

@@ -78,7 +78,7 @@ public class MockEventListener {
    * @param event
    */
   @Subscribe
-  public void onEvent(MaintenanceModeEvent event) {
+  public void onEvent(AmbariEvent event) {
     handleEvent(event);
   }
 

+ 69 - 2
ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java

@@ -47,7 +47,7 @@ import org.apache.ambari.server.controller.spi.SortRequest;
 import org.apache.ambari.server.controller.spi.SortRequest.Order;
 import org.apache.ambari.server.controller.spi.SortRequestProperty;
 import org.apache.ambari.server.controller.utilities.PredicateBuilder;
-import org.apache.ambari.server.events.listeners.AlertMaintenanceModeListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertMaintenanceModeListener;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.AlertDaoHelper;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
@@ -58,11 +58,11 @@ import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
 import org.apache.ambari.server.orm.entities.AlertHistoryEntity;
 import org.apache.ambari.server.state.AlertState;
 import org.apache.ambari.server.state.Cluster;
-import org.apache.ambari.server.state.RepositoryVersionState;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.HostState;
 import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.RepositoryVersionState;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentFactory;
@@ -164,6 +164,7 @@ public class AlertsDAOTest {
         history.setAlertText(definition.getDefinitionName() + " " + i);
         history.setAlertTimestamp(calendar.getTimeInMillis());
         history.setHostName("h1");
+        history.setComponentName("Component " + i);
 
         history.setAlertState(AlertState.OK);
         if (i == 0 || i == 5) {
@@ -975,6 +976,72 @@ public class AlertsDAOTest {
     }
   }
 
+  @Test
+  public void testRemoveCurrenyByService() throws Exception {
+    List<AlertCurrentEntity> currentAlerts = m_dao.findCurrent();
+    assertNotNull(currentAlerts);
+    assertEquals(5, currentAlerts.size());
+
+    m_dao.removeCurrentByService("Service 1");
+    m_dao.removeCurrentByService("Service 2");
+
+    currentAlerts = m_dao.findCurrent();
+    assertEquals(3, currentAlerts.size());
+  }
+
+  @Test
+  public void testRemoveCurrenyByHost() throws Exception {
+    List<AlertCurrentEntity> currentAlerts = m_dao.findCurrent();
+    assertNotNull(currentAlerts);
+    assertEquals(5, currentAlerts.size());
+
+    // there is no h2 host
+    m_dao.removeCurrentByHost("h2");
+    currentAlerts = m_dao.findCurrent();
+    assertEquals(5, currentAlerts.size());
+
+    // there is an h1 host
+    m_dao.removeCurrentByHost("h1");
+    currentAlerts = m_dao.findCurrent();
+    assertEquals(0, currentAlerts.size());
+  }
+
+  @Test
+  public void testRemoveCurrenyByComponentHost() throws Exception {
+    List<AlertCurrentEntity> currentAlerts = m_dao.findCurrent();
+    assertNotNull(currentAlerts);
+    assertEquals(5, currentAlerts.size());
+
+    AlertCurrentEntity entity = m_dao.findCurrentByHostAndName(
+        m_clusterId.longValue(), "h1", "Alert Definition 1");
+
+    assertNotNull(entity);
+
+    m_dao.removeCurrentByServiceComponentHost(
+        entity.getAlertHistory().getServiceName(),
+        entity.getAlertHistory().getComponentName(),
+        entity.getAlertHistory().getHostName());
+
+    currentAlerts = m_dao.findCurrent();
+    assertEquals(4, currentAlerts.size());
+  }
+
+  @Test
+  public void testRemoveCurrentDisabled() throws Exception {
+    List<AlertCurrentEntity> currentAlerts = m_dao.findCurrent();
+    assertNotNull(currentAlerts);
+    assertEquals(5, currentAlerts.size());
+
+    AlertDefinitionEntity definition = currentAlerts.get(0).getAlertHistory().getAlertDefinition();
+    definition.setEnabled(false);
+    m_definitionDao.merge(definition);
+
+    m_dao.removeCurrentDisabledAlerts();
+
+    currentAlerts = m_dao.findCurrent();
+    assertEquals(4, currentAlerts.size());
+  }
+
   private Cluster initializeNewCluster() throws Exception {
     String clusterName = "cluster-" + System.currentTimeMillis();
     m_clusters.addCluster(clusterName);

+ 1 - 1
ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionHashTest.java

@@ -389,7 +389,7 @@ public class AlertDefinitionHashTest extends TestCase {
     hosts.add(HOSTNAME);
 
     // should invalidate both alert commands, and add a new definition command
-    m_hash.enqueueAgentCommands(CLUSTERNAME, hosts);
+    m_hash.enqueueAgentCommands(1L, hosts);
     assertEquals(1, actionQueue.size(HOSTNAME));
     assertEquals(1, actionQueue.size("anotherHost"));
   }

+ 18 - 3
ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java

@@ -24,9 +24,9 @@ import junit.framework.Assert;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.events.AlertDefinitionDeleteEvent;
 import org.apache.ambari.server.events.AmbariEvent;
-import org.apache.ambari.server.events.listeners.AlertLifecycleListener;
-import org.apache.ambari.server.events.listeners.AlertServiceStateListener;
-import org.apache.ambari.server.events.listeners.AlertStateChangedListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertLifecycleListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -131,6 +131,21 @@ public class AlertEventPublisherTest {
     Assert.assertEquals(1, dispatchDao.findAllGroups().size());
   }
 
+  /**
+   * Tests that a default {@link AlertGroupEntity} is removed when a service is
+   * removed.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testDefaultAlertGroupRemoved() throws Exception {
+    Assert.assertEquals(0, dispatchDao.findAllGroups().size());
+    installHdfsService();
+    Assert.assertEquals(1, dispatchDao.findAllGroups().size());
+    cluster.getService("HDFS").delete();
+    Assert.assertEquals(0, dispatchDao.findAllGroups().size());
+  }
+
   /**
    * Tests that all {@link AlertDefinitionEntity} instances are created for the
    * installed service.

+ 290 - 0
ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertReceivedListenerTest.java

@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state.alerts;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.ambari.server.events.AlertReceivedEvent;
+import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener;
+import org.apache.ambari.server.orm.GuiceJpaInitializer;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.OrmTestHelper;
+import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
+import org.apache.ambari.server.orm.dao.AlertsDAO;
+import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
+import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
+import org.apache.ambari.server.state.Alert;
+import org.apache.ambari.server.state.AlertState;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.alert.Scope;
+import org.apache.ambari.server.state.alert.SourceType;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.persist.PersistService;
+import com.google.inject.util.Modules;
+
+
+/**
+ * Tests the {@link AlertReceivedListener}.
+ */
+public class AlertReceivedListenerTest {
+
+  private static final String ALERT_DEFINITION = "alert_definition_";
+  private static final String CLUSTER_NAME = "c1";
+  private static final String SERVICE = "Service";
+  private static final String COMPONENT = "Component";
+  private static final String HOST1 = "h1";
+  private static final String ALERT_LABEL = "My Label";
+
+  private Long clusterId;
+  private Injector injector;
+  private OrmTestHelper helper;
+  private AlertsDAO dao;
+  private AlertDefinitionDAO definitionDao;
+
+  private Clusters clusters;
+  private Cluster cluster;
+
+  @Before
+  public void setup() throws Exception {
+    clusters = EasyMock.createNiceMock(Clusters.class);
+    cluster = EasyMock.createNiceMock(Cluster.class);
+
+    injector = Guice.createInjector(Modules.override(
+        new InMemoryDefaultTestModule()).with(new MockModule()));
+
+    injector.getInstance(GuiceJpaInitializer.class);
+    helper = injector.getInstance(OrmTestHelper.class);
+    clusterId = helper.createCluster();
+    dao = injector.getInstance(AlertsDAO.class);
+    definitionDao = injector.getInstance(AlertDefinitionDAO.class);
+
+    List<Host> hosts = new ArrayList<Host>();
+    Host host = EasyMock.createNiceMock(Host.class);
+    EasyMock.expect(host.getHostName()).andReturn(HOST1).anyTimes();
+    hosts.add(host);
+
+    Map<String,Service> services = new HashMap<String, Service>();
+    services.put("Service 1", EasyMock.createNiceMock(Service.class));
+
+    List<ServiceComponentHost> schs = new ArrayList<ServiceComponentHost>();
+    ServiceComponentHost sch = EasyMock.createNiceMock(ServiceComponentHost.class);
+    EasyMock.expect(sch.getServiceComponentName()).andReturn("Component 1").anyTimes();
+    schs.add(sch);
+
+    // setup isValid expectations
+    EasyMock.expect(clusters.getCluster(CLUSTER_NAME)).andReturn(cluster).anyTimes();
+    EasyMock.expect(clusters.getHosts()).andReturn(hosts).anyTimes();
+    EasyMock.expect(cluster.getServices()).andReturn(services).anyTimes();
+    EasyMock.expect(cluster.getServiceComponentHosts(HOST1)).andReturn(schs).anyTimes();
+
+    EasyMock.replay(clusters, cluster, sch, host);
+
+    // create 5 definitions
+    for (int i = 0; i < 5; i++) {
+      AlertDefinitionEntity definition = new AlertDefinitionEntity();
+      definition.setDefinitionName(ALERT_DEFINITION + i);
+      definition.setServiceName(SERVICE + " " + i);
+      definition.setComponentName(COMPONENT + " " + i);
+      definition.setClusterId(clusterId);
+      definition.setHash(UUID.randomUUID().toString());
+      definition.setScheduleInterval(Integer.valueOf(60));
+      definition.setScope(Scope.SERVICE);
+      definition.setSource("{\"type\" : \"SCRIPT\"}");
+      definition.setSourceType(SourceType.SCRIPT);
+      definitionDao.create(definition);
+    }
+  }
+
+  @After
+  public void teardown() {
+    injector.getInstance(PersistService.class).stop();
+    injector = null;
+  }
+
+  /**
+   * Tests that a disabled definition doesn't record alert events.
+   */
+  @Test
+  public void testDisabledAlert() {
+    String definitionName = ALERT_DEFINITION + "1";
+    String serviceName = "Service 1";
+    String componentName = "Component 1";
+
+    Alert alert1 = new Alert(definitionName, null, serviceName, componentName,
+        HOST1, AlertState.OK);
+
+    alert1.setCluster(CLUSTER_NAME);
+    alert1.setLabel(ALERT_LABEL);
+    alert1.setText(serviceName + " " + componentName + " is OK");
+    alert1.setTimestamp(1L);
+
+    // verify that the listener works with a regular alert
+    AlertReceivedListener listener = injector.getInstance(AlertReceivedListener.class);
+    AlertReceivedEvent event1 = new AlertReceivedEvent(clusterId, alert1);
+    listener.onAlertEvent(event1);
+
+    List<AlertCurrentEntity> allCurrent = dao.findCurrent();
+    assertEquals(1, allCurrent.size());
+
+    // disable definition
+    AlertDefinitionEntity definition = definitionDao.findByName(clusterId, definitionName);
+    definition.setEnabled(false);
+    definitionDao.merge(definition);
+
+    // remove disabled
+    dao.removeCurrentDisabledAlerts();
+    allCurrent = dao.findCurrent();
+    assertEquals(0, allCurrent.size());
+
+    // verify no new alerts for disabled
+    listener.onAlertEvent(event1);
+    allCurrent = dao.findCurrent();
+    assertEquals(0, allCurrent.size());
+  }
+
+  /**
+   * Tests an invalid host is being reported in an alert.
+   */
+  @Test
+  public void testInvalidHost() {
+    String definitionName = ALERT_DEFINITION + "1";
+    String serviceName = "Service 1";
+    String componentName = "Component 1";
+
+    Alert alert1 = new Alert(definitionName, null, serviceName, componentName,
+        HOST1, AlertState.OK);
+
+    alert1.setCluster(CLUSTER_NAME);
+    alert1.setLabel(ALERT_LABEL);
+    alert1.setText(serviceName + " " + componentName + " is OK");
+    alert1.setTimestamp(1L);
+
+    // verify that the listener works with a regular alert
+    AlertReceivedListener listener = injector.getInstance(AlertReceivedListener.class);
+    AlertReceivedEvent event1 = new AlertReceivedEvent(clusterId, alert1);
+    listener.onAlertEvent(event1);
+
+    List<AlertCurrentEntity> allCurrent = dao.findCurrent();
+    assertEquals(1, allCurrent.size());
+
+    // invalid host
+    alert1.setHost("INVALID");
+
+    // remove all
+    dao.removeCurrentByHost(HOST1);
+    allCurrent = dao.findCurrent();
+    assertEquals(0, allCurrent.size());
+
+    // verify no new alerts for disabled
+    listener.onAlertEvent(event1);
+    allCurrent = dao.findCurrent();
+    assertEquals(0, allCurrent.size());
+  }
+
+  /**
+   * Tests that a disabled definition doesn't record alert events.
+   */
+  @Test
+  public void testInvalidAlertDefinition() {
+    String serviceName = "Service 1";
+    String componentName = "Component 1";
+
+    Alert alert1 = new Alert("missing_alert_definition_name", null,
+        serviceName, componentName, HOST1, AlertState.OK);
+
+    alert1.setLabel(ALERT_LABEL);
+    alert1.setText(serviceName + " " + componentName + " is OK");
+    alert1.setTimestamp(1L);
+
+    // bad alert definition name means no current alerts
+    AlertReceivedListener listener = injector.getInstance(AlertReceivedListener.class);
+    AlertReceivedEvent event1 = new AlertReceivedEvent(clusterId, alert1);
+    listener.onAlertEvent(event1);
+
+    List<AlertCurrentEntity> allCurrent = dao.findCurrent();
+    assertEquals(0, allCurrent.size());
+  }
+
+  /**
+   * Tests an invalid pairing of component to host.
+   */
+  @Test
+  public void testInvalidServiceComponentHost() {
+    String definitionName = ALERT_DEFINITION + "1";
+    String serviceName = "Service 1";
+    String componentName = "Component 1";
+
+    Alert alert1 = new Alert(definitionName, null, serviceName, componentName,
+        HOST1, AlertState.OK);
+
+    alert1.setCluster(CLUSTER_NAME);
+    alert1.setLabel(ALERT_LABEL);
+    alert1.setText(serviceName + " " + componentName + " is OK");
+    alert1.setTimestamp(1L);
+
+    // verify that the listener works with a regular alert
+    AlertReceivedListener listener = injector.getInstance(AlertReceivedListener.class);
+    AlertReceivedEvent event1 = new AlertReceivedEvent(clusterId, alert1);
+    listener.onAlertEvent(event1);
+
+    List<AlertCurrentEntity> allCurrent = dao.findCurrent();
+    assertEquals(1, allCurrent.size());
+
+    // invalid host
+    alert1.setComponent("INVALID");
+
+    // remove all
+    dao.removeCurrentByHost(HOST1);
+    allCurrent = dao.findCurrent();
+    assertEquals(0, allCurrent.size());
+
+    // verify no new alerts for disabled
+    listener.onAlertEvent(event1);
+    allCurrent = dao.findCurrent();
+    assertEquals(0, allCurrent.size());
+  }
+
+  /**
+   *
+   */
+  private class MockModule implements Module {
+    @Override
+    public void configure(Binder binder) {
+      binder.bind(Clusters.class).toInstance(clusters);
+      binder.bind(Cluster.class).toInstance(cluster);
+    }
+  }
+}

+ 2 - 2
ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java

@@ -25,8 +25,8 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.ambari.server.events.AlertStateChangeEvent;
-import org.apache.ambari.server.events.listeners.AlertServiceStateListener;
-import org.apache.ambari.server.events.listeners.AlertStateChangedListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener;
 import org.apache.ambari.server.events.publishers.AlertEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;

+ 3 - 3
ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java

@@ -32,9 +32,9 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ambari.server.events.AlertEvent;
 import org.apache.ambari.server.events.AlertReceivedEvent;
 import org.apache.ambari.server.events.AlertStateChangeEvent;
-import org.apache.ambari.server.events.listeners.AlertAggregateListener;
-import org.apache.ambari.server.events.listeners.AlertReceivedListener;
-import org.apache.ambari.server.events.listeners.AlertStateChangedListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertAggregateListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener;
 import org.apache.ambari.server.events.publishers.AlertEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;