瀏覽代碼

AMBARI-7508 - Alerts: Reschedule Individual Alerts on Agents (jonathanhurley)

Jonathan Hurley 11 年之前
父節點
當前提交
7db82c923c
共有 26 個文件被更改,包括 755 次插入和 182 次刪除
  1. 85 13
      ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
  2. 30 7
      ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
  3. 20 1
      ambari-agent/src/main/python/ambari_agent/alerts/collector.py
  4. 50 0
      ambari-agent/src/test/python/ambari_agent/TestAlerts.py
  5. 1 0
      ambari-agent/src/test/python/ambari_agent/dummy_files/definitions.json
  6. 4 3
      ambari-project/pom.xml
  7. 5 7
      ambari-server/pom.xml
  8. 19 14
      ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProvider.java
  9. 55 0
      ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionDeleteEvent.java
  10. 6 1
      ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
  11. 53 1
      ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertLifecycleListener.java
  12. 8 0
      ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java
  13. 13 0
      ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java
  14. 1 0
      ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
  15. 10 5
      ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertDefinitionEntity.java
  16. 3 2
      ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java
  17. 27 4
      ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java
  18. 19 0
      ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java
  19. 1 0
      ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java
  20. 177 116
      ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java
  21. 87 0
      ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java
  22. 3 3
      ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java
  23. 1 0
      ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionEqualityTest.java
  24. 66 2
      ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java
  25. 10 2
      ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java
  26. 1 1
      ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java

+ 85 - 13
ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py

@@ -61,22 +61,25 @@ class AlertSchedulerHandler():
         logger.critical("Could not create the cache directory {0}".format(cachedir))
         logger.critical("Could not create the cache directory {0}".format(cachedir))
         pass
         pass
 
 
+    self._collector = AlertCollector()
     self.__scheduler = Scheduler(AlertSchedulerHandler.APS_CONFIG)
     self.__scheduler = Scheduler(AlertSchedulerHandler.APS_CONFIG)
     self.__in_minutes = in_minutes
     self.__in_minutes = in_minutes
-    self.__collector = AlertCollector()
     self.__config_maps = {}
     self.__config_maps = {}
+
           
           
-  def update_definitions(self, alert_commands, refresh_jobs=False):
+  def update_definitions(self, alert_commands, reschedule_jobs=False):
     ''' updates the persisted definitions and restarts the scheduler '''
     ''' updates the persisted definitions and restarts the scheduler '''
     
     
     with open(os.path.join(self.cachedir, self.FILENAME), 'w') as f:
     with open(os.path.join(self.cachedir, self.FILENAME), 'w') as f:
       json.dump(alert_commands, f, indent=2)
       json.dump(alert_commands, f, indent=2)
     
     
-    if refresh_jobs:
-      self.start()
+    if reschedule_jobs:
+      self.reschedule()
+
       
       
   def __make_function(self, alert_def):
   def __make_function(self, alert_def):
     return lambda: alert_def.collect()
     return lambda: alert_def.collect()
+
     
     
   def start(self):
   def start(self):
     ''' loads definitions from file and starts the scheduler '''
     ''' loads definitions from file and starts the scheduler '''
@@ -90,26 +93,72 @@ class AlertSchedulerHandler():
 
 
     alert_callables = self.__load_definitions()
     alert_callables = self.__load_definitions()
       
       
+    # schedule each definition
     for _callable in alert_callables:
     for _callable in alert_callables:
-      if self.__in_minutes:
-        self.__scheduler.add_interval_job(self.__make_function(_callable),
-          minutes=_callable.interval())
-      else:
-        self.__scheduler.add_interval_job(self.__make_function(_callable),
-          seconds=_callable.interval())
+      self.schedule_definition(_callable)
       
       
     logger.debug("Starting scheduler {0}; currently running: {1}".format(
     logger.debug("Starting scheduler {0}; currently running: {1}".format(
       str(self.__scheduler), str(self.__scheduler.running)))
       str(self.__scheduler), str(self.__scheduler.running)))
+
     self.__scheduler.start()
     self.__scheduler.start()
+
     
     
   def stop(self):
   def stop(self):
     if not self.__scheduler is None:
     if not self.__scheduler is None:
       self.__scheduler.shutdown(wait=False)
       self.__scheduler.shutdown(wait=False)
       self.__scheduler = Scheduler(AlertSchedulerHandler.APS_CONFIG)
       self.__scheduler = Scheduler(AlertSchedulerHandler.APS_CONFIG)
+
+
+  def reschedule(self):
+    '''
+    Removes scheduled jobs whose UUID is no longer valid.
+    Schedules jobs where the definition UUID is not currently scheduled.
+    '''
+    jobs_scheduled = 0
+    jobs_removed = 0
+    
+    definitions = self.__load_definitions()
+    scheduled_jobs = self.__scheduler.get_jobs()
+    
+    # for every scheduled job, see if its UUID is still valid
+    for scheduled_job in scheduled_jobs:
+      uuid_valid = False
+      
+      for definition in definitions:
+        definition_uuid = definition.definition_uuid()
+        if scheduled_job.name == definition_uuid:
+          uuid_valid = True
+          break
+      
+      # jobs without valid UUIDs should be unscheduled
+      if uuid_valid == False:
+        jobs_removed += 1
+        logger.info("Unscheduling {0}".format(scheduled_job.name))
+        self._collector.remove_by_uuid(scheduled_job.name)
+        self.__scheduler.unschedule_job(scheduled_job)
       
       
+    # for every definition, determine if there is a scheduled job
+    for definition in definitions:
+      definition_scheduled = False
+      for scheduled_job in scheduled_jobs:
+        definition_uuid = definition.definition_uuid()
+        if definition_uuid == scheduled_job.name:
+          definition_scheduled = True
+          break
+      
+      # if no jobs are found with the definition's UUID, schedule it
+      if definition_scheduled == False:
+        jobs_scheduled += 1
+        self.schedule_definition(definition)
+  
+    logger.info("Alert Reschedule Summary: {0} rescheduled, {1} unscheduled".format(
+        str(jobs_scheduled), str(jobs_removed)))
+
+
   def collector(self):
   def collector(self):
     ''' gets the collector for reporting to the server '''
     ''' gets the collector for reporting to the server '''
-    return self.__collector
+    return self._collector
+  
       
       
   def __load_definitions(self):
   def __load_definitions(self):
     ''' loads all alert commands from the file.  all clusters are stored in one file '''
     ''' loads all alert commands from the file.  all clusters are stored in one file '''
@@ -147,7 +196,7 @@ class AlertSchedulerHandler():
         vals = self.__find_config_values(configmap, obj.get_lookup_keys())
         vals = self.__find_config_values(configmap, obj.get_lookup_keys())
         self.__config_maps[clusterName].update(vals)
         self.__config_maps[clusterName].update(vals)
 
 
-        obj.set_helpers(self.__collector, self.__config_maps[clusterName])
+        obj.set_helpers(self._collector, self.__config_maps[clusterName])
 
 
         definitions.append(obj)
         definitions.append(obj)
       
       
@@ -208,7 +257,30 @@ class AlertSchedulerHandler():
         configmap = command['configurations']
         configmap = command['configurations']
         keylist = self.__config_maps[clusterName].keys()
         keylist = self.__config_maps[clusterName].keys()
         vals = self.__find_config_values(configmap, keylist)
         vals = self.__find_config_values(configmap, keylist)
-        self.__config_maps[clusterName].update(vals)  
+        self.__config_maps[clusterName].update(vals)
+        
+
+  def schedule_definition(self,definition):
+    '''
+    Schedule a definition (callable). Scheduled jobs are given the UUID
+    as their name so that they can be identified later on.
+    '''
+    job = None
+
+    if self.__in_minutes:
+      job = self.__scheduler.add_interval_job(self.__make_function(definition),
+        minutes=definition.interval())
+    else:
+      job = self.__scheduler.add_interval_job(self.__make_function(definition),
+        seconds=definition.interval())
+    
+    # although the documentation states that Job(kwargs) takes a name 
+    # key/value pair, it does not actually set the name; do it manually
+    if job is not None:
+      job.name = definition.definition_uuid()
+      
+    logger.info("Scheduling {0} with UUID {1}".format(
+      definition.definition_name(), definition.definition_uuid()))
 
 
 def main():
 def main():
   args = list(sys.argv)
   args = list(sys.argv)

+ 30 - 7
ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py

@@ -46,17 +46,34 @@ class BaseAlert(object):
     else:
     else:
       interval = self.alert_meta['interval']
       interval = self.alert_meta['interval']
       return 1 if interval < 1 else interval
       return 1 if interval < 1 else interval
-      
+
+
+  def definition_name(self):
+    '''
+    gets the unique name of the alert definition
+    '''
+    return self.alert_meta['name']
+
+
+  def definition_uuid(self):
+    '''
+    gets the unique hash of the alert definition
+    '''
+    return self.alert_meta['uuid']
+
+
   def set_helpers(self, collector, value_dict):
   def set_helpers(self, collector, value_dict):
     ''' sets helper objects for alerts without having to use them in a constructor '''
     ''' sets helper objects for alerts without having to use them in a constructor '''
     self.collector = collector
     self.collector = collector
     self.config_value_dict = value_dict
     self.config_value_dict = value_dict
-      
+
+
   def set_cluster(self, cluster, host):
   def set_cluster(self, cluster, host):
     ''' sets cluster information for the alert '''
     ''' sets cluster information for the alert '''
     self.cluster = cluster
     self.cluster = cluster
     self.host_name = host
     self.host_name = host
-  
+
+
   def collect(self):
   def collect(self):
     ''' method used for collection.  defers to _collect() '''
     ''' method used for collection.  defers to _collect() '''
     
     
@@ -83,12 +100,14 @@ class BaseAlert(object):
     data['service'] = self._find_value('serviceName')
     data['service'] = self._find_value('serviceName')
     data['component'] = self._find_value('componentName')
     data['component'] = self._find_value('componentName')
     data['timestamp'] = long(time.time() * 1000)
     data['timestamp'] = long(time.time() * 1000)
+    data['uuid'] = self._find_value('uuid')
 
 
     if logger.isEnabledFor(logging.DEBUG):
     if logger.isEnabledFor(logging.DEBUG):
       logger.debug("debug alert text: {0}".format(data['text']))
       logger.debug("debug alert text: {0}".format(data['text']))
     
     
     self.collector.put(self.cluster, data)
     self.collector.put(self.cluster, data)
-  
+
+
   def _find_value(self, meta_key):
   def _find_value(self, meta_key):
     ''' safe way to get a value when outputting result json.  will not throw an exception '''
     ''' safe way to get a value when outputting result json.  will not throw an exception '''
     if self.alert_meta.has_key(meta_key):
     if self.alert_meta.has_key(meta_key):
@@ -96,10 +115,12 @@ class BaseAlert(object):
     else:
     else:
       return None
       return None
 
 
+
   def get_lookup_keys(self):
   def get_lookup_keys(self):
     ''' returns a list of lookup keys found for this alert '''
     ''' returns a list of lookup keys found for this alert '''
     return self._lookup_keys
     return self._lookup_keys
-      
+
+
   def _find_lookup_property(self, key):
   def _find_lookup_property(self, key):
     '''
     '''
     check if the supplied key is parameterized
     check if the supplied key is parameterized
@@ -112,7 +133,8 @@ class BaseAlert(object):
       return keys[0]
       return keys[0]
       
       
     return key
     return key
-    
+
+
   def _lookup_property_value(self, key):
   def _lookup_property_value(self, key):
     '''
     '''
     in the case of specifying a configuration path, lookup that path's value
     in the case of specifying a configuration path, lookup that path's value
@@ -124,7 +146,8 @@ class BaseAlert(object):
       return self.config_value_dict[key]
       return self.config_value_dict[key]
     else:
     else:
       return None
       return None
-  
+
+
   def _collect(self):
   def _collect(self):
     '''
     '''
     Low level function to collect alert data.  The result is a tuple as:
     Low level function to collect alert data.  The result is a tuple as:

+ 20 - 1
ambari-agent/src/main/python/ambari_agent/alerts/collector.py

@@ -33,8 +33,27 @@ class AlertCollector():
     if not cluster in self.__buckets:
     if not cluster in self.__buckets:
       self.__buckets[cluster] = {}
       self.__buckets[cluster] = {}
       
       
-    self.__buckets[cluster][alert['name']] = alert 
+    self.__buckets[cluster][alert['name']] = alert
     
     
+  def remove(self, cluster, alert_name):
+    '''
+    Removes the alert with the specified name if it exists in the dictionary
+    '''
+    if not cluster in self.__buckets:
+      return
+    
+    del self.__buckets[cluster][alert_name]
+    
+  def remove_by_uuid(self, alert_uuid):
+    '''
+    Removes the alert with the specified uuid if it exists in the dictionary
+    '''
+    for cluster,alert_map in self.__buckets.iteritems():
+      for alert_name in alert_map.keys():
+        alert = alert_map[alert_name]
+        if alert['uuid'] == alert_uuid:
+          self.remove(cluster, alert_name)
+        
   def alerts(self):
   def alerts(self):
     alerts = []
     alerts = []
     for clustermap in self.__buckets.values()[:]:
     for clustermap in self.__buckets.values()[:]:

+ 50 - 0
ambari-agent/src/test/python/ambari_agent/TestAlerts.py

@@ -56,6 +56,7 @@ class TestAlerts(TestCase):
       "label": "NameNode process",
       "label": "NameNode process",
       "interval": 6,
       "interval": 6,
       "scope": "host",
       "scope": "host",
+      "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
       "source": {
       "source": {
         "type": "PORT",
         "type": "PORT",
         "uri": "{{hdfs-site/my-key}}",
         "uri": "{{hdfs-site/my-key}}",
@@ -86,6 +87,7 @@ class TestAlerts(TestCase):
       "label": "NameNode process",
       "label": "NameNode process",
       "interval": 6,
       "interval": 6,
       "scope": "host",
       "scope": "host",
+      "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
       "source": {
       "source": {
         "type": "PORT",
         "type": "PORT",
         "uri": "http://c6401.ambari.apache.org",
         "uri": "http://c6401.ambari.apache.org",
@@ -115,6 +117,7 @@ class TestAlerts(TestCase):
       "label": "NameNode process",
       "label": "NameNode process",
       "interval": 6,
       "interval": 6,
       "scope": "host",
       "scope": "host",
+      "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
       "source": {
       "source": {
         "type": "SCRIPT",
         "type": "SCRIPT",
         "path": "test_script.py",
         "path": "test_script.py",
@@ -152,6 +155,7 @@ class TestAlerts(TestCase):
       "label": "NameNode process",
       "label": "NameNode process",
       "interval": 6,
       "interval": 6,
       "scope": "host",
       "scope": "host",
+      "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
       "source": {
       "source": {
         "type": "METRIC",
         "type": "METRIC",
         "uri": "http://myurl:8633",
         "uri": "http://myurl:8633",
@@ -197,3 +201,49 @@ class TestAlerts(TestCase):
     self.assertEquals('OK', collector.alerts()[0]['state'])
     self.assertEquals('OK', collector.alerts()[0]['state'])
     self.assertEquals('ok_arr: 1 3 None', collector.alerts()[0]['text'])
     self.assertEquals('ok_arr: 1 3 None', collector.alerts()[0]['text'])
     
     
+  def test_reschedule(self):
+    test_file_path = os.path.join('ambari_agent', 'dummy_files')
+    test_stack_path = os.path.join('ambari_agent', 'dummy_files')
+
+    ash = AlertSchedulerHandler(test_file_path, test_stack_path)
+    ash.start()
+    ash.reschedule()
+        
+  
+  def test_alert_collector_purge(self):
+    json = { "name": "namenode_process",
+      "service": "HDFS",
+      "component": "NAMENODE",
+      "label": "NameNode process",
+      "interval": 6,
+      "scope": "host",
+      "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
+      "source": {
+        "type": "PORT",
+        "uri": "{{hdfs-site/my-key}}",
+        "default_port": 50070,
+        "reporting": {
+          "ok": {
+            "text": "TCP OK - {0:.4f} response time on port {1}"
+          },
+          "critical": {
+            "text": "Could not load process info: {0}"
+          }
+        }
+      }
+    }
+
+    collector = AlertCollector()
+
+    pa = PortAlert(json, json['source'])
+    pa.set_helpers(collector, {'hdfs-site/my-key': 'value1'})
+    self.assertEquals(6, pa.interval())
+
+    res = pa.collect()
+    
+    self.assertIsNotNone(collector.alerts()[0])
+    self.assertEquals('CRITICAL', collector.alerts()[0]['state'])
+    
+    collector.remove_by_uuid('c1f73191-4481-4435-8dae-fd380e4c0be1')
+    self.assertEquals(0,len(collector.alerts()))
+    

+ 1 - 0
ambari-agent/src/test/python/ambari_agent/dummy_files/definitions.json

@@ -16,6 +16,7 @@
         "label": "NameNode process",
         "label": "NameNode process",
         "interval": 6,
         "interval": 6,
         "scope": "host",
         "scope": "host",
+        "uuid": "3f82ae27-fa6a-465b-b77d-67963ac55d2f",
         "source": {
         "source": {
           "type": "PORT",
           "type": "PORT",
           "uri": "{{hdfs-site/dfs.namenode.http-address}}",
           "uri": "{{hdfs-site/dfs.namenode.http-address}}",

+ 4 - 3
ambari-project/pom.xml

@@ -55,8 +55,9 @@
   </pluginRepositories>
   </pluginRepositories>
   <repositories>
   <repositories>
     <repository>
     <repository>
-      <id>EclipseLink</id>
-      <url>http://download.eclipse.org/rt/eclipselink/maven.repo</url>
+      <id>oss.sonatype.org</id>
+      <name>OSS Sonatype Staging</name>
+      <url>https://oss.sonatype.org/content/groups/staging</url>
     </repository>
     </repository>
     <repository>
     <repository>
       <id>spring-milestones</id>
       <id>spring-milestones</id>
@@ -211,7 +212,7 @@
       <dependency>
       <dependency>
         <groupId>org.eclipse.persistence</groupId>
         <groupId>org.eclipse.persistence</groupId>
         <artifactId>eclipselink</artifactId>
         <artifactId>eclipselink</artifactId>
-        <version>2.4.0</version>
+        <version>2.4.2</version>
       </dependency>
       </dependency>
       <dependency>
       <dependency>
         <groupId>org.postgresql</groupId>
         <groupId>org.postgresql</groupId>

+ 5 - 7
ambari-server/pom.xml

@@ -191,7 +191,7 @@
           <dependency>
           <dependency>
             <groupId>org.eclipse.persistence</groupId>
             <groupId>org.eclipse.persistence</groupId>
             <artifactId>eclipselink</artifactId>
             <artifactId>eclipselink</artifactId>
-            <version>2.4.0</version>
+            <version>2.4.2</version>
           </dependency>
           </dependency>
         </dependencies>
         </dependencies>
       </plugin>
       </plugin>
@@ -1451,14 +1451,12 @@
       <version>1.5.2</version>
       <version>1.5.2</version>
     </dependency>
     </dependency>
   </dependencies>
   </dependencies>
-  <!--<reporting> <plugins> <plugin> <groupId>org.codehaus.mojo</groupId>
-    <artifactId>findbugs-maven-plugin</artifactId> <version>2.5.2</version> </plugin>
-    </plugins> </reporting> -->
-
+  
   <pluginRepositories>
   <pluginRepositories>
     <pluginRepository>
     <pluginRepository>
-      <id>EclipseLink</id>
-      <url>http://download.eclipse.org/rt/eclipselink/maven.repo</url>
+      <id>oss.sonatype.org</id>
+      <name>OSS Sonatype Staging</name>
+      <url>https://oss.sonatype.org/content/groups/staging</url>
     </pluginRepository>
     </pluginRepository>
   </pluginRepositories>
   </pluginRepositories>
 
 

+ 19 - 14
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProvider.java

@@ -314,10 +314,6 @@ public class AlertGroupResourceProvider extends
       Collection<Long> targetIds = (Collection<Long>) requestMap.get(ALERT_GROUP_TARGETS);
       Collection<Long> targetIds = (Collection<Long>) requestMap.get(ALERT_GROUP_TARGETS);
       Collection<Long> definitionIds = (Collection<Long>) requestMap.get(ALERT_GROUP_DEFINITIONS);
       Collection<Long> definitionIds = (Collection<Long>) requestMap.get(ALERT_GROUP_DEFINITIONS);
 
 
-      if (!StringUtils.isBlank(name)) {
-        entity.setGroupName(name);
-      }
-
       // if targets were supplied, replace existing
       // if targets were supplied, replace existing
       Set<AlertTargetEntity> targets = new HashSet<AlertTargetEntity>();
       Set<AlertTargetEntity> targets = new HashSet<AlertTargetEntity>();
       if (null != targetIds && targetIds.size() > 0) {
       if (null != targetIds && targetIds.size() > 0) {
@@ -331,17 +327,26 @@ public class AlertGroupResourceProvider extends
         entity.setAlertTargets(targets);
         entity.setAlertTargets(targets);
       }
       }
 
 
-      // if definitions were supplied, replace existing
-      Set<AlertDefinitionEntity> definitions = new HashSet<AlertDefinitionEntity>();
-      if (null != definitionIds && definitionIds.size() > 0) {
-        List<Long> ids = new ArrayList<Long>(definitionIds.size());
-        ids.addAll(definitionIds);
-        definitions.addAll(s_definitionDao.findByIds(ids));
+      // only the targets should be updatable on default groups; everything
+      // else is valid only on regular groups
+      if (!entity.isDefault()) {
+        // set the name if supplied
+        if (!StringUtils.isBlank(name)) {
+          entity.setGroupName(name);
+        }
 
 
-        entity.setAlertDefinitions(definitions);
-      } else if (definitionIds.size() == 0) {
-        // empty array supplied, clear out existing definitions
-        entity.setAlertDefinitions(definitions);
+        // if definitions were supplied, replace existing
+        Set<AlertDefinitionEntity> definitions = new HashSet<AlertDefinitionEntity>();
+        if (null != definitionIds && definitionIds.size() > 0) {
+          List<Long> ids = new ArrayList<Long>(definitionIds.size());
+          ids.addAll(definitionIds);
+          definitions.addAll(s_definitionDao.findByIds(ids));
+
+          entity.setAlertDefinitions(definitions);
+        } else {
+          // empty array supplied, clear out existing definitions
+          entity.setAlertDefinitions(definitions);
+        }
       }
       }
 
 
       s_dao.merge(entity);
       s_dao.merge(entity);

+ 55 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionDeleteEvent.java

@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+import org.apache.ambari.server.state.alert.AlertDefinition;
+
+/**
+ * The {@link AlertDefinitionDeleteEvent} is used to represent that an
+ * {@link AlertDefinition} has been removed from the system.
+ */
+public class AlertDefinitionDeleteEvent extends ClusterEvent {
+
+  /**
+   * The removed alert definition
+   */
+  private final AlertDefinition m_definition;
+
+  /**
+   * Constructor.
+   *
+   * @param clusterId
+   *          the ID of the cluster that the definition is in.
+   * @param definition
+   *          the alert definition being removed.
+   */
+  public AlertDefinitionDeleteEvent(
+      long clusterId, AlertDefinition definition) {
+    super(AmbariEventType.ALERT_DEFINITION_REMOVAL, clusterId);
+    m_definition = definition;
+  }
+
+  /**
+   * Get the registered alert definition.
+   *
+   * @return the alert definition (not {@code null}).
+   */
+  public AlertDefinition getDefinition() {
+    return m_definition;
+  }
+}

+ 6 - 1
ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java

@@ -35,7 +35,12 @@ public abstract class AmbariEvent {
     /**
     /**
      * An alert definition is registered with the system.
      * An alert definition is registered with the system.
      */
      */
-    ALERT_DEFINITION_REGISTRATION;
+    ALERT_DEFINITION_REGISTRATION,
+
+    /**
+     * An alert definition is removed from the system.
+     */
+    ALERT_DEFINITION_REMOVAL;
   }
   }
 
 
   /**
   /**

+ 53 - 1
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertLifecycleListener.java

@@ -17,15 +17,22 @@
  */
  */
 package org.apache.ambari.server.events.listeners;
 package org.apache.ambari.server.events.listeners;
 
 
+import java.util.Set;
+
+import org.apache.ambari.server.events.AlertDefinitionDeleteEvent;
 import org.apache.ambari.server.events.AlertDefinitionRegistrationEvent;
 import org.apache.ambari.server.events.AlertDefinitionRegistrationEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.state.alert.AggregateDefinitionMapping;
 import org.apache.ambari.server.state.alert.AggregateDefinitionMapping;
 import org.apache.ambari.server.state.alert.AlertDefinition;
 import org.apache.ambari.server.state.alert.AlertDefinition;
+import org.apache.ambari.server.state.alert.AlertDefinitionHash;
 import org.apache.ambari.server.state.alert.SourceType;
 import org.apache.ambari.server.state.alert.SourceType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
 import com.google.inject.Inject;
+import com.google.inject.Provider;
 import com.google.inject.Singleton;
 import com.google.inject.Singleton;
 
 
 /**
 /**
@@ -34,6 +41,10 @@ import com.google.inject.Singleton;
  */
  */
 @Singleton
 @Singleton
 public class AlertLifecycleListener {
 public class AlertLifecycleListener {
+  /**
+   * Logger.
+   */
+  private static Logger LOG = LoggerFactory.getLogger(AlertLifecycleListener.class);
 
 
   /**
   /**
    * Used for quick lookups of aggregate alerts.
    * Used for quick lookups of aggregate alerts.
@@ -41,6 +52,13 @@ public class AlertLifecycleListener {
   @Inject
   @Inject
   private AggregateDefinitionMapping m_aggregateMapping;
   private AggregateDefinitionMapping m_aggregateMapping;
 
 
+  /**
+   * Invalidates hosts so that they can receive updated alert definition
+   * commands.
+   */
+  @Inject
+  private Provider<AlertDefinitionHash> m_alertDefinitionHash;
+
   /**
   /**
    * Constructor.
    * Constructor.
    *
    *
@@ -66,8 +84,42 @@ public class AlertLifecycleListener {
   public void onAmbariEvent(AlertDefinitionRegistrationEvent event) {
   public void onAmbariEvent(AlertDefinitionRegistrationEvent event) {
     AlertDefinition definition = event.getDefinition();
     AlertDefinition definition = event.getDefinition();
 
 
+    LOG.debug("Registering alert definition {}", definition);
+
     if (definition.getSource().getType() == SourceType.AGGREGATE) {
     if (definition.getSource().getType() == SourceType.AGGREGATE) {
-      m_aggregateMapping.addAggregateType(event.getClusterId(), definition);
+      m_aggregateMapping.registerAggregate(event.getClusterId(), definition);
+    }
+  }
+
+  /**
+   * Handles {@link AlertDefinitionDeleteEvent} by performing the following
+   * tasks:
+   * <ul>
+   * <li>Removal from with {@link AggregateDefinitionMapping}</li>
+   * <li>{@link AlertDefinitionHash} invalidation</li>
+   * </ul>
+   *
+   * @param event
+   *          the event being handled.
+   */
+  @Subscribe
+  @AllowConcurrentEvents
+  public void onAmbariEvent(AlertDefinitionDeleteEvent event) {
+    AlertDefinition definition = event.getDefinition();
+
+    LOG.debug("Removing alert definition {}", definition);
+
+    if (null == definition) {
+      return;
     }
     }
+
+    m_aggregateMapping.removeAssociatedAggregate(event.getClusterId(),
+        definition.getName());
+
+    AlertDefinitionHash hashHelper = m_alertDefinitionHash.get();
+    Set<String> invalidatedHosts = hashHelper.invalidateHosts(definition);
+
+    hashHelper.enqueueAgentCommands(definition.getClusterId(),
+        invalidatedHosts);
   }
   }
 }
 }

+ 8 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java

@@ -101,6 +101,14 @@ public class AlertReceivedListener {
       AlertDefinitionEntity definition = m_definitionDao.findByName(clusterId,
       AlertDefinitionEntity definition = m_definitionDao.findByName(clusterId,
           alert.getName());
           alert.getName());
 
 
+      if (null == definition) {
+        LOG.warn(
+            "Received an alert for {} which is a definition that does not exist anymore",
+            alert.getName());
+
+        return;
+      }
+
       AlertHistoryEntity history = createHistory(clusterId, definition, alert);
       AlertHistoryEntity history = createHistory(clusterId, definition, alert);
 
 
       current = new AlertCurrentEntity();
       current = new AlertCurrentEntity();

+ 13 - 0
ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java

@@ -25,6 +25,7 @@ import javax.persistence.EntityManager;
 import javax.persistence.TypedQuery;
 import javax.persistence.TypedQuery;
 
 
 import org.apache.ambari.server.controller.RootServiceResponseFactory;
 import org.apache.ambari.server.controller.RootServiceResponseFactory;
+import org.apache.ambari.server.events.AlertDefinitionDeleteEvent;
 import org.apache.ambari.server.events.AlertDefinitionRegistrationEvent;
 import org.apache.ambari.server.events.AlertDefinitionRegistrationEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
@@ -364,6 +365,18 @@ public class AlertDefinitionDAO {
     if (null != alertDefinition) {
     if (null != alertDefinition) {
       entityManager.remove(alertDefinition);
       entityManager.remove(alertDefinition);
     }
     }
+
+    // publish the alert definition removal
+    AlertDefinition coerced = alertDefinitionFactory.coerce(alertDefinition);
+    if (null != coerced) {
+      AlertDefinitionDeleteEvent event = new AlertDefinitionDeleteEvent(
+          alertDefinition.getClusterId(), coerced);
+
+      eventPublisher.publish(event);
+    } else {
+      LOG.warn("Unable to broadcast alert removal event for {}",
+          alertDefinition.getDefinitionName());
+    }
   }
   }
 
 
   /**
   /**

+ 1 - 0
ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java

@@ -515,6 +515,7 @@ public class AlertsDAO {
     query.setParameter("clusterId", Long.valueOf(clusterId));
     query.setParameter("clusterId", Long.valueOf(clusterId));
     query.setParameter("definitionName", alertName);
     query.setParameter("definitionName", alertName);
 
 
+    query = setQueryRefreshHint(query);
     return daoUtils.selectOne(query);
     return daoUtils.selectOne(query);
   }
   }
 
 

+ 10 - 5
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertDefinitionEntity.java

@@ -17,7 +17,9 @@
  */
  */
 package org.apache.ambari.server.orm.entities;
 package org.apache.ambari.server.orm.entities;
 
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Set;
 import java.util.Set;
 
 
 import javax.persistence.Basic;
 import javax.persistence.Basic;
@@ -363,7 +365,7 @@ public class AlertDefinitionEntity {
    * @return the groups, or {@code null} if none.
    * @return the groups, or {@code null} if none.
    */
    */
   public Set<AlertGroupEntity> getAlertGroups() {
   public Set<AlertGroupEntity> getAlertGroups() {
-    return alertGroups;
+    return Collections.unmodifiableSet(alertGroups);
   }
   }
 
 
   /**
   /**
@@ -418,7 +420,7 @@ public class AlertDefinitionEntity {
    * @param alertGroup
    * @param alertGroup
    */
    */
   protected void removeAlertGroup(AlertGroupEntity alertGroup) {
   protected void removeAlertGroup(AlertGroupEntity alertGroup) {
-    if (null != alertGroups) {
+    if (null != alertGroups && alertGroups.contains(alertGroup)) {
       alertGroups.remove(alertGroup);
       alertGroups.remove(alertGroup);
     }
     }
   }
   }
@@ -429,12 +431,15 @@ public class AlertDefinitionEntity {
    */
    */
   @PreRemove
   @PreRemove
   public void preRemove() {
   public void preRemove() {
-    Set<AlertGroupEntity> groups = getAlertGroups();
-    if (null == groups || groups.size() == 0) {
+    if (null == alertGroups || alertGroups.size() == 0) {
       return;
       return;
     }
     }
 
 
-    for (AlertGroupEntity group : groups) {
+    Iterator<AlertGroupEntity> iterator = alertGroups.iterator();
+    while (iterator.hasNext()) {
+      AlertGroupEntity group = iterator.next();
+      iterator.remove();
+
       group.removeAlertDefinition(this);
       group.removeAlertDefinition(this);
     }
     }
   }
   }

+ 3 - 2
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertGroupEntity.java

@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.Set;
 
 
+import javax.persistence.CascadeType;
 import javax.persistence.Column;
 import javax.persistence.Column;
 import javax.persistence.Entity;
 import javax.persistence.Entity;
 import javax.persistence.GeneratedValue;
 import javax.persistence.GeneratedValue;
@@ -73,14 +74,14 @@ public class AlertGroupEntity {
   /**
   /**
    * Bi-directional many-to-many association to {@link AlertDefinitionEntity}
    * Bi-directional many-to-many association to {@link AlertDefinitionEntity}
    */
    */
-  @ManyToMany
+  @ManyToMany(cascade = CascadeType.MERGE)
   @JoinTable(name = "alert_grouping", joinColumns = { @JoinColumn(name = "group_id", nullable = false) }, inverseJoinColumns = { @JoinColumn(name = "definition_id", nullable = false) })
   @JoinTable(name = "alert_grouping", joinColumns = { @JoinColumn(name = "group_id", nullable = false) }, inverseJoinColumns = { @JoinColumn(name = "definition_id", nullable = false) })
   private Set<AlertDefinitionEntity> alertDefinitions;
   private Set<AlertDefinitionEntity> alertDefinitions;
 
 
   /**
   /**
    * Unidirectional many-to-many association to {@link AlertTargetEntity}
    * Unidirectional many-to-many association to {@link AlertTargetEntity}
    */
    */
-  @ManyToMany
+  @ManyToMany(cascade = CascadeType.MERGE)
   @JoinTable(name = "alert_group_target", joinColumns = { @JoinColumn(name = "group_id", nullable = false) }, inverseJoinColumns = { @JoinColumn(name = "target_id", nullable = false) })
   @JoinTable(name = "alert_group_target", joinColumns = { @JoinColumn(name = "group_id", nullable = false) }, inverseJoinColumns = { @JoinColumn(name = "target_id", nullable = false) })
   private Set<AlertTargetEntity> alertTargets;
   private Set<AlertTargetEntity> alertTargets;
 
 

+ 27 - 4
ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java

@@ -19,6 +19,7 @@ package org.apache.ambari.server.state.alert;
 
 
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 
 import com.google.inject.Singleton;
 import com.google.inject.Singleton;
 
 
@@ -33,7 +34,8 @@ public final class AggregateDefinitionMapping {
    * In-memory mapping of cluster ID to definition name / aggregate definition.
    * In-memory mapping of cluster ID to definition name / aggregate definition.
    * This is used for fast lookups when receiving events.
    * This is used for fast lookups when receiving events.
    */
    */
-  private Map<Long, Map<String, AlertDefinition>> m_aggregateMap = new HashMap<Long, Map<String, AlertDefinition>>();
+  private Map<Long, Map<String, AlertDefinition>> m_aggregateMap =
+      new ConcurrentHashMap<Long, Map<String, AlertDefinition>>();
 
 
   /**
   /**
    * Constructor.
    * Constructor.
@@ -69,10 +71,10 @@ public final class AggregateDefinitionMapping {
    *
    *
    * @param clusterId
    * @param clusterId
    *          the ID of the cluster that the definition is bound to.
    *          the ID of the cluster that the definition is bound to.
-   * @param name
-   *          the unique name of the definition.
+   * @param definition
+   *          the aggregate definition to register (not {@code null}).
    */
    */
-  public void addAggregateType(long clusterId, AlertDefinition definition) {
+  public void registerAggregate(long clusterId, AlertDefinition definition) {
     Long id = Long.valueOf(clusterId);
     Long id = Long.valueOf(clusterId);
 
 
     if (!m_aggregateMap.containsKey(id)) {
     if (!m_aggregateMap.containsKey(id)) {
@@ -85,4 +87,25 @@ public final class AggregateDefinitionMapping {
 
 
     map.put(as.getAlertName(), definition);
     map.put(as.getAlertName(), definition);
   }
   }
+
+  /**
+   * Removes the associated aggregate for the specified aggregated definition.
+   *
+   * @param clusterId
+   *          the ID of the cluster that the definition is bound to.
+   * @param name
+   *          the unique name of the definition for which aggregates should be
+   *          unassociated (not {@code null}).
+   */
+  public void removeAssociatedAggregate(long clusterId,
+      String aggregatedDefinitonName) {
+    Long id = Long.valueOf(clusterId);
+
+    if (!m_aggregateMap.containsKey(id)) {
+      return;
+    }
+
+    Map<String, AlertDefinition> map = m_aggregateMap.get(id);
+    map.remove(aggregatedDefinitonName);
+  }
 }
 }

+ 19 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java

@@ -34,6 +34,7 @@ import java.util.HashSet;
  */
  */
 public class AlertDefinition {
 public class AlertDefinition {
 
 
+  private long clusterId;
   private String serviceName = null;
   private String serviceName = null;
   private String componentName = null;
   private String componentName = null;
 
 
@@ -45,6 +46,24 @@ public class AlertDefinition {
   private String label = null;
   private String label = null;
   private String uuid = null;
   private String uuid = null;
 
 
+  /**
+   * Gets the cluster ID for this definition.
+   *
+   * @return
+   */
+  public long getClusterId() {
+    return clusterId;
+  }
+
+  /**
+   * Sets the cluster ID for this definition.
+   *
+   * @param clusterId
+   */
+  public void setClusterId(long clusterId) {
+    this.clusterId = clusterId;
+  }
+
   /**
   /**
    * @return the service name
    * @return the service name
    */
    */

+ 1 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java

@@ -125,6 +125,7 @@ public class AlertDefinitionFactory {
     }
     }
 
 
     AlertDefinition definition = new AlertDefinition();
     AlertDefinition definition = new AlertDefinition();
+    definition.setClusterId(entity.getClusterId());
     definition.setComponentName(entity.getComponentName());
     definition.setComponentName(entity.getComponentName());
     definition.setEnabled(entity.getEnabled());
     definition.setEnabled(entity.getEnabled());
     definition.setInterval(entity.getScheduleInterval());
     definition.setInterval(entity.getScheduleInterval());

+ 177 - 116
ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java

@@ -262,85 +262,19 @@ public class AlertDefinitionHash {
 
 
 
 
   /**
   /**
-   * Gets the alert definition entities for the specified host. This will include the
-   * following types of alert definitions:
-   * <ul>
-   * <li>Service/Component alerts</li>
-   * <li>Service alerts where the host is a MASTER</li>
-   * <li>Host alerts that are not bound to a service</li>
-   * </ul>
+   * Invalidate the hashes of any host that would be affected by the specified
+   * definition.
    *
    *
-   * @param clusterName
-   *          the cluster name (not {@code null}).
-   * @param hostName
-   *          the host name (not {@code null}).
-   * @return the alert definitions for the host, or an empty set (never
+   * @param definition
+   *          the definition to use to find the hosts to invlidate (not
+   *          {@code null}).
+   * @return the hosts that were invalidated, or an empty set (never
    *         {@code null}).
    *         {@code null}).
    */
    */
-  private Set<AlertDefinitionEntity> getAlertDefinitionEntities(
-      String clusterName,
-      String hostName) {
-    Set<AlertDefinitionEntity> definitions = new HashSet<AlertDefinitionEntity>();
-
-    try {
-      Cluster cluster = m_clusters.getCluster(clusterName);
-      if (null == cluster) {
-        LOG.warn("Unable to get alert definitions for the missing cluster {}",
-            clusterName);
-
-        return Collections.emptySet();
-      }
-
-      long clusterId = cluster.getClusterId();
-      List<ServiceComponentHost> serviceComponents = cluster.getServiceComponentHosts(hostName);
-      if (null == serviceComponents || serviceComponents.size() == 0) {
-        LOG.warn(
-            "Unable to get alert definitions for {} since there are no service components defined",
-            hostName);
-
-        return Collections.emptySet();
-      }
-
-      for (ServiceComponentHost serviceComponent : serviceComponents) {
-        String serviceName = serviceComponent.getServiceName();
-        String componentName = serviceComponent.getServiceComponentName();
-
-        // add all alerts for this service/component pair
-        definitions.addAll(m_definitionDao.findByServiceComponent(
-            clusterId, serviceName, componentName));
-      }
-
-      // for every service, get the master components and see if the host
-      // is a master
-      Set<String> services = new HashSet<String>();
-      for (Entry<String, Service> entry : cluster.getServices().entrySet()) {
-        Service service = entry.getValue();
-        Map<String, ServiceComponent> components = service.getServiceComponents();
-        for (Entry<String, ServiceComponent> component : components.entrySet()) {
-          if (component.getValue().isMasterComponent()) {
-            Map<String, ServiceComponentHost> hosts = component.getValue().getServiceComponentHosts();
-
-            if( hosts.containsKey( hostName ) ){
-              services.add(service.getName());
-            }
-          }
-        }
-      }
-
-      // add all service scoped alerts
-      if( services.size() > 0 ){
-        definitions.addAll(m_definitionDao.findByServiceMaster(clusterId,
-            services));
-      }
-
-      // add any alerts not bound to a service (host level alerts)
-      definitions.addAll(m_definitionDao.findAgentScoped(clusterId));
-    } catch (AmbariException ambariException) {
-      LOG.error("Unable to get alert definitions", ambariException);
-      return Collections.emptySet();
-    }
-
-    return definitions;
+  public Set<String> invalidateHosts(AlertDefinitionEntity definition) {
+    return invalidateHosts(definition.getClusterId(),
+        definition.getDefinitionName(), definition.getServiceName(),
+        definition.getComponentName());
   }
   }
 
 
   /**
   /**
@@ -353,8 +287,28 @@ public class AlertDefinitionHash {
    * @return the hosts that were invalidated, or an empty set (never
    * @return the hosts that were invalidated, or an empty set (never
    *         {@code null}).
    *         {@code null}).
    */
    */
-  public Set<String> invalidateHosts(AlertDefinitionEntity definition) {
-    long clusterId = definition.getClusterId();
+  public Set<String> invalidateHosts(AlertDefinition definition) {
+    return invalidateHosts(definition.getClusterId(), definition.getName(),
+        definition.getServiceName(), definition.getComponentName());
+  }
+
+  /**
+   * Invalidate the hashes of any host that would be affected by the specified
+   * definition.
+   *
+   * @param clusterId
+   *          the cluster ID
+   * @param definitionName
+   *          the definition unique name.
+   * @param definitionServiceName
+   *          the definition's service name.
+   * @param definitionComponentName
+   *          the definition's component name.
+   * @return the hosts that were invalidated, or an empty set (never
+   *         {@code null}).
+   */
+  public Set<String> invalidateHosts(long clusterId, String definitionName,
+      String definitionServiceName, String definitionComponentName) {
     Set<String> invalidatedHosts = new HashSet<String>();
     Set<String> invalidatedHosts = new HashSet<String>();
 
 
     Cluster cluster = null;
     Cluster cluster = null;
@@ -379,8 +333,6 @@ public class AlertDefinitionHash {
     }
     }
 
 
     // intercept host agent alerts; they affect all hosts
     // intercept host agent alerts; they affect all hosts
-    String definitionServiceName = definition.getServiceName();
-    String definitionComponentName = definition.getComponentName();
     if (Services.AMBARI.equals(definitionServiceName)
     if (Services.AMBARI.equals(definitionServiceName)
         && Components.AMBARI_AGENT.equals(definitionComponentName)) {
         && Components.AMBARI_AGENT.equals(definitionComponentName)) {
 
 
@@ -413,7 +365,7 @@ public class AlertDefinitionHash {
     Service service = services.get(definitionServiceName);
     Service service = services.get(definitionServiceName);
     if (null == service) {
     if (null == service) {
       LOG.warn("The alert definition {} has an unknown service of {}",
       LOG.warn("The alert definition {} has an unknown service of {}",
-          definition.getDefinitionName(), definitionServiceName);
+          definitionName, definitionServiceName);
 
 
       return invalidatedHosts;
       return invalidatedHosts;
     }
     }
@@ -438,6 +390,82 @@ public class AlertDefinitionHash {
     return invalidatedHosts;
     return invalidatedHosts;
   }
   }
 
 
+  /**
+   * Enqueue {@link AlertDefinitionCommand}s for every host specified so that
+   * they will receive a payload of alert definitions that they should be
+   * running.
+   * <p/>
+   * This method is typically called after
+   * {@link #invalidateHosts(AlertDefinitionEntity)} has caused a cache
+   * invalidation of the alert definition hash.
+   *
+   * @param clusterName
+   *          the name of the cluster (not {@code null}).
+   * @param hosts
+   *          the hosts to push {@link AlertDefinitionCommand}s for.
+   */
+  public void enqueueAgentCommands(long clusterId, Set<String> hosts) {
+    String clusterName = null;
+
+    try {
+      Cluster cluster = m_clusters.getClusterById(clusterId);
+      clusterName = cluster.getClusterName();
+    } catch (AmbariException ae) {
+      LOG.error("Unable to lookup cluster for alert definition commands", ae);
+    }
+
+    enqueueAgentCommands(clusterName, hosts);
+  }
+
+  /**
+   * Enqueue {@link AlertDefinitionCommand}s for every host specified so that
+   * they will receive a payload of alert definitions that they should be
+   * running.
+   * <p/>
+   * This method is typically called after
+   * {@link #invalidateHosts(AlertDefinitionEntity)} has caused a cache
+   * invalidation of the alert definition hash.
+   *
+   * @param clusterName
+   *          the name of the cluster (not {@code null}).
+   * @param hosts
+   *          the hosts to push {@link AlertDefinitionCommand}s for.
+   */
+  public void enqueueAgentCommands(String clusterName, Set<String> hosts) {
+    if (null == clusterName) {
+      LOG.warn("Unable to create alert definition agent commands because of a null cluster name");
+      return;
+    }
+
+    if (null == hosts || hosts.size() == 0) {
+      return;
+    }
+
+    for (String hostName : hosts) {
+      List<AlertDefinition> definitions = getAlertDefinitions(clusterName,
+          hostName);
+
+      String hash = getHash(clusterName, hostName);
+
+      AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName,
+          hostName, hash, definitions);
+
+      try {
+        Cluster cluster = m_clusters.getCluster(clusterName);
+        command.addConfigs(m_configHelper.get(), cluster);
+      } catch (AmbariException ae) {
+        LOG.warn("Unable to add configurations to alert definition command", ae);
+      }
+
+      // unlike other commands, the alert definitions commands are really
+      // designed to be 1:1 per change; if multiple invalidations happened
+      // before the next heartbeat, there would be several commands that would
+      // force the agents to reschedule their alerts more than once
+      m_actionQueue.dequeue(hostName, AgentCommandType.ALERT_DEFINITION_COMMAND);
+      m_actionQueue.enqueue(hostName, command);
+    }
+  }
+
   /**
   /**
    * Calculates a unique hash value representing all of the alert definitions
    * Calculates a unique hash value representing all of the alert definitions
    * that should be scheduled to run on a given host. Alerts of type
    * that should be scheduled to run on a given host. Alerts of type
@@ -492,51 +520,84 @@ public class AlertDefinitionHash {
   }
   }
 
 
   /**
   /**
-   * Enqueue {@link AlertDefinitionCommand}s for every host specified so that
-   * they will receive a payload of alert definitions that they should be
-   * running.
-   * <p/>
-   * This method is typically called after
-   * {@link #invalidateHosts(AlertDefinitionEntity)} has caused a cache
-   * invalidation of the alert definition hash.
+   * Gets the alert definition entities for the specified host. This will include the
+   * following types of alert definitions:
+   * <ul>
+   * <li>Service/Component alerts</li>
+   * <li>Service alerts where the host is a MASTER</li>
+   * <li>Host alerts that are not bound to a service</li>
+   * </ul>
    *
    *
    * @param clusterName
    * @param clusterName
-   *          the name of the cluster (not {@code null}).
-   * @param hosts
-   *          the hosts to push {@link AlertDefinitionCommand}s for.
+   *          the cluster name (not {@code null}).
+   * @param hostName
+   *          the host name (not {@code null}).
+   * @return the alert definitions for the host, or an empty set (never
+   *         {@code null}).
    */
    */
-  public void enqueueAgentCommands(String clusterName, Set<String> hosts) {
-    if (null == clusterName) {
-      LOG.warn("Unable to create alert definition agent commands because of a null cluster name");
-      return;
-    }
+  private Set<AlertDefinitionEntity> getAlertDefinitionEntities(
+      String clusterName,
+      String hostName) {
+    Set<AlertDefinitionEntity> definitions = new HashSet<AlertDefinitionEntity>();
 
 
-    if (null == hosts || hosts.size() == 0) {
-      return;
-    }
+    try {
+      Cluster cluster = m_clusters.getCluster(clusterName);
+      if (null == cluster) {
+        LOG.warn("Unable to get alert definitions for the missing cluster {}",
+            clusterName);
 
 
-    for (String hostName : hosts) {
-      List<AlertDefinition> definitions = getAlertDefinitions(clusterName,
-          hostName);
+        return Collections.emptySet();
+      }
 
 
-      String hash = getHash(clusterName, hostName);
+      long clusterId = cluster.getClusterId();
+      List<ServiceComponentHost> serviceComponents = cluster.getServiceComponentHosts(hostName);
+      if (null == serviceComponents || serviceComponents.size() == 0) {
+        LOG.warn(
+            "Unable to get alert definitions for {} since there are no service components defined",
+            hostName);
 
 
-      AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName,
-          hostName, hash, definitions);
+        return Collections.emptySet();
+      }
 
 
-      try {
-        Cluster cluster = m_clusters.getCluster(clusterName);
-        command.addConfigs(m_configHelper.get(), cluster);
-      } catch (AmbariException ae) {
-        LOG.warn("Unable to add configurations to alert definition command", ae);
+      for (ServiceComponentHost serviceComponent : serviceComponents) {
+        String serviceName = serviceComponent.getServiceName();
+        String componentName = serviceComponent.getServiceComponentName();
+
+        // add all alerts for this service/component pair
+        definitions.addAll(m_definitionDao.findByServiceComponent(
+            clusterId, serviceName, componentName));
       }
       }
 
 
-      // unlike other commands, the alert definitions commands are really
-      // designed to be 1:1 per change; if multiple invalidations happened
-      // before the next heartbeat, there would be several commands that would
-      // force the agents to reschedule their alerts more than once
-      m_actionQueue.dequeue(hostName, AgentCommandType.ALERT_DEFINITION_COMMAND);
-      m_actionQueue.enqueue(hostName, command);
+      // for every service, get the master components and see if the host
+      // is a master
+      Set<String> services = new HashSet<String>();
+      for (Entry<String, Service> entry : cluster.getServices().entrySet()) {
+        Service service = entry.getValue();
+        Map<String, ServiceComponent> components = service.getServiceComponents();
+        for (Entry<String, ServiceComponent> component : components.entrySet()) {
+          if (component.getValue().isMasterComponent()) {
+            Map<String, ServiceComponentHost> hosts = component.getValue().getServiceComponentHosts();
+
+            if( hosts.containsKey( hostName ) ){
+              services.add(service.getName());
+            }
+          }
+        }
+      }
+
+      // add all service scoped alerts
+      if( services.size() > 0 ){
+        definitions.addAll(m_definitionDao.findByServiceMaster(clusterId,
+            services));
+      }
+
+      // add any alerts not bound to a service (host level alerts)
+      definitions.addAll(m_definitionDao.findAgentScoped(clusterId));
+    } catch (AmbariException ambariException) {
+      LOG.error("Unable to get alert definitions", ambariException);
+      return Collections.emptySet();
     }
     }
+
+    return definitions;
   }
   }
 }
 }

+ 87 - 0
ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java

@@ -382,6 +382,93 @@ public class AlertGroupResourceProviderTest {
     verify(m_amc, m_clusters, m_cluster, m_dao, m_definitionDao);
     verify(m_amc, m_clusters, m_cluster, m_dao, m_definitionDao);
   }
   }
 
 
+  /**
+   * Tests that updating a default group doesn't change read-only properties
+   *
+   * @throws Exception
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testUpdateDefaultGroup() throws Exception {
+    Capture<AlertGroupEntity> entityCapture = new Capture<AlertGroupEntity>();
+
+    // the definition IDs to associate with the group
+    List<Long> definitionIds = new ArrayList<Long>();
+    definitionIds.add(ALERT_DEF_ID);
+
+    // the target IDs to associate with the group
+    List<Long> targetIds = new ArrayList<Long>();
+    targetIds.add(ALERT_TARGET_ID);
+
+    // definition entities to return from DAO
+    List<AlertDefinitionEntity> definitionEntities = new ArrayList<AlertDefinitionEntity>();
+    definitionEntities.addAll(getMockDefinitions());
+
+    // target entities to return from DAO
+    List<AlertTargetEntity> newTargetEntities = new ArrayList<AlertTargetEntity>();
+    newTargetEntities.addAll(getMockTargets());
+
+    Set<AlertTargetEntity> mockTargets2 = getMockTargets();
+    AlertTargetEntity target2 = mockTargets2.iterator().next();
+    target2.setTargetId(29L);
+
+    newTargetEntities.add(target2);
+
+    AlertGroupEntity group = new AlertGroupEntity();
+    group.setDefault(true);
+    group.setGroupName(ALERT_GROUP_NAME);
+    group.setAlertDefinitions(getMockDefinitions());
+    group.setAlertTargets(getMockTargets());
+
+    expect(m_dao.findGroupById(ALERT_GROUP_ID)).andReturn(group).times(1);
+    expect(m_dao.merge(capture(entityCapture))).andReturn(group).once();
+
+    // expect target entity lookup for association
+    List<Long> newTargets = Arrays.asList(28L, 29L);
+    expect(m_dao.findTargetsById(EasyMock.eq(newTargets))).andReturn(
+        newTargetEntities).once();
+
+    replay(m_dao, m_definitionDao);
+
+    AlertGroupResourceProvider provider = createProvider(m_amc);
+
+    // create new properties, and include the ID since we're not going through
+    // a service layer which would add it for us automatically
+    Map<String, Object> requestProps = new HashMap<String, Object>();
+    requestProps.put(AlertGroupResourceProvider.ALERT_GROUP_ID,
+        ALERT_GROUP_ID.toString());
+
+    // try to change the name (it should not work)
+    String newName = ALERT_GROUP_NAME + " Foo";
+    requestProps.put(AlertGroupResourceProvider.ALERT_GROUP_NAME, newName);
+
+    // try to change the definitions (it should not work)
+    requestProps.put(AlertGroupResourceProvider.ALERT_GROUP_DEFINITIONS,
+        new ArrayList<Long>());
+
+    // try to change the targets (it should work)
+    requestProps.put(AlertGroupResourceProvider.ALERT_GROUP_TARGETS,
+        newTargets);
+
+    Predicate predicate = new PredicateBuilder().property(
+        AlertGroupResourceProvider.ALERT_GROUP_CLUSTER_NAME).equals(
+        ALERT_GROUP_CLUSTER_NAME).and().property(
+        AlertGroupResourceProvider.ALERT_GROUP_ID).equals(
+        ALERT_GROUP_ID.toString()).toPredicate();
+
+    Request request = PropertyHelper.getUpdateRequest(requestProps, null);
+    provider.updateResources(request, predicate);
+
+    assertTrue(entityCapture.hasCaptured());
+
+    AlertGroupEntity entity = entityCapture.getValue();
+    assertEquals(ALERT_GROUP_NAME, entity.getGroupName());
+    assertEquals(2, entity.getAlertTargets().size());
+    assertEquals(1, entity.getAlertDefinitions().size());
+
+    verify(m_dao, m_definitionDao);
+  }
+
   /**
   /**
    * @throws Exception
    * @throws Exception
    */
    */

+ 3 - 3
ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java

@@ -118,7 +118,7 @@ public class AlertDefinitionDAOTest {
       definition.setHash(UUID.randomUUID().toString());
       definition.setHash(UUID.randomUUID().toString());
       definition.setScheduleInterval(60);
       definition.setScheduleInterval(60);
       definition.setScope(Scope.SERVICE);
       definition.setScope(Scope.SERVICE);
-      definition.setSource("Source " + i);
+      definition.setSource("{\"type\" : \"SCRIPT\"}");
       definition.setSourceType(SourceType.SCRIPT);
       definition.setSourceType(SourceType.SCRIPT);
       dao.create(definition);
       dao.create(definition);
     }
     }
@@ -133,7 +133,7 @@ public class AlertDefinitionDAOTest {
       definition.setHash(UUID.randomUUID().toString());
       definition.setHash(UUID.randomUUID().toString());
       definition.setScheduleInterval(60);
       definition.setScheduleInterval(60);
       definition.setScope(Scope.HOST);
       definition.setScope(Scope.HOST);
-      definition.setSource("Source " + i);
+      definition.setSource("{\"type\" : \"SCRIPT\"}");
       definition.setSourceType(SourceType.SCRIPT);
       definition.setSourceType(SourceType.SCRIPT);
       dao.create(definition);
       dao.create(definition);
     }
     }
@@ -148,7 +148,7 @@ public class AlertDefinitionDAOTest {
       definition.setHash(UUID.randomUUID().toString());
       definition.setHash(UUID.randomUUID().toString());
       definition.setScheduleInterval(60);
       definition.setScheduleInterval(60);
       definition.setScope(Scope.HOST);
       definition.setScope(Scope.HOST);
-      definition.setSource("Source " + i);
+      definition.setSource("{\"type\" : \"SCRIPT\"}");
       definition.setSourceType(SourceType.SCRIPT);
       definition.setSourceType(SourceType.SCRIPT);
       dao.create(definition);
       dao.create(definition);
     }
     }

+ 1 - 0
ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionEqualityTest.java

@@ -72,6 +72,7 @@ public class AlertDefinitionEqualityTest extends TestCase {
    */
    */
   private AlertDefinition getAlertDefinition(SourceType sourceType) {
   private AlertDefinition getAlertDefinition(SourceType sourceType) {
     AlertDefinition definition = new AlertDefinition();
     AlertDefinition definition = new AlertDefinition();
+    definition.setClusterId(1);
     definition.setComponentName("component");
     definition.setComponentName("component");
     definition.setEnabled(true);
     definition.setEnabled(true);
     definition.setInterval(1);
     definition.setInterval(1);

+ 66 - 2
ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java

@@ -17,13 +17,20 @@
  */
  */
 package org.apache.ambari.server.state.alerts;
 package org.apache.ambari.server.state.alerts;
 
 
+import java.lang.reflect.Field;
+
 import junit.framework.Assert;
 import junit.framework.Assert;
 
 
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.events.AlertDefinitionDeleteEvent;
 import org.apache.ambari.server.events.AmbariEvent;
 import org.apache.ambari.server.events.AmbariEvent;
+import org.apache.ambari.server.events.listeners.AlertLifecycleListener;
 import org.apache.ambari.server.events.listeners.AlertServiceStateListener;
 import org.apache.ambari.server.events.listeners.AlertServiceStateListener;
+import org.apache.ambari.server.events.listeners.AlertStateChangedListener;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.OrmTestHelper;
 import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
 import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
 import org.apache.ambari.server.orm.dao.AlertDispatchDAO;
 import org.apache.ambari.server.orm.dao.AlertDispatchDAO;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
@@ -33,10 +40,15 @@ import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceFactory;
 import org.apache.ambari.server.state.ServiceFactory;
 import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.alert.AggregateDefinitionMapping;
+import org.apache.ambari.server.state.alert.AggregateSource;
+import org.apache.ambari.server.state.alert.AlertDefinition;
+import org.apache.ambari.server.state.alert.Scope;
 import org.junit.After;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
 
 
+import com.google.common.eventbus.EventBus;
 import com.google.inject.Guice;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Injector;
 import com.google.inject.persist.PersistService;
 import com.google.inject.persist.PersistService;
@@ -55,6 +67,9 @@ public class AlertEventPublisherTest {
   private Injector injector;
   private Injector injector;
   private ServiceFactory serviceFactory;
   private ServiceFactory serviceFactory;
   private AmbariMetaInfo metaInfo;
   private AmbariMetaInfo metaInfo;
+  private OrmTestHelper ormHelper;
+  private AggregateDefinitionMapping aggregateMapping;
+  private AmbariEventPublisher eventPublisher;
 
 
   /**
   /**
    *
    *
@@ -64,13 +79,25 @@ public class AlertEventPublisherTest {
     injector = Guice.createInjector(new InMemoryDefaultTestModule());
     injector = Guice.createInjector(new InMemoryDefaultTestModule());
     injector.getInstance(GuiceJpaInitializer.class);
     injector.getInstance(GuiceJpaInitializer.class);
 
 
+    eventPublisher = injector.getInstance(AmbariEventPublisher.class);
+    EventBus synchronizedBus = new EventBus();
+
     // force singleton init via Guice so the listener registers with the bus
     // force singleton init via Guice so the listener registers with the bus
-    injector.getInstance(AlertServiceStateListener.class);
+    synchronizedBus.register(injector.getInstance(AlertLifecycleListener.class));
+    synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class));
+    synchronizedBus.register(injector.getInstance(AlertServiceStateListener.class));
+
+    // !!! need a synchronous op for testing
+    Field field = AmbariEventPublisher.class.getDeclaredField("m_eventBus");
+    field.setAccessible(true);
+    field.set(eventPublisher, synchronizedBus);
 
 
     dispatchDao = injector.getInstance(AlertDispatchDAO.class);
     dispatchDao = injector.getInstance(AlertDispatchDAO.class);
     definitionDao = injector.getInstance(AlertDefinitionDAO.class);
     definitionDao = injector.getInstance(AlertDefinitionDAO.class);
     clusters = injector.getInstance(Clusters.class);
     clusters = injector.getInstance(Clusters.class);
     serviceFactory = injector.getInstance(ServiceFactory.class);
     serviceFactory = injector.getInstance(ServiceFactory.class);
+    ormHelper = injector.getInstance(OrmTestHelper.class);
+    aggregateMapping = injector.getInstance(AggregateDefinitionMapping.class);
 
 
     metaInfo = injector.getInstance(AmbariMetaInfo.class);
     metaInfo = injector.getInstance(AmbariMetaInfo.class);
     metaInfo.init();
     metaInfo.init();
@@ -107,7 +134,7 @@ public class AlertEventPublisherTest {
   /**
   /**
    * Tests that all {@link AlertDefinitionEntity} instances are created for the
    * Tests that all {@link AlertDefinitionEntity} instances are created for the
    * installed service.
    * installed service.
-   * 
+   *
    * @throws Exception
    * @throws Exception
    */
    */
   @Test
   @Test
@@ -117,6 +144,43 @@ public class AlertEventPublisherTest {
     Assert.assertEquals(4, definitionDao.findAll().size());
     Assert.assertEquals(4, definitionDao.findAll().size());
   }
   }
 
 
+  /**
+   * Tests that {@link AlertDefinitionDeleteEvent} instances are fired when a
+   * definition is removed.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAlertDefinitionRemoval() throws Exception {
+    Assert.assertEquals(0, definitionDao.findAll().size());
+    AlertDefinitionEntity definition = ormHelper.createAlertDefinition(1L);
+    Assert.assertEquals(1, definitionDao.findAll().size());
+
+    AggregateSource source = new AggregateSource();
+    source.setAlertName(definition.getDefinitionName());
+
+    AlertDefinition aggregate = new AlertDefinition();
+    aggregate.setClusterId(1L);
+    aggregate.setComponentName("DATANODE");
+    aggregate.setEnabled(true);
+    aggregate.setInterval(1);
+    aggregate.setLabel("DataNode Aggregate");
+    aggregate.setName("datanode_aggregate");
+    aggregate.setScope(Scope.ANY);
+    aggregate.setServiceName("HDFS");
+    aggregate.setSource(source);
+    aggregate.setUuid("uuid");
+
+    aggregateMapping.registerAggregate(1L, aggregate);
+    Assert.assertNotNull(aggregateMapping.getAggregateDefinition(1L,
+        source.getAlertName()));
+
+    definitionDao.remove(definition);
+
+    Assert.assertNull(aggregateMapping.getAggregateDefinition(1L,
+        source.getAlertName()));
+  }
+
   /**
   /**
    * Calls {@link Service#persist()} to mock a service install.
    * Calls {@link Service#persist()} to mock a service install.
    */
    */

+ 10 - 2
ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java

@@ -17,6 +17,7 @@
  */
  */
 package org.apache.ambari.server.state.alerts;
 package org.apache.ambari.server.state.alerts;
 
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.List;
@@ -39,6 +40,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
 
 
+import com.google.common.eventbus.EventBus;
 import com.google.inject.Binder;
 import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Injector;
@@ -72,6 +74,14 @@ public class AlertStateChangedEventTest {
 
 
     dispatchDao = injector.getInstance(AlertDispatchDAO.class);
     dispatchDao = injector.getInstance(AlertDispatchDAO.class);
     eventPublisher = injector.getInstance(AlertEventPublisher.class);
     eventPublisher = injector.getInstance(AlertEventPublisher.class);
+
+    EventBus synchronizedBus = new EventBus();
+    synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class));
+
+    // !!! need a synchronous op for testing
+    Field field = AlertEventPublisher.class.getDeclaredField("m_eventBus");
+    field.setAccessible(true);
+    field.set(eventPublisher, synchronizedBus);
   }
   }
 
 
   /**
   /**
@@ -99,8 +109,6 @@ public class AlertStateChangedEventTest {
 
 
     // async publishing
     // async publishing
     eventPublisher.publish(event);
     eventPublisher.publish(event);
-    Thread.sleep(2000);
-
     EasyMock.verify(dispatchDao, history, event);
     EasyMock.verify(dispatchDao, history, event);
   }
   }
 
 

+ 1 - 1
ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java

@@ -373,7 +373,7 @@ public class AlertDataManagerTest {
     AggregateDefinitionMapping aggregateMapping = injector.getInstance(AggregateDefinitionMapping.class);
     AggregateDefinitionMapping aggregateMapping = injector.getInstance(AggregateDefinitionMapping.class);
 
 
     AlertDefinition aggregateDefinition = factory.coerce(aggDef);
     AlertDefinition aggregateDefinition = factory.coerce(aggDef);
-    aggregateMapping.addAggregateType(clusterId.longValue(),
+    aggregateMapping.registerAggregate(clusterId.longValue(),
         aggregateDefinition );
         aggregateDefinition );
 
 
     AggregateSource as = (AggregateSource) aggregateDefinition.getSource();
     AggregateSource as = (AggregateSource) aggregateDefinition.getSource();