Browse source

AMBARI-7568 - Alerts: An Alert Definition On Demand Execution (jonathanhurley)

Jonathan Hurley 10 years ago
parent
commit
054745d064

+ 49 - 13
ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py

@@ -33,9 +33,9 @@ from alerts.metric_alert import MetricAlert
 from alerts.port_alert import PortAlert
 from alerts.script_alert import ScriptAlert
 
-
 logger = logging.getLogger()
 
+
 class AlertSchedulerHandler():
   make_cachedir = True
 
@@ -50,6 +50,7 @@ class AlertSchedulerHandler():
    'standalone': False
  }
 
+
  def __init__(self, cachedir, stacks_dir, in_minutes=True):
    self.cachedir = cachedir
    self.stacks_dir = stacks_dir
@@ -66,7 +67,7 @@ class AlertSchedulerHandler():
    self.__in_minutes = in_minutes
    self.__config_maps = {}
 
-          
+
  def update_definitions(self, alert_commands, reschedule_jobs=False):
    ''' updates the persisted definitions and restarts the scheduler '''
    
@@ -76,11 +77,11 @@ class AlertSchedulerHandler():
    if reschedule_jobs:
      self.reschedule()
 
-      
+
  def __make_function(self, alert_def):
    return lambda: alert_def.collect()
 
-    
+
  def start(self):
    ''' loads definitions from file and starts the scheduler '''
 
@@ -102,7 +103,7 @@ class AlertSchedulerHandler():
 
    self.__scheduler.start()
 
-    
+
  def stop(self):
    if not self.__scheduler is None:
      self.__scheduler.shutdown(wait=False)
@@ -159,7 +160,7 @@ class AlertSchedulerHandler():
    ''' gets the collector for reporting to the server '''
    return self._collector
  
-      
+
  def __load_definitions(self):
    ''' loads all alert commands from the file.  all clusters are stored in one file '''
    definitions = []
@@ -184,13 +185,11 @@ class AlertSchedulerHandler():
        configmap = command_json['configurations']
 
      for definition in command_json['alertDefinitions']:
-        obj = self.__json_to_callable(definition)
+        obj = self.__json_to_callable(clusterName, hostName, definition)
        
        if obj is None:
          continue
          
-        obj.set_cluster(clusterName, hostName)
-
        # get the config values for the alerts 'lookup keys',
        # eg: hdfs-site/dfs.namenode.http-address : host_and_port        
        vals = self.__find_config_values(configmap, obj.get_lookup_keys())
@@ -202,7 +201,8 @@ class AlertSchedulerHandler():
      
    return definitions
 
-  def __json_to_callable(self, json_definition):
+
+  def __json_to_callable(self, clusterName, hostName, json_definition):
    '''
    converts the json that represents all aspects of a definition
    and makes an object that extends BaseAlert that is used for individual
@@ -223,8 +223,12 @@ class AlertSchedulerHandler():
      source['stacks_dir'] = self.stacks_dir
      alert = ScriptAlert(json_definition, source)
 
+    if alert is not None:
+      alert.set_cluster(clusterName, hostName)
+
    return alert
-    
+
+
  def __find_config_values(self, configmap, obj_keylist):
    ''' finds templated values in the configuration map provided  by the server '''
    if configmap is None:
@@ -242,7 +246,8 @@ class AlertSchedulerHandler():
        pass
        
    return result
-    
+
+
  def update_configurations(self, commands):
    '''
    when an execution command comes in, update any necessary values.
@@ -291,6 +296,7 @@ class AlertSchedulerHandler():
    logger.info("Scheduling {0} with UUID {1}".format(
      definition.get_name(), definition.get_uuid()))
  
+
  def get_job_count(self):
    '''
    Gets the number of jobs currently scheduled. This is mainly used for
@@ -299,7 +305,37 @@ class AlertSchedulerHandler():
    if self.__scheduler is None:
      return 0
    
-    return len(self.__scheduler.get_jobs())   
+    return len(self.__scheduler.get_jobs())
+
+  
+  def execute_alert(self, execution_commands):
+    '''
+    Executes an alert immediately, ignoring any scheduled jobs. The existing
+    jobs remain untouched. The result of this is stored in the alert
+    collector for transmission during the next heartbeat
+    '''
+    if self.__scheduler is None or execution_commands is None:
+      return
+
+    for execution_command in execution_commands:
+      try:
+        alert_definition = execution_command['alertDefinition']
+        
+        clusterName = '' if not 'clusterName' in execution_command else execution_command['clusterName']
+        hostName = '' if not 'hostName' in execution_command else execution_command['hostName']      
+        
+        alert = self.__json_to_callable(clusterName, hostName, alert_definition)
+  
+        if alert is None:
+          continue
+  
+        logger.info("Executing on-demand alert {0} ({1})".format(alert.get_name(), 
+            alert.get_uuid()))
+        
+        alert.set_helpers(self._collector, self.__config_maps[clusterName])
+        alert.collect()
+      except:
+        logger.exception("Unable to execute the alert outside of the job scheduler")
 
 
def main():
  args = list(sys.argv)
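A condensed sketch of the on-demand flow implemented by execute_alert above, using hypothetical StubAlert and StubCollector classes in place of the real BaseAlert subclasses and AlertCollector. It only illustrates that set_helpers() wires in the collector and collect() deposits a result there for the next heartbeat; the scheduler and its existing jobs are never touched.

# Hypothetical stand-ins; they only mirror the calls made by execute_alert:
# set_helpers() followed by collect().
class StubCollector:
  def __init__(self):
    self.results = []

  def put(self, cluster, alert):
    self.results.append((cluster, alert))


class StubAlert:
  def __init__(self, name):
    self.name = name
    self.collector = None

  def set_helpers(self, collector, config_map):
    self.collector = collector

  def collect(self):
    # a real alert would run its check here; we just record a result
    self.collector.put('c1', {'name': self.name, 'state': 'OK'})


collector = StubCollector()
alert = StubAlert('namenode_process')
alert.set_helpers(collector, {})
alert.collect()

# the result now sits in the collector, ready for the next heartbeat;
# no scheduled job was created or modified
assert len(collector.results) == 1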

+ 4 - 0
ambari-agent/src/main/python/ambari_agent/Controller.py

@@ -262,6 +262,10 @@ class Controller(threading.Thread):
        if 'alertDefinitionCommands' in response.keys():
          self.alert_scheduler_handler.update_definitions(response['alertDefinitionCommands'], True)
          pass
+
+        if 'alertExecutionCommands' in response.keys():
+          self.alert_scheduler_handler.execute_alert(response['alertExecutionCommands'])
+          pass
 
        if "true" == response['restartAgent']:
          logger.error("Received the restartAgent command")

+ 8 - 4
ambari-agent/src/main/python/ambari_agent/alerts/collector.py

@@ -28,13 +28,15 @@ class AlertCollector():
  '''
  def __init__(self):
    self.__buckets = {}
-    
+
+
  def put(self, cluster, alert):
    if not cluster in self.__buckets:
      self.__buckets[cluster] = {}
      
    self.__buckets[cluster][alert['name']] = alert
-    
+
+
  def remove(self, cluster, alert_name):
    '''
    Removes the alert with the specified name if it exists in the dictionary
@@ -43,7 +45,8 @@ class AlertCollector():
      return
    
    del self.__buckets[cluster][alert_name]
-    
+
+
  def remove_by_uuid(self, alert_uuid):
    '''
    Removes the alert with the specified uuid if it exists in the dictionary
@@ -53,7 +56,8 @@ class AlertCollector():
        alert = alert_map[alert_name]
        if alert['uuid'] == alert_uuid:
          self.remove(cluster, alert_name)
-        
+
+
  def alerts(self):
    alerts = []
    for clustermap in self.__buckets.values()[:]:
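For context, a small standalone model of the bucket structure these methods operate on (cluster name, then alert name, then the latest payload). It mirrors put() and alerts() from the diff but is not the real class.

# cluster name -> alert name -> latest payload, as in AlertCollector.__buckets
buckets = {}

def put(cluster, alert):
  # a second result for the same alert name replaces the first
  buckets.setdefault(cluster, {})[alert['name']] = alert

def alerts():
  # flatten every cluster bucket into one list for the heartbeat
  collected = []
  for clustermap in list(buckets.values()):
    collected.extend(clustermap.values())
  return collected

put('c1', {'name': 'namenode_process', 'state': 'OK'})
put('c1', {'name': 'namenode_process', 'state': 'CRITICAL'})
print(len(alerts()))  # 1 -- keyed by alert name, so the latest result wins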

+ 43 - 1
ambari-agent/src/test/python/ambari_agent/TestAlerts.py

@@ -312,4 +312,46 @@ class TestAlerts(TestCase):
    ash.schedule_definition(pa)
    
    # verify enabled alert was scheduled
-    self.assertEquals(3, ash.get_job_count())    
+    self.assertEquals(3, ash.get_job_count())
+    
+  def test_immediate_alert(self):
+    test_file_path = os.path.join('ambari_agent', 'dummy_files')
+    test_stack_path = os.path.join('ambari_agent', 'dummy_files')
+
+    ash = AlertSchedulerHandler(test_file_path, test_stack_path)
+    ash.start()
+
+    self.assertEquals(1, ash.get_job_count())
+    self.assertEquals(0, len(ash._collector.alerts()))
+
+    execution_commands = [ { 
+        "clusterName": "c1",
+        "hostName": "c6401.ambari.apache.org",    
+        "alertDefinition": {         
+          "name": "namenode_process",
+          "service": "HDFS",
+          "component": "NAMENODE",
+          "label": "NameNode process",
+          "interval": 6,
+          "scope": "host",
+          "enabled": True,
+          "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
+          "source": {
+            "type": "PORT",
+            "uri": "{{hdfs-site/my-key}}",
+            "default_port": 50070,
+            "reporting": {
+              "ok": {
+                "text": "TCP OK - {0:.4f} response time on port {1}"
+              },
+              "critical": {
+                "text": "Could not load process info: {0}"
+              }
+            }
+          }
+        }
+      } ]
+    
+    # execute the alert immediately and verify that the collector has the result
+    ash.execute_alert(execution_commands)
+    self.assertEquals(1, len(ash._collector.alerts()))

+ 11 - 1
ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java

@@ -53,7 +53,17 @@ public abstract class AgentCommand {
    STATUS_COMMAND,
    CANCEL_COMMAND,
    REGISTRATION_COMMAND,
-    ALERT_DEFINITION_COMMAND
+
+    /**
+     * Sends alert definitions to an agent which will refresh all alerts running
+     * on that host.
+     */
+    ALERT_DEFINITION_COMMAND,
+
+    /**
+     * A single alert that should be run immediately.
+     */
+    ALERT_EXECUTION_COMMAND
  }
 
  public AgentCommandType getCommandType() {

+ 74 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/AlertExecutionCommand.java

@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import org.apache.ambari.server.state.alert.AlertDefinition;
+
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * The {@link AlertExecutionCommand} is used to instruct an agent to run an
+ * alert immediately and return the result in the next heartbeat. This does not
+ * affect any existing jobs and does not cause the job for the executed alert to
+ * reset its interval.
+ */
+public class AlertExecutionCommand extends AgentCommand {
+
+  /**
+   * The name of the cluster.
+   */
+  @SerializedName("clusterName")
+  private final String m_clusterName;
+
+  /**
+   * The agent hostname.
+   */
+  @SerializedName("hostName")
+  private final String m_hostName;
+
+  /**
+   * The definition to run.
+   */
+  @SerializedName("alertDefinition")
+  private final AlertDefinition m_definition;
+
+  /**
+   * Constructor.
+   *
+   * @param clusterName
+   *          the name of the cluster (not {@code null}).
+   * @param hostName
+   *          the name of the host (not {@code null}).
+   * @param definition
+   *          the definition to run (not {@code null}).
+   */
+  public AlertExecutionCommand(String clusterName, String hostName, AlertDefinition definition) {
+    super(AgentCommandType.ALERT_EXECUTION_COMMAND);
+    m_clusterName = clusterName;
+    m_hostName = hostName;
+    m_definition = definition;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public AgentCommandType getCommandType() {
+    return AgentCommandType.ALERT_EXECUTION_COMMAND;
+  }
+}

+ 4 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java

@@ -621,6 +621,10 @@ public class HeartBeatHandler {
            response.addAlertDefinitionCommand((AlertDefinitionCommand) ac);
            break;
          }
+          case ALERT_EXECUTION_COMMAND: {
+            response.addAlertExecutionCommand((AlertExecutionCommand) ac);
+            break;
+          }
          default:
            LOG.error("There is no action for agent command ="
                + ac.getCommandType().name());

+ 17 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java

@@ -50,6 +50,13 @@ public class HeartBeatResponse {
  @SerializedName("alertDefinitionCommands")
  private List<AlertDefinitionCommand> alertDefinitionCommands = null;
 
+  /**
+   * {@link AlertExecutionCommand}s are used to execute an alert job
+   * immediately.
+   */
+  @SerializedName("alertExecutionCommands")
+  private List<AlertExecutionCommand> alertExecutionCommands = null;
+
  @SerializedName("registrationCommand")
  private RegistrationCommand registrationCommand;
 
@@ -159,6 +166,16 @@ public class HeartBeatResponse {
    alertDefinitionCommands.add(command);
  }
 
+  public void addAlertExecutionCommand(AlertExecutionCommand command) {
+    // commands are added here when they are taken off the queue; there should
+    // be no thread contention and thus no worry about locks for the null check
+    if (null == alertExecutionCommands) {
+      alertExecutionCommands = new ArrayList<AlertExecutionCommand>();
+    }
+
+    alertExecutionCommands.add(command);
+  }
+
  @Override
  public String toString() {
    StringBuilder buffer = new StringBuilder("HeartBeatResponse{");

+ 82 - 5
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java

@@ -28,6 +28,8 @@ import java.util.Set;
 import java.util.UUID;
 
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.agent.ActionQueue;
+import org.apache.ambari.server.agent.AlertExecutionCommand;
 import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.controller.spi.NoSuchParentResourceException;
 import org.apache.ambari.server.controller.spi.NoSuchResourceException;
@@ -43,6 +45,8 @@ import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.alert.AlertDefinition;
+import org.apache.ambari.server.state.alert.AlertDefinitionFactory;
 import org.apache.ambari.server.state.alert.AlertDefinitionHash;
 import org.apache.ambari.server.state.alert.Scope;
 import org.apache.ambari.server.state.alert.SourceType;
@@ -77,6 +81,7 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
  protected static final String ALERT_DEF_SOURCE_REPORTING_WARNING = "AlertDefinition/source/reporting/warning";
  protected static final String ALERT_DEF_SOURCE_REPORTING_CRITICAL = "AlertDefinition/source/reporting/critical";
 
+  protected static final String ALERT_DEF_ACTION_RUN_NOW = "AlertDefinition/run_now";
 
  private static Set<String> pkPropertyIds = new HashSet<String>(
      Arrays.asList(ALERT_DEF_ID, ALERT_DEF_NAME));
@@ -87,6 +92,18 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
 
  private static AlertDefinitionHash alertDefinitionHash;
 
+  /**
+   * Used for coercing an {@link AlertDefinitionEntity} to an
+   * {@link AlertDefinition}.
+   */
+  private static AlertDefinitionFactory definitionFactory;
+
+  /**
+   * Used to enqueue commands to send to the agents when running an alert
+   * on-demand.
+   */
+  private static ActionQueue actionQueue;
+
  /**
   * @param instance
   */
@@ -94,6 +111,8 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
  public static void init(Injector injector) {
    alertDefinitionDAO = injector.getInstance(AlertDefinitionDAO.class);
    alertDefinitionHash = injector.getInstance(AlertDefinitionHash.class);
+    definitionFactory = injector.getInstance(AlertDefinitionFactory.class);
+    actionQueue = injector.getInstance(ActionQueue.class);
  }
 
  AlertDefinitionResourceProvider(Set<String> propertyIds,
@@ -199,9 +218,24 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
      NoSuchResourceException, NoSuchParentResourceException {
 
    String clusterName = null;
-    Set<String> invalidatedHosts = new HashSet<String>();
    Clusters clusters = getManagementController().getClusters();
 
+    // check the predicate to see if there is a request to run
+    // the alert definition immediately
+    if( null != predicate ){
+      Set<Map<String,Object>> predicateMaps = getPropertyMaps(predicate);
+      for (Map<String, Object> propertyMap : predicateMaps) {
+        String runNow = (String) propertyMap.get(ALERT_DEF_ACTION_RUN_NOW);
+        if (null != runNow) {
+          if (Boolean.valueOf(runNow) == Boolean.TRUE) {
+            scheduleImmediateAlert(propertyMap);
+          }
+        }
+      }
+    }
+
+    // if an AlertDefinition property body was specified, perform the update
+    Set<String> invalidatedHosts = new HashSet<String>();
    for (Map<String, Object> requestPropMap : request.getProperties()) {
      for (Map<String, Object> propertyMap : getPropertyMaps(requestPropMap, predicate)) {
        String stringId = (String) propertyMap.get(ALERT_DEF_ID);
@@ -234,10 +268,13 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
      }
    }
 
-    // build alert definition commands for all agent hosts affected
-    alertDefinitionHash.enqueueAgentCommands(clusterName, invalidatedHosts);
-
-    notifyUpdate(Resource.Type.AlertDefinition, request, predicate);
+    // build alert definition commands for all agent hosts affected; only
+    // update agents and broadcast update event if there are actual invalidated
+    // hosts
+    if (invalidatedHosts.size() > 0) {
+      alertDefinitionHash.enqueueAgentCommands(clusterName, invalidatedHosts);
+      notifyUpdate(Resource.Type.AlertDefinition, request, predicate);
+    }
 
    return getRequestStatus(null);
  }
@@ -501,4 +538,44 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
 
    return resource;
  }
+
+  /**
+   * Extracts an {@link AlertDefinitionEntity} from the property map and
+   * enqueues an {@link AlertExecutionCommand} for every host that should re-run
+   * the specified definition.
+   *
+   * @param propertyMap
+   */
+  private void scheduleImmediateAlert(Map<String, Object> propertyMap) {
+    Clusters clusters = getManagementController().getClusters();
+    String stringId = (String) propertyMap.get(ALERT_DEF_ID);
+    long id = Long.parseLong(stringId);
+
+    AlertDefinitionEntity entity = alertDefinitionDAO.findById(id);
+    if (null == entity) {
+      LOG.error("Unable to lookup alert definition with ID {}", id);
+      return;
+    }
+
+    Cluster cluster = null;
+    try {
+      cluster = clusters.getClusterById(entity.getClusterId());
+    } catch (AmbariException ambariException) {
+      LOG.error("Unable to lookup cluster with ID {}", entity.getClusterId(),
+          ambariException);
+      return;
+    }
+
+    Set<String> hostNames = alertDefinitionHash.getAssociatedHosts(cluster,
+        entity.getDefinitionName(), entity.getServiceName(),
+        entity.getComponentName());
+
+    for (String hostName : hostNames) {
+      AlertDefinition definition = definitionFactory.coerce(entity);
+      AlertExecutionCommand command = new AlertExecutionCommand(
+          cluster.getClusterName(), hostName, definition);
+
+      actionQueue.enqueue(hostName, command);
+    }
+  }
}
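A sketch of how a client might trigger the new run_now behavior over the REST API. Only the AlertDefinition/run_now property and its predicate handling come from this diff; the endpoint path, header, and credentials below are assumptions.

import requests

# assumed endpoint layout and credentials; the diff itself only defines the
# AlertDefinition/run_now property inspected in the update predicate
AMBARI_URL = 'http://ambari.example.com:8080'
CLUSTER = 'c1'
DEFINITION_ID = 42  # hypothetical alert definition id

response = requests.put(
  '{0}/api/v1/clusters/{1}/alert_definitions/{2}'.format(
    AMBARI_URL, CLUSTER, DEFINITION_ID),
  params={'run_now': 'true'},            # surfaces as the AlertDefinition/run_now predicate
  headers={'X-Requested-By': 'ambari'},  # Ambari's CSRF protection header
  auth=('admin', 'admin'))               # placeholder credentials

print(response.status_code)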

+ 49 - 14
ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java

@@ -309,16 +309,13 @@ public class AlertDefinitionHash {
   */
  public Set<String> invalidateHosts(long clusterId, String definitionName,
      String definitionServiceName, String definitionComponentName) {
-    Set<String> invalidatedHosts = new HashSet<String>();
 
    Cluster cluster = null;
-    Map<String, Host> hosts = null;
    String clusterName = null;
    try {
      cluster = m_clusters.getClusterById(clusterId);
      if (null != cluster) {
        clusterName = cluster.getClusterName();
-        hosts = m_clusters.getHostsForCluster(clusterName);
      }
 
      if (null == cluster) {
@@ -329,16 +326,55 @@
    }
 
    if (null == cluster) {
-      return invalidatedHosts;
+      return Collections.emptySet();
+    }
+
+    // determine which hosts in the cluster would be affected by a change
+    // to the specified definition
+    Set<String> affectedHosts = getAssociatedHosts(cluster, definitionName,
+        definitionServiceName, definitionComponentName);
+
+    // invalidate all returned hosts
+    for (String hostName : affectedHosts) {
+      invalidate(clusterName, hostName);
+    }
+
+    return affectedHosts;
+  }
+
+  /**
+   * Gets the hosts that are associated with the specified definition. Each host
+   * returned is expected to be capable of running the alert. A change to the
+   * definition would entail contacting each returned host and invalidating
+   * their current alert definitions.
+   *
+   * @param cluster
+   * @param definitionName
+   * @param definitionServiceName
+   * @param definitionComponentName
+   * @return a set of all associated hosts or an empty set, never {@code null}.
+   */
+  public Set<String> getAssociatedHosts(Cluster cluster, String definitionName,
+      String definitionServiceName, String definitionComponentName) {
+
+    Map<String, Host> hosts = null;
+    String clusterName = cluster.getClusterName();
+    Set<String> affectedHosts = new HashSet<String>();
+
+    try {
+      hosts = m_clusters.getHostsForCluster(clusterName);
+    } catch (AmbariException ambariException) {
+      LOG.error("Unable to lookup hosts for cluster named {}", clusterName,
+          ambariException);
+
+      return affectedHosts;
    }
 
    // intercept host agent alerts; they affect all hosts
    if (Services.AMBARI.equals(definitionServiceName)
        && Components.AMBARI_AGENT.equals(definitionComponentName)) {
-
-      invalidateAll();
-      invalidatedHosts.addAll(hosts.keySet());
-      return invalidatedHosts;
+      affectedHosts.addAll(hosts.keySet());
+      return affectedHosts;
    }
 
    // find all hosts that have the matching service and component
@@ -354,8 +390,7 @@
        String componentName = component.getServiceComponentName();
        if (serviceName.equals(definitionServiceName)
            && componentName.equals(definitionComponentName)) {
-          invalidate(clusterName, hostName);
-          invalidatedHosts.add(hostName);
+          affectedHosts.add(hostName);
        }
      }
    }
@@ -367,7 +402,7 @@
      LOG.warn("The alert definition {} has an unknown service of {}",
          definitionName, definitionServiceName);
 
-      return invalidatedHosts;
+      return affectedHosts;
    }
 
    // get all master components of the definition's service; any hosts that
@@ -379,15 +414,14 @@
          Map<String, ServiceComponentHost> componentHosts = component.getValue().getServiceComponentHosts();
          if (null != componentHosts) {
            for (String componentHost : componentHosts.keySet()) {
-              invalidate(clusterName, componentHost);
-              invalidatedHosts.add(componentHost);
+              affectedHosts.add(componentHost);
            }
          }
        }
      }
    }
 
-    return invalidatedHosts;
+    return affectedHosts;
  }
 
  /**
@@ -462,6 +496,7 @@
      // before the next heartbeat, there would be several commands that would
      // force the agents to reschedule their alerts more than once
      m_actionQueue.dequeue(hostName, AgentCommandType.ALERT_DEFINITION_COMMAND);
+      m_actionQueue.dequeue(hostName, AgentCommandType.ALERT_EXECUTION_COMMAND);
      m_actionQueue.enqueue(hostName, command);
    }
  }

+ 2 - 1
ambari-server/src/main/resources/properties.json

@@ -447,7 +447,8 @@
       "AlertDefinition/interval",
       "AlertDefinition/interval",
       "AlertDefinition/enabled",
       "AlertDefinition/enabled",
       "AlertDefinition/scope",
       "AlertDefinition/scope",
-      "AlertDefinition/source"
+      "AlertDefinition/source",
+      "AlertDefinition/run_now"
     ],
     ],
     "AlertGroup": [
     "AlertGroup": [
       "AlertGroup/id",
       "AlertGroup/id",

+ 45 - 2
ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertDefinitionHashTest.java

@@ -25,6 +25,7 @@ import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -32,11 +33,16 @@
 
 import junit.framework.TestCase;
 
+import org.apache.ambari.server.agent.ActionQueue;
+import org.apache.ambari.server.agent.AlertDefinitionCommand;
+import org.apache.ambari.server.agent.AlertExecutionCommand;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.ConfigHelper;
 import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
@@ -344,6 +350,35 @@ public class AlertDefinitionHashTest extends TestCase {
    assertEquals(expected, m_hash.getHash(CLUSTERNAME, HOSTNAME));
  }
 
+  @Test
+  public void testActionQueueInvalidation() throws Exception{
+    ActionQueue actionQueue = m_injector.getInstance(ActionQueue.class);
+
+    AlertDefinitionCommand definitionCommand1 = new AlertDefinitionCommand(
+        CLUSTERNAME, HOSTNAME, "12345", null);
+
+    AlertDefinitionCommand definitionCommand2 = new AlertDefinitionCommand(
+        CLUSTERNAME, "anotherHost", "67890", null);
+
+    AlertExecutionCommand executionCommand = new AlertExecutionCommand(
+        CLUSTERNAME, HOSTNAME, null);
+
+    actionQueue.enqueue(HOSTNAME, definitionCommand1);
+    actionQueue.enqueue(HOSTNAME, executionCommand);
+    actionQueue.enqueue("anotherHost", definitionCommand2);
+
+    assertEquals(2, actionQueue.size(HOSTNAME));
+    assertEquals(1, actionQueue.size("anotherHost"));
+
+    Set<String> hosts = new HashSet<String>();
+    hosts.add(HOSTNAME);
+
+    // should invalidate both alert commands, and add a new definition command
+    m_hash.enqueueAgentCommands(CLUSTERNAME, hosts);
+    assertEquals(1, actionQueue.size(HOSTNAME));
+    assertEquals(1, actionQueue.size("anotherHost"));
+  }
+
  /**
   *
   */
@@ -353,12 +388,20 @@
     */
    @Override
    public void configure(Binder binder) {
+      Cluster cluster = EasyMock.createNiceMock(Cluster.class);
+      EasyMock.expect(cluster.getAllConfigs()).andReturn(
+          new ArrayList<Config>()).anyTimes();
+
      binder.bind(Clusters.class).toInstance(
          EasyMock.createNiceMock(Clusters.class));
-      binder.bind(Cluster.class).toInstance(
-          EasyMock.createNiceMock(Cluster.class));
+
+      binder.bind(Cluster.class).toInstance(cluster);
+
      binder.bind(AlertDefinitionDAO.class).toInstance(
          EasyMock.createNiceMock(AlertDefinitionDAO.class));
+
+      binder.bind(ConfigHelper.class).toInstance(
+          EasyMock.createNiceMock(ConfigHelper.class));
    }
  }
}

+ 1 - 1
ambari-server/src/test/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostTest.java

@@ -125,7 +125,7 @@ public class ServiceComponentHostTest {
      String svcComponent,
      String hostName, boolean isClient) throws AmbariException{
    Cluster c = clusters.getCluster("C1");
-
+    Assert.assertNotNull(c.getConfigGroups());
    return createNewServiceComponentHost(c, svc, svcComponent, hostName);
  }