
AMBARI-9807 - Store Configuration In Agent Memory For Alerts (jonathanhurley)

Jonathan Hurley, 10 years ago
commit f5f13b3d4c

+ 35 - 131
ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py

@@ -26,7 +26,6 @@ import logging
 import os
 import sys
 import time
-import traceback
 from apscheduler.scheduler import Scheduler
 from alerts.collector import AlertCollector
 from alerts.metric_alert import MetricAlert
@@ -38,8 +37,6 @@ logger = logging.getLogger()
 
 
 class AlertSchedulerHandler():
-  make_cachedir = True
-
   FILENAME = 'definitions.json'
   TYPE_PORT = 'PORT'
   TYPE_METRIC = 'METRIC'
@@ -53,13 +50,17 @@ class AlertSchedulerHandler():
   }
 
 
-  def __init__(self, cachedir, stacks_dir, common_services_dir, host_scripts_dir, config, in_minutes=True):
+  def __init__(self, cachedir, stacks_dir, common_services_dir, host_scripts_dir,
+      cluster_configuration, config, in_minutes=True):
+
     self.cachedir = cachedir
     self.stacks_dir = stacks_dir
     self.common_services_dir = common_services_dir
     self.host_scripts_dir = host_scripts_dir
+
+    self._cluster_configuration = cluster_configuration
     
-    if not os.path.exists(cachedir) and AlertSchedulerHandler.make_cachedir:
+    if not os.path.exists(cachedir):
       try:
         os.makedirs(cachedir)
       except:
@@ -69,64 +70,35 @@ class AlertSchedulerHandler():
     self.__scheduler = Scheduler(AlertSchedulerHandler.APS_CONFIG)
     self.__in_minutes = in_minutes
     self.config = config
-    self.__config_maps = {}
-
-
-  def update_definitions(self, alert_commands, reschedule_jobs=False):
-    """ updates the persisted definitions and restarts the scheduler """
-    
-    with open(os.path.join(self.cachedir, self.FILENAME), 'w') as f:
-      json.dump(alert_commands, f, indent=2)
-    
-    if reschedule_jobs:
-      self.reschedule()
 
 
-  def __update_definition_configs(self, newConfigurations, reschedule_jobs=False):
+  def update_definitions(self, heartbeat):
     """
-    Updates the definitions and configurations stored on disk. Optionally
-    can reschedule jobs. Job rescheduling is only necessary when data that
-    an existing job uses has changed. In many cases, configuration values
-    have changed, yet no jobs need rescheduling.
-
-    :param reschedule_jobs:
+    Updates the persisted alert definitions JSON.
+    :param heartbeat:
     :return:
     """
-
-    if reschedule_jobs:
-      logger.info("[AlertScheduler] Updating {0} with the latest configuration values and rescheduling alert jobs".format(self.FILENAME))
-    else:
-      logger.info("[AlertScheduler] Updating {0} with the latest configuration values".format(self.FILENAME))
-
-    # Load definitions from json
-    try:
-      with open(os.path.join(self.cachedir, self.FILENAME), 'r') as fp:
-        all_commands = json.load(fp)
-    except IOError, ValueError:
-      if logger.isEnabledFor(logging.DEBUG):
-        logger.exception("[AlertScheduler] Failed to load definitions. {0}".format(traceback.format_exc()))
+    if 'alertDefinitionCommands' not in heartbeat:
+      logger.warning("There are no alert definition commands in the heartbeat; unable to update definitions")
       return
 
-    # Update definitions with current config
-    for command_json in all_commands:
-      if 'clusterName' in command_json:
-        clusterName = command_json['clusterName']
-      else:
-        clusterName = ''
+    # prune out things we don't want to store
+    alert_definitions = []
+    for command in heartbeat['alertDefinitionCommands']:
+      command_copy = command.copy()
 
-      self.__update_config_values(command_json['configurations'],
-        self.__config_maps[clusterName])
+      # no need to store these since we always use the in-memory cached values
+      if 'configurations' in command_copy:
+        del command_copy['configurations']
 
-      # update the configurations before writing the file back out
-      command_json['configurations'] = newConfigurations
+      alert_definitions.append(command_copy)
 
-    # Save definitions to file
+    # write out the new definitions
     with open(os.path.join(self.cachedir, self.FILENAME), 'w') as f:
-      json.dump(all_commands, f, indent=2)
+      json.dump(alert_definitions, f, indent=2)
 
-    # only reschdule jobs if instructed to
-    if reschedule_jobs:
-      self.reschedule_all()
+    # reschedule only the jobs that have changed
+    self.reschedule()
 
 
   def __make_function(self, alert_def):
@@ -206,6 +178,7 @@ class AlertSchedulerHandler():
     logger.info("[AlertScheduler] Reschedule Summary: {0} rescheduled, {1} unscheduled".format(
         str(jobs_scheduled), str(jobs_removed)))
 
+
   def reschedule_all(self):
     """
     Removes jobs that are scheduled where their UUID no longer is valid.
@@ -239,7 +212,11 @@ class AlertSchedulerHandler():
   
 
   def __load_definitions(self):
-    """ loads all alert commands from the file.  all clusters are stored in one file """
+    """
+    Loads all alert definitions from a file. All clusters are stored in
+    a single file.
+    :return:
+    """
     definitions = []
     
     all_commands = None
@@ -248,31 +225,20 @@ class AlertSchedulerHandler():
       with open(alerts_definitions_path) as fp:
         all_commands = json.load(fp)
     except:
-      logger.warning('[AlertScheduler] {0} not found. No alerts will be scheduled.'.format(alerts_definitions_path))
+      logger.warning('[AlertScheduler] {0} not found or invalid. No alerts will be scheduled until registration occurs.'.format(alerts_definitions_path))
       return definitions
     
     for command_json in all_commands:
       clusterName = '' if not 'clusterName' in command_json else command_json['clusterName']
       hostName = '' if not 'hostName' in command_json else command_json['hostName']
 
-      configmap = None
-      # each cluster gets a map of key/value pairs of substitution values
-      self.__config_maps[clusterName] = {} 
-      if 'configurations' in command_json:
-        configmap = command_json['configurations']
-
       for definition in command_json['alertDefinitions']:
         alert = self.__json_to_callable(clusterName, hostName, definition)
         
         if alert is None:
           continue
           
-        # get the config values for the alerts 'lookup keys',
-        # eg: hdfs-site/dfs.namenode.http-address : host_and_port        
-        vals = self.__find_config_values(configmap, alert.get_lookup_keys())
-        self.__config_maps[clusterName].update(vals)
-
-        alert.set_helpers(self._collector, self.__config_maps[clusterName])
+        alert.set_helpers(self._collector, self._cluster_configuration)
 
         definitions.append(alert)
       
@@ -310,69 +276,6 @@ class AlertSchedulerHandler():
     return alert
 
 
-  def __find_config_values(self, configmap, obj_keylist):
-    """ 
-    finds templated values in the configuration map provided by the server
-    and returns a dictionary of template key to value 
-    """
-    
-    if configmap is None:
-      return {}
-    
-    result = {}
-    
-    for key in obj_keylist:
-      try:
-        obj = configmap
-        for layer in key.split('/'):
-          obj = obj[layer]
-        result[key] = obj
-      except KeyError: # the nested key is missing somewhere
-        pass
-        
-    return result
-
-  def __update_config_values(self, configs, actual_configs):
-    for slashkey in actual_configs.keys():
-      dicts = slashkey.split('/')
-      current_dict = configs
-      for i in range(len(dicts)):
-        if i+1 >= len(dicts):
-          current_dict[dicts[i]] = actual_configs[slashkey]
-        else:
-          if not dicts[i] in current_dict:
-            current_dict[dicts[i]]={}
-          current_dict = current_dict[dicts[i]]
-
-  def update_configurations(self, commands):
-    """
-    Checks the execution command's configurations against those stored in
-    memory. If there are differences, this will reschedule alerts. The
-    on-disk JSON file is always updated so that it reflects the correct state
-    of configurations
-    """
-    for command in commands:
-      clusterName = command['clusterName']
-      if not clusterName in self.__config_maps:
-        continue
-
-      if not 'configurations' in command:
-        continue
-
-      existingConfigurationKeys = self.__config_maps[clusterName].keys()
-      newConfigurations = command['configurations']
-      newConfigurationValues = self.__find_config_values(newConfigurations,
-        existingConfigurationKeys)
-
-      # if we have updated values push them to config_maps and reschedule
-      rescheduleJobs = False
-      if newConfigurationValues != self.__config_maps[clusterName]:
-        rescheduleJobs = True
-        self.__config_maps[clusterName].update(newConfigurationValues)
-
-      self.__update_definition_configs(newConfigurations, rescheduleJobs)
-        
-
   def schedule_definition(self,definition):
     """
     Schedule a definition (callable). Scheduled jobs are given the UUID
@@ -382,7 +285,7 @@ class AlertSchedulerHandler():
     simply NOOP.
     """
     # NOOP if the definition is disabled; don't schedule it
-    if definition.is_enabled() == False:
+    if not definition.is_enabled():
       logger.info("[AlertScheduler] The alert {0} with UUID {1} is disabled and will not be scheduled".format(
           definition.get_name(),definition.get_uuid()))
       return
@@ -408,7 +311,7 @@ class AlertSchedulerHandler():
   def get_job_count(self):
     """
     Gets the number of jobs currently scheduled. This is mainly used for
-    test verification of scheduling
+    test verification of scheduling.
     """
     if self.__scheduler is None:
       return 0
@@ -440,11 +343,12 @@ class AlertSchedulerHandler():
         logger.info("[AlertScheduler] Executing on-demand alert {0} ({1})".format(alert.get_name(),
             alert.get_uuid()))
         
-        alert.set_helpers(self._collector, self.__config_maps[clusterName])
+        alert.set_helpers(self._collector, self._cluster_configuration)
         alert.collect()
       except:
         logger.exception("[AlertScheduler] Unable to execute the alert outside of the job scheduler")
 
+
 def main():
   args = list(sys.argv)
   del args[0]

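The reworked update_definitions() now persists only the pruned alert definition commands; per-alert configuration values are read from the in-memory cluster configuration cache at collection time instead of being baked into definitions.json. A minimal sketch of the pruning step, assuming a made-up heartbeat payload and a throwaway cache directory (neither comes from this commit):

    import json
    import os

    # hypothetical heartbeat payload; field names mirror the diff, values are invented
    heartbeat = {
        'alertDefinitionCommands': [{
            'clusterName': 'c1',
            'hostName': 'c6401.ambari.apache.org',
            'alertDefinitions': [{'name': 'namenode_process', 'uuid': 'c1f73191'}],
            'configurations': {'hdfs-site': {'dfs.namenode.http-address': 'c6401:50070'}}
        }]
    }

    alert_definitions = []
    for command in heartbeat['alertDefinitionCommands']:
        command_copy = command.copy()
        # configurations are dropped; alerts read them from ClusterConfiguration instead
        command_copy.pop('configurations', None)
        alert_definitions.append(command_copy)

    cachedir = '/tmp/ambari-agent-alerts'        # throwaway path for the sketch
    if not os.path.exists(cachedir):
        os.makedirs(cachedir)
    with open(os.path.join(cachedir, 'definitions.json'), 'w') as f:
        json.dump(alert_definitions, f, indent=2)

After writing the file, the handler calls reschedule(), which only touches jobs whose UUIDs have changed.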
+ 164 - 0
ambari-agent/src/main/python/ambari_agent/ClusterConfiguration.py

@@ -0,0 +1,164 @@
+#!/usr/bin/env python
+
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import logging
+import json
+import os
+import threading
+
+logger = logging.getLogger()
+
+class ClusterConfiguration():
+  """
+  Maintains an in-memory cache and disk cache of the configurations for
+  every cluster. This is useful for having quick access to any of the
+  configuration properties.
+  """
+
+  FILENAME = 'configurations.json'
+
+  # constants that define which commands hold configurations that can be
+  # used to populate this cache
+  EXECUTION_COMMANDS = 'executionCommands'
+  ALERT_DEFINITION_COMMANDS = 'alertDefinitionCommands'
+  COMMANDS_WITH_CONFIGURATIONS = [EXECUTION_COMMANDS, ALERT_DEFINITION_COMMANDS]
+
+  def __init__(self, cluster_config_cache_dir):
+    """
+    Initializes the configuration cache.
+    :param cluster_config_cache_dir:
+    :return:
+    """
+    self.cluster_config_cache_dir = cluster_config_cache_dir
+
+    # keys are cluster names, values are configurations
+    self.__configurations = {}
+
+    self.__file_lock = threading.RLock()
+    self.__cache_lock = threading.RLock()
+    self.__config_json_file = os.path.join(self.cluster_config_cache_dir, self.FILENAME)
+
+    # ensure that our cache directory exists
+    if not os.path.exists(cluster_config_cache_dir):
+      try:
+        os.makedirs(cluster_config_cache_dir)
+      except:
+        logger.critical("Could not create the cluster configuration cache directory {0}".format(cluster_config_cache_dir))
+
+    # if the file exists, then load it
+    try:
+      if os.path.isfile(self.__config_json_file):
+        with open(self.__config_json_file, 'r') as fp:
+          self.__configurations = json.load(fp)
+    except Exception, exception:
+      logger.warning("Unable to load configurations from {0}. This file will be regenerated on registration".format(self.__config_json_file))
+
+
+  def update_configurations_from_heartbeat(self, heartbeat):
+    """
+    Updates the in-memory and disk-based cluster configurations based on
+    the heartbeat. This will only update configurations on the following
+    types of commands in the heartbeat: execution, and alert definition.
+    :param new_configurations:
+    :return:
+    """
+    heartbeat_keys = heartbeat.keys()
+
+    heartbeat_contains_configurations = False
+    for commandType in self.COMMANDS_WITH_CONFIGURATIONS:
+      if commandType in heartbeat_keys:
+        heartbeat_contains_configurations = True
+
+    # if this heartbeat doesn't contain a command with configurations, then
+    # don't process it
+    if not heartbeat_contains_configurations:
+      return
+
+    if self.EXECUTION_COMMANDS in heartbeat_keys:
+      execution_commands = heartbeat[self.EXECUTION_COMMANDS]
+      for command in execution_commands:
+        if 'clusterName' in command and 'configurations' in command:
+          cluster_name = command['clusterName']
+          configurations = command['configurations']
+          self._update_configurations(cluster_name, configurations)
+
+      return
+
+    if self.ALERT_DEFINITION_COMMANDS in heartbeat_keys:
+      alert_definition_commands = heartbeat[self.ALERT_DEFINITION_COMMANDS]
+      for command in alert_definition_commands:
+        if 'clusterName' in command and 'configurations' in command:
+          cluster_name = command['clusterName']
+          configurations = command['configurations']
+          self._update_configurations(cluster_name, configurations)
+
+      return
+
+
+  def _update_configurations(self, cluster_name, configuration):
+    """
+    Thread-safe method for writing out the specified cluster configuration
+    and updating the in-memory representation.
+    :param cluster_name:
+    :param configuration:
+    :return:
+    """
+    logger.info("Updating cached configurations for cluster {0}".format(cluster_name))
+
+    self.__cache_lock.acquire()
+    try:
+      self.__configurations[cluster_name] = configuration
+    except Exception, exception :
+      logger.exception("Unable to update configurations for cluster {0}".format(cluster_name))
+    finally:
+      self.__cache_lock.release()
+
+
+    self.__file_lock.acquire()
+    try:
+      with open(self.__config_json_file, 'w') as f:
+        json.dump(self.__configurations, f, indent=2)
+    except Exception, exception :
+      logger.exception("Unable to update configurations for cluster {0}".format(cluster_name))
+    finally:
+      self.__file_lock.release()
+
+
+  def get_configuration_value(self, cluster_name, key):
+    """
+    Gets a value from the cluster configuration map for the given cluster and
+    key. The key is expected to be of the form 'foo-bar/baz' or
+    'foo-bar/bar-baz/foobarbaz' where every / denotes a new mapping
+    :param key:  a lookup key, like 'foo-bar/baz'
+    :return: the value, or None if not found
+    """
+    self.__cache_lock.acquire()
+    try:
+      dictionary = self.__configurations[cluster_name]
+      for layer_key in key.split('/'):
+        dictionary = dictionary[layer_key]
+
+      return dictionary
+
+    except Exception:
+      logger.debug("Cache miss for configuration property {0} in cluster {1}".format(key, cluster_name))
+      return None
+    finally:
+      self.__cache_lock.release()

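A short usage sketch of the new cache, assuming the agent sources are importable and using invented cluster and property values; it shows how '/'-delimited keys walk the nested configuration dictionaries and how a miss yields None:

    from ambari_agent.ClusterConfiguration import ClusterConfiguration

    # throwaway cache directory; configurations.json is written beneath it
    cluster_configuration = ClusterConfiguration('/tmp/cluster_configuration')

    heartbeat = {
        'executionCommands': [{
            'clusterName': 'c1',
            'configurations': {'hdfs-site': {'dfs.namenode.http-address': 'c6401:50070'}}
        }]
    }
    cluster_configuration.update_configurations_from_heartbeat(heartbeat)

    # each '/' in the key descends one level into the nested dictionaries
    print(cluster_configuration.get_configuration_value('c1', 'hdfs-site/dfs.namenode.http-address'))
    # -> c6401:50070
    print(cluster_configuration.get_configuration_value('c1', 'hdfs-site/missing.property'))
    # -> None (the miss is logged at DEBUG level)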
+ 17 - 9
ambari-agent/src/main/python/ambari_agent/Controller.py

@@ -35,6 +35,7 @@ import hostname
 import security
 import ssl
 import AmbariConfig
+
 from Heartbeat import Heartbeat
 from Register import Register
 from ActionQueue import ActionQueue
@@ -42,6 +43,7 @@ from FileCache import FileCache
 from NetUtil import NetUtil
 from LiveStatus import LiveStatus
 from AlertSchedulerHandler import AlertSchedulerHandler
+from ClusterConfiguration import  ClusterConfiguration
 from HeartbeatHandlers import HeartbeatStopHandlers, bind_signal_handlers
 
 logger = logging.getLogger()
@@ -89,9 +91,13 @@ class Controller(threading.Thread):
     common_services_cache_dir = os.path.join(cache_dir, FileCache.COMMON_SERVICES_DIRECTORY)
     host_scripts_cache_dir = os.path.join(cache_dir, FileCache.HOST_SCRIPTS_CACHE_DIRECTORY)
     alerts_cache_dir = os.path.join(cache_dir, 'alerts')
-    
+    cluster_config_cache_dir = os.path.join(cache_dir, 'cluster_configuration')
+
+    self.cluster_configuration = ClusterConfiguration(cluster_config_cache_dir)
+
     self.alert_scheduler_handler = AlertSchedulerHandler(alerts_cache_dir, 
-        stacks_cache_dir, common_services_cache_dir, host_scripts_cache_dir, config)
+      stacks_cache_dir, common_services_cache_dir, host_scripts_cache_dir,
+      self.cluster_configuration, config)
 
 
   def __del__(self):
@@ -149,12 +155,11 @@ class Controller(threading.Thread):
         else:
           self.hasMappedComponents = False
 
-        if 'alertDefinitionCommands' in ret.keys():
-          logger.info("Got alert definition update on registration " + pprint.pformat(ret['alertDefinitionCommands']))
-          self.alert_scheduler_handler.update_definitions(ret['alertDefinitionCommands'])
-          pass
+        # always update cached cluster configurations on registration
+        self.cluster_configuration.update_configurations_from_heartbeat(ret)
 
-        pass
+        # always update alert definitions on registration
+        self.alert_scheduler_handler.update_definitions(ret)
       except ssl.SSLError:
         self.repeatRegistration = False
         self.isRegistered = False
@@ -258,6 +263,10 @@ class Controller(threading.Thread):
         else:
           self.responseId = serverId
 
+        # if the response contains configurations, update the in-memory and
+        # disk-based configuration cache (execution and alert commands have this)
+        self.cluster_configuration.update_configurations_from_heartbeat(response)
+
         response_keys = response.keys()
         if 'cancelCommands' in response_keys:
           self.cancelCommandInQueue(response['cancelCommands'])
@@ -265,13 +274,12 @@ class Controller(threading.Thread):
         if 'executionCommands' in response_keys:
           execution_commands = response['executionCommands']
           self.addToQueue(execution_commands)
-          self.alert_scheduler_handler.update_configurations(execution_commands)
 
         if 'statusCommands' in response_keys:
           self.addToStatusQueue(response['statusCommands'])
 
         if 'alertDefinitionCommands' in response_keys:
-          self.alert_scheduler_handler.update_definitions(response['alertDefinitionCommands'], True)
+          self.alert_scheduler_handler.update_definitions(response)
 
         if 'alertExecutionCommands' in response_keys:
           self.alert_scheduler_handler.execute_alert(response['alertExecutionCommands'])

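The Controller now routes every registration and heartbeat response through the configuration cache before any commands are handled. A simplified, mocked sequence is sketched below; the response dict and the handler objects are stand-ins, not real Controller state:

    from mock.mock import MagicMock

    cluster_configuration = MagicMock()
    alert_scheduler_handler = MagicMock()
    action_queue = MagicMock()

    response = {
        'responseId': 2,
        'executionCommands': [{'clusterName': 'c1',
                               'configurations': {'hdfs-site': {'dfs.http.policy': 'HTTP_ONLY'}}}],
        'alertDefinitionCommands': [{'clusterName': 'c1', 'alertDefinitions': []}]
    }

    # 1) cache configurations first so later command processing sees fresh values
    cluster_configuration.update_configurations_from_heartbeat(response)

    # 2) execution commands only go to the action queue; the old
    #    alert_scheduler_handler.update_configurations() call is gone
    if 'executionCommands' in response:
        action_queue.put(response['executionCommands'])

    # 3) the whole response is handed to update_definitions(), which extracts
    #    'alertDefinitionCommands' itself and reschedules changed jobs
    if 'alertDefinitionCommands' in response:
        alert_scheduler_handler.update_definitions(response)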
+ 53 - 63
ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py

@@ -35,7 +35,7 @@ class BaseAlert(object):
   def __init__(self, alert_meta, alert_source_meta):
     self.alert_meta = alert_meta
     self.alert_source_meta = alert_source_meta
-    self.cluster = ''
+    self.cluster_name = ''
     self.host_name = ''
     self._lookup_keys = []
     
@@ -70,16 +70,28 @@ class BaseAlert(object):
     return self.alert_meta['uuid']
 
 
-  def set_helpers(self, collector, value_dict):
-    """ sets helper objects for alerts without having to use them in a constructor """
+  def set_helpers(self, collector, cluster_configuration):
+    """
+    sets helper objects for alerts without having to use them in a constructor
+    """
     self.collector = collector
-    self.config_value_dict = value_dict
+    self.cluster_configuration = cluster_configuration
 
 
-  def set_cluster(self, cluster, host):
+  def set_cluster(self, cluster_name, host_name):
     """ sets cluster information for the alert """
-    self.cluster = cluster
-    self.host_name = host
+    self.cluster_name = cluster_name
+    self.host_name = host_name
+
+
+  def _get_alert_meta_value_safely(self, meta_key):
+    """
+    safe way to get a value when outputting result json.  will not throw an exception
+    """
+    if self.alert_meta.has_key(meta_key):
+      return self.alert_meta[meta_key]
+    else:
+      return None
 
 
   def collect(self):
@@ -129,73 +141,51 @@ class BaseAlert(object):
       logger.debug("[Alert][{0}] result = {1}".format(self.get_name(), str(res)))
       
     data = {}
-    data['name'] = self._find_value('name')
-    data['label'] = self._find_value('label')
+    data['name'] = self._get_alert_meta_value_safely('name')
+    data['label'] = self._get_alert_meta_value_safely('label')
     data['state'] = res[0]
     data['text'] = res_base_text.format(*res[1])
-    data['cluster'] = self.cluster
-    data['service'] = self._find_value('serviceName')
-    data['component'] = self._find_value('componentName')
+    data['cluster'] = self.cluster_name
+    data['service'] = self._get_alert_meta_value_safely('serviceName')
+    data['component'] = self._get_alert_meta_value_safely('componentName')
     data['timestamp'] = long(time.time() * 1000)
-    data['uuid'] = self._find_value('uuid')
-    data['enabled'] = self._find_value('enabled')
+    data['uuid'] = self._get_alert_meta_value_safely('uuid')
+    data['enabled'] = self._get_alert_meta_value_safely('enabled')
 
     if logger.isEnabledFor(logging.DEBUG):
       logger.debug("[Alert][{0}] text = {1}".format(self.get_name(), data['text']))
     
-    self.collector.put(self.cluster, data)
-
-
-  def _find_value(self, meta_key):
-    """ safe way to get a value when outputting result json.  will not throw an exception """
-    if self.alert_meta.has_key(meta_key):
-      return self.alert_meta[meta_key]
-    else:
-      return None
-
-
-  def get_lookup_keys(self):
-    """ returns a list of lookup keys found for this alert """
-    return self._lookup_keys
+    self.collector.put(self.cluster_name, data)
 
 
-  def _find_lookup_property(self, key):
+  def _get_configuration_value(self, key):
     """
-    check if the supplied key is parameterized and appends the extracted key
-    to the array of keys
+    Gets the value of the specified configuration key from the cache. The key
+    should be of the form {{foo-bar/baz}}. If the key is not a lookup key
+    and is instead a constant, such as "foo" or "5", then the constant is
+    returned.
+    :return:
     """
-    keys = re.findall("{{([\S]+)}}", key)
-    
-    if len(keys) > 0:
-      if logger.isEnabledFor(logging.DEBUG):
-        logger.debug("[Alert][{0}] Found parameterized key {1} for {2}".format(
-          self.get_name(), str(keys), str(self)))
-
-      self._lookup_keys.append(keys[0])
-      return keys[0]
-      
-    return key
-
+    # parse {{foo-bar/baz}}
+    placeholder_keys = re.findall("{{([\S]+)}}", key)
 
-  def _lookup_property_value(self, key):
-    """
-    in the case of specifying a configuration path, lookup that path's value
-    """
-    if not key in self._lookup_keys:
+    # if none found, then return the original
+    if len(placeholder_keys) == 0:
       return key
 
-    if key in self.config_value_dict:
-      return self.config_value_dict[key]
-    else:
-      return None
+    # this is a lookup key, so transform it into a value from the config cache
+    placeholder_key = placeholder_keys[0]
+
+    return self.cluster_configuration.get_configuration_value(
+      self.cluster_name, placeholder_key)
 
     
   def _lookup_uri_property_keys(self, uri_structure):
     """
     Loads the configuration lookup keys that the URI structure needs. This
     will return a named tuple that contains the keys needed to lookup
-    parameterized URI values from the URI structure. The URI structure looks 
-    something like:
+    parameterized URI values from the cached configuration.
+    The URI structure looks something like:
     
     "uri":{ 
       "http": foo,
@@ -216,13 +206,13 @@ class BaseAlert(object):
     kerberos_principal = None
     
     if 'http' in uri_structure:
-      http_key = self._find_lookup_property(uri_structure['http'])
+      http_key = uri_structure['http']
     
     if 'https' in uri_structure:
-      https_key = self._find_lookup_property(uri_structure['https'])
+      https_key = uri_structure['https']
       
     if 'https_property' in uri_structure:
-      https_property_key = self._find_lookup_property(uri_structure['https_property'])
+      https_property_key = uri_structure['https_property']
       
     if 'https_property_value' in uri_structure:
       https_property_value_key = uri_structure['https_property_value']
@@ -231,10 +221,10 @@ class BaseAlert(object):
       default_port = uri_structure['default_port']
 
     if 'kerberos_keytab' in uri_structure:
-      kerberos_keytab = self._find_lookup_property(uri_structure['kerberos_keytab'])
+      kerberos_keytab = uri_structure['kerberos_keytab']
 
     if 'kerberos_principal' in uri_structure:
-      kerberos_principal = self._find_lookup_property(uri_structure['kerberos_principal'])
+      kerberos_principal = uri_structure['kerberos_principal']
 
     AlertUriLookupKeys = namedtuple('AlertUriLookupKeys', 
         'http https https_property https_property_value default_port kerberos_keytab kerberos_principal')
@@ -274,16 +264,16 @@ class BaseAlert(object):
     # attempt to parse and parameterize the various URIs; properties that
     # do not exist int he lookup map are returned as None
     if alert_uri_lookup_keys.http is not None:
-      http_uri = self._lookup_property_value(alert_uri_lookup_keys.http)
+      http_uri = self._get_configuration_value(alert_uri_lookup_keys.http)
     
     if alert_uri_lookup_keys.https is not None:
-      https_uri = self._lookup_property_value(alert_uri_lookup_keys.https)
+      https_uri = self._get_configuration_value(alert_uri_lookup_keys.https)
 
     if alert_uri_lookup_keys.https_property is not None:
-      https_property = self._lookup_property_value(alert_uri_lookup_keys.https_property)
+      https_property = self._get_configuration_value(alert_uri_lookup_keys.https_property)
 
     if alert_uri_lookup_keys.https_property_value is not None:
-      https_property_value = self._lookup_property_value(alert_uri_lookup_keys.https_property_value)
+      https_property_value = self._get_configuration_value(alert_uri_lookup_keys.https_property_value)
 
     # without a URI, there's no way to create the structure we need - return
     # the default port if specified, otherwise throw an exception

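The lookup rule that _get_configuration_value() now applies: a {{config-type/property}} key is resolved against the cached cluster configuration, while anything else is returned as a constant. The standalone sketch below reuses the regex from the diff; the configuration contents are invented:

    import re

    configurations = {'hdfs-site': {'dfs.namenode.http-address': 'c6401:50070'}}

    def get_configuration_value(key):
        placeholder_keys = re.findall("{{([\S]+)}}", key)
        if len(placeholder_keys) == 0:
            return key                          # constants pass through unchanged
        value = configurations
        try:
            for layer_key in placeholder_keys[0].split('/'):
                value = value[layer_key]
            return value
        except KeyError:
            return None                         # behaves like a cache miss

    print(get_configuration_value('{{hdfs-site/dfs.namenode.http-address}}'))  # c6401:50070
    print(get_configuration_value('8080'))                                     # 8080 (a constant)
    print(get_configuration_value('{{hdfs-site/missing}}'))                    # None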
+ 6 - 4
ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py

@@ -41,9 +41,8 @@ class PortAlert(BaseAlert):
     self.warning_timeout = DEFAULT_WARNING_TIMEOUT
     self.critical_timeout = DEFAULT_CRITICAL_TIMEOUT
 
-    # can be parameterized or static
     if 'uri' in alert_source_meta:
-      self.uri = self._find_lookup_property(alert_source_meta['uri'])
+      self.uri = alert_source_meta['uri']
 
     # always static
     if 'default_port' in alert_source_meta:
@@ -80,8 +79,10 @@ class PortAlert(BaseAlert):
 
 
   def _collect(self):
+    # can be parameterized or static
     # if not parameterized, this will return the static value
-    uri_value = self._lookup_property_value(self.uri)
+    uri_value = self._get_configuration_value(self.uri)
+
     if uri_value is None:
       uri_value = self.host_name
       logger.debug("[Alert][{0}] Setting the URI to this host since it wasn't specified".format(
@@ -128,7 +129,8 @@ class PortAlert(BaseAlert):
 
       start_time = time.time()
       s.connect((host, port))
-      milliseconds = time.time() - start_time
+      end_time = time.time()
+      milliseconds = end_time - start_time
       seconds = milliseconds / 1000.0
 
       # not sure why this happens sometimes, but we don't always get a

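With the lookup moved into _collect(), the port alert's URI may be a {{...}} key, a literal value, or unresolvable; when resolution yields None, the alert falls back to the agent's own host name. The helper below is an illustration of that fallback with invented names, not the alert's actual parsing code:

    def resolve_port_uri(uri, resolved_config, host_name, default_port):
        # resolved_config stands in for ClusterConfiguration-backed lookups
        uri_value = resolved_config.get(uri) if uri.startswith('{{') else uri
        if uri_value is None:
            uri_value = host_name               # same fallback the alert performs
        host, _, port = uri_value.partition(':')
        return host, int(port) if port else default_port

    print(resolve_port_uri('{{hdfs-site/my-key}}', {}, 'c6401.ambari.apache.org', 2181))
    # ('c6401.ambari.apache.org', 2181)
    print(resolve_port_uri('{{hdfs-site/my-key}}',
        {'{{hdfs-site/my-key}}': 'c6402.ambari.apache.org:8020'},
        'c6401.ambari.apache.org', 2181))
    # ('c6402.ambari.apache.org', 8020)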
+ 17 - 24
ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py

@@ -24,7 +24,6 @@ import os
 import re
 from alerts.base_alert import BaseAlert
 from resource_management.core.environment import Environment
-from symbol import parameters
 
 logger = logging.getLogger()
 
@@ -59,32 +58,26 @@ class ScriptAlert(BaseAlert):
 
     if 'host_scripts_directory' in alert_source_meta:
       self.host_scripts_dir = alert_source_meta['host_scripts_directory']
-      
-    # execute the get_tokens() method so that this script correctly populates
-    # its list of keys
-    try:
-      cmd_module = self._load_source()
-      tokens = cmd_module.get_tokens()
-        
-      # for every token, populate the array keys that this alert will need
-      if tokens is not None:
-        for token in tokens:
-          # append the key to the list of keys for this alert
-          self._find_lookup_property(token)
-    except:
-      logger.exception("[Alert][{0}] Unable to parameterize tokens for script {1}".format(
-        self.get_name(), self.path))
-              
-    
+
   def _collect(self):
     cmd_module = self._load_source()
+
     if cmd_module is not None:
-      # convert the dictionary from 
-      # {'foo-site/bar': 'baz'} into 
-      # {'{{foo-site/bar}}': 'baz'}
       parameters = {}
-      for key in self.config_value_dict:
-        parameters['{{' + key + '}}'] = self.config_value_dict[key]
+
+      try:
+        tokens = cmd_module.get_tokens()
+        if tokens is not None:
+          # for each token, if there is a value, store it; otherwise don't store
+          # a key with a value of None
+          for token in tokens:
+            value = self._get_configuration_value(token)
+            if value is not None:
+              parameters[token] = value
+      except AttributeError:
+        # it's OK if the module doesn't have get_tokens() ; no tokens will
+        # be passed in so hopefully the script doesn't need any
+        logger.debug("The script {0} does not have a get_tokens() function".format(str(cmd_module)))
 
       # try to get basedir for scripts
       # it's needed for server side scripts to properly use resource management
@@ -135,7 +128,7 @@ class ScriptAlert(BaseAlert):
 
       return None
 
-    return imp.load_source(self._find_value('name'), self.path_to_script)
+    return imp.load_source(self._get_alert_meta_value_safely('name'), self.path_to_script)
 
 
   def _get_reporting_text(self, state):

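Script alerts no longer pre-register their lookup keys at construction time; _collect() asks the loaded module for its tokens and resolves each one on demand, skipping tokens with no value and tolerating modules without get_tokens(). A self-contained sketch with invented tokens:

    def get_tokens():
        # what a hypothetical alert script would declare
        return ['{{foo-site/bar}}', '{{foo-site/baz}}', '{{foo-site/not-set}}']

    # stand-in for ClusterConfiguration-backed resolution
    resolved = {'{{foo-site/bar}}': 'rendered-bar', '{{foo-site/baz}}': 'rendered-baz'}

    parameters = {}
    try:
        for token in get_tokens():
            value = resolved.get(token)
            if value is not None:
                parameters[token] = value       # tokens without values are skipped
    except AttributeError:
        pass                                    # module has no get_tokens(); that's fine

    print(parameters)
    # {'{{foo-site/bar}}': 'rendered-bar', '{{foo-site/baz}}': 'rendered-baz'}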
+ 2 - 2
ambari-agent/src/main/python/ambari_agent/alerts/web_alert.py

@@ -120,7 +120,7 @@ class WebAlert(BaseAlert):
       kerberos_principal = None
 
       if self.uri_property_keys.kerberos_principal is not None:
-        kerberos_principal = self._lookup_property_value(
+        kerberos_principal = self._get_configuration_value(
           self.uri_property_keys.kerberos_principal)
 
         if kerberos_principal is not None:
@@ -128,7 +128,7 @@ class WebAlert(BaseAlert):
           kerberos_principal = kerberos_principal.replace('_HOST', self.host_name)
 
       if self.uri_property_keys.kerberos_keytab is not None:
-        kerberos_keytab = self._lookup_property_value(self.uri_property_keys.kerberos_keytab)
+        kerberos_keytab = self._get_configuration_value(self.uri_property_keys.kerberos_keytab)
 
       if kerberos_principal is not None and kerberos_keytab is not None:
         os.system("kinit -kt {0} {1} > /dev/null".format(kerberos_keytab, kerberos_principal))

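The web alert resolves its Kerberos principal and keytab through the same configuration lookup and substitutes _HOST in the principal with the agent's host name before building the kinit call. The values below are illustrative only:

    host_name = 'c6401.ambari.apache.org'

    # stand-in lookup results; the real keys come from the alert's 'uri' structure
    kerberos_principal = 'HTTP/_HOST@EXAMPLE.COM'
    kerberos_keytab = '/etc/security/keytabs/spnego.service.keytab'

    if kerberos_principal is not None:
        # replace _HOST with the agent's host name, as the alert does
        kerberos_principal = kerberos_principal.replace('_HOST', host_name)

    if kerberos_principal is not None and kerberos_keytab is not None:
        print("kinit -kt {0} {1} > /dev/null".format(kerberos_keytab, kerberos_principal))
        # kinit -kt /etc/security/keytabs/spnego.service.keytab HTTP/c6401.ambari.apache.org@EXAMPLE.COM > /dev/null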
+ 312 - 177
ambari-agent/src/test/python/ambari_agent/TestAlerts.py

@@ -21,8 +21,6 @@ limitations under the License.
 import os
 import socket
 import sys
-import re
-import json
 
 from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
 from ambari_agent.alerts.collector import AlertCollector
@@ -31,6 +29,7 @@ from ambari_agent.alerts.port_alert import PortAlert
 from ambari_agent.alerts.script_alert import ScriptAlert
 from ambari_agent.alerts.web_alert import WebAlert
 from ambari_agent.apscheduler.scheduler import Scheduler
+from ambari_agent.ClusterConfiguration import ClusterConfiguration
 
 from collections import namedtuple
 from mock.mock import MagicMock, patch
@@ -42,7 +41,6 @@ class TestAlerts(TestCase):
     # save original open() method for later use
     self.original_open = open
 
-
   def tearDown(self):
     sys.stdout == sys.__stdout__
 
@@ -55,7 +53,12 @@ class TestAlerts(TestCase):
     test_common_services_path = os.path.join('ambari_agent', 'dummy_files')
     test_host_scripts_path = os.path.join('ambari_agent', 'dummy_files')
 
-    ash = AlertSchedulerHandler(test_file_path, test_stack_path, test_common_services_path, test_host_scripts_path, None)
+    cluster_configuration = self.__get_cluster_configuration()
+
+    ash = AlertSchedulerHandler(test_file_path, test_stack_path,
+      test_common_services_path, test_host_scripts_path, cluster_configuration,
+      None)
+
     ash.start()
 
     self.assertTrue(aps_add_interval_job_mock.called)
@@ -64,13 +67,7 @@ class TestAlerts(TestCase):
   @patch('time.time')
   @patch.object(socket.socket,"connect")
   def test_port_alert(self, socket_connect_mock, time_mock):
-    # called 3x with 3 calls per alert
-    # - 900ms and then a time.time() for the date from base_alert
-    # - 2000ms and then a time.time() for the date from base_alert
-    # - socket.timeout to simulate a timeout and then a time.time() for the date from base_alert
-    time_mock.side_effect = [0,900,336283200000,0,2000,336283200000,socket.timeout,336283200000]
-
-    json = { "name": "namenode_process",
+    definition_json = { "name": "namenode_process",
       "service": "HDFS",
       "component": "NAMENODE",
       "label": "NameNode process",
@@ -98,26 +95,39 @@ class TestAlerts(TestCase):
       }
     }
 
+    configuration = { 'hdfs-site' : { 'my-key': 'value1' } }
+
     collector = AlertCollector()
+    cluster_configuration = self.__get_cluster_configuration()
+    self.__update_cluster_configuration(cluster_configuration, configuration)
+
+    # called 3x with 3 calls per alert
+    # - 900ms and then a time.time() for the date from base_alert
+    # - 2000ms and then a time.time() for the date from base_alert
+    # - socket.timeout to simulate a timeout and then a time.time() for the date from base_alert
+    time_mock.side_effect = [0,900,336283000000,
+      0,2000,336283100000,
+      socket.timeout,336283200000]
 
-    pa = PortAlert(json, json['source'])
-    pa.set_helpers(collector, {'hdfs-site/my-key': 'value1'})
-    self.assertEquals(6, pa.interval())
+    alert = PortAlert(definition_json, definition_json['source'])
+    alert.set_helpers(collector, cluster_configuration)
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
+    self.assertEquals(6, alert.interval())
 
     # 900ms is OK
-    pa.collect()
+    alert.collect()
     alerts = collector.alerts()
     self.assertEquals(0, len(collector.alerts()))
     self.assertEquals('OK', alerts[0]['state'])
 
     # 2000ms is WARNING
-    pa.collect()
+    alert.collect()
     alerts = collector.alerts()
     self.assertEquals(0, len(collector.alerts()))
     self.assertEquals('WARNING', alerts[0]['state'])
 
     # throws a socket.timeout exception, causes a CRITICAL
-    pa.collect()
+    alert.collect()
     alerts = collector.alerts()
     self.assertEquals(0, len(collector.alerts()))
     self.assertEquals('CRITICAL', alerts[0]['state'])
@@ -125,7 +135,7 @@ class TestAlerts(TestCase):
 
   @patch.object(socket.socket,"connect")
   def test_port_alert_complex_uri(self, socket_connect_mock):
-    json = { "name": "namenode_process",
+    definition_json = { "name": "namenode_process",
       "service": "HDFS",
       "component": "NAMENODE",
       "label": "NameNode process",
@@ -148,16 +158,24 @@ class TestAlerts(TestCase):
       }
     }
 
+    configuration = {'hdfs-site' :
+      { 'my-key': 'c6401.ambari.apache.org:2181,c6402.ambari.apache.org:2181,c6403.ambari.apache.org:2181'}
+    }
+
     collector = AlertCollector()
+    cluster_configuration = self.__get_cluster_configuration()
+    self.__update_cluster_configuration(cluster_configuration, configuration)
 
-    pa = PortAlert(json, json['source'])
+    alert = PortAlert(definition_json, definition_json['source'])
+    alert.set_helpers(collector, cluster_configuration)
+    alert.set_cluster("c1", "c6402.ambari.apache.org")
 
     # use a URI that has commas to verify that we properly parse it
-    pa.set_helpers(collector, {'hdfs-site/my-key': 'c6401.ambari.apache.org:2181,c6402.ambari.apache.org:2181,c6403.ambari.apache.org:2181'})
-    pa.host_name = 'c6402.ambari.apache.org'
-    self.assertEquals(6, pa.interval())
+    alert.set_helpers(collector, cluster_configuration)
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
+    self.assertEquals(6, alert.interval())
 
-    pa.collect()
+    alert.collect()
     
     alerts = collector.alerts()
     self.assertEquals(0, len(collector.alerts()))
@@ -168,7 +186,7 @@ class TestAlerts(TestCase):
 
 
   def test_port_alert_no_sub(self):
-    json = { "name": "namenode_process",
+    definition_json = { "name": "namenode_process",
       "service": "HDFS",
       "component": "NAMENODE",
       "label": "NameNode process",
@@ -191,16 +209,19 @@ class TestAlerts(TestCase):
       }
     }
 
-    pa = PortAlert(json, json['source'])
-    pa.set_helpers(AlertCollector(), '')
-    self.assertEquals('http://c6401.ambari.apache.org', pa.uri)
+    cluster_configuration = self.__get_cluster_configuration()
+
+    alert = PortAlert(definition_json, definition_json['source'])
+    alert.set_helpers(AlertCollector(), cluster_configuration)
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
 
-    pa.collect()
+    self.assertEquals('http://c6401.ambari.apache.org', alert.uri)
+
+    alert.collect()
 
 
-  @patch.object(re, 'match', new = MagicMock())
   def test_script_alert(self):
-    json = {
+    definition_json = {
       "name": "namenode_process",
       "service": "HDFS",
       "component": "NAMENODE",
@@ -216,19 +237,28 @@ class TestAlerts(TestCase):
     }
 
     # normally set by AlertSchedulerHandler
-    json['source']['stacks_directory'] = os.path.join('ambari_agent', 'dummy_files')
-    json['source']['common_services_directory'] = os.path.join('ambari_agent', 'common-services')
-    json['source']['host_scripts_directory'] = os.path.join('ambari_agent', 'host_scripts')
+    definition_json['source']['stacks_directory'] = os.path.join('ambari_agent', 'dummy_files')
+    definition_json['source']['common_services_directory'] = os.path.join('ambari_agent', 'common-services')
+    definition_json['source']['host_scripts_directory'] = os.path.join('ambari_agent', 'host_scripts')
+
+    configuration = {'foo-site' :
+      { 'bar': 'rendered-bar', 'baz' : 'rendered-baz' }
+    }
 
     collector = AlertCollector()
-    sa = ScriptAlert(json, json['source'], MagicMock())
-    sa.set_helpers(collector, {'foo-site/bar': 'rendered-bar', 'foo-site/baz':'rendered-baz'} )
-    self.assertEquals(json['source']['path'], sa.path)
-    self.assertEquals(json['source']['stacks_directory'], sa.stacks_dir)
-    self.assertEquals(json['source']['common_services_directory'], sa.common_services_dir)
-    self.assertEquals(json['source']['host_scripts_directory'], sa.host_scripts_dir)
+    cluster_configuration = self.__get_cluster_configuration()
+    self.__update_cluster_configuration(cluster_configuration, configuration)
+
+    alert = ScriptAlert(definition_json, definition_json['source'], MagicMock())
+    alert.set_helpers(collector, cluster_configuration )
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
+    
+    self.assertEquals(definition_json['source']['path'], alert.path)
+    self.assertEquals(definition_json['source']['stacks_directory'], alert.stacks_dir)
+    self.assertEquals(definition_json['source']['common_services_directory'], alert.common_services_dir)
+    self.assertEquals(definition_json['source']['host_scripts_directory'], alert.host_scripts_dir)
 
-    sa.collect()
+    alert.collect()
 
     alerts = collector.alerts()
     self.assertEquals(0, len(collector.alerts()))
@@ -239,7 +269,7 @@ class TestAlerts(TestCase):
 
   @patch.object(MetricAlert, "_load_jmx")
   def test_metric_alert(self, ma_load_jmx_mock):
-    json = {
+    definition_json = {
       "name": "cpu_check",
       "service": "HDFS",
       "component": "NAMENODE",
@@ -278,10 +308,19 @@ class TestAlerts(TestCase):
 
     ma_load_jmx_mock.return_value = [1, 3]
 
+    configuration = {'hdfs-site' :
+      { 'dfs.datanode.http.address': '1.2.3.4:80'}
+    }
+
     collector = AlertCollector()
-    ma = MetricAlert(json, json['source'])
-    ma.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'})
-    ma.collect()
+    cluster_configuration = self.__get_cluster_configuration()
+    self.__update_cluster_configuration(cluster_configuration, configuration)
+
+    alert = MetricAlert(definition_json, definition_json['source'])
+    alert.set_helpers(collector, cluster_configuration)
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
+
+    alert.collect()
     
     alerts = collector.alerts()
     self.assertEquals(0, len(collector.alerts()))
@@ -289,11 +328,14 @@ class TestAlerts(TestCase):
     self.assertEquals('CRITICAL', alerts[0]['state'])
     self.assertEquals('(Unit Tests) crit_arr: 1 3 223', alerts[0]['text'])
 
-    del json['source']['jmx']['value']
+    del definition_json['source']['jmx']['value']
     collector = AlertCollector()
-    ma = MetricAlert(json, json['source'])
-    ma.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'})
-    ma.collect()
+
+    alert = MetricAlert(definition_json, definition_json['source'])
+    alert.set_helpers(collector, cluster_configuration)
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
+
+    alert.collect()
 
     alerts = collector.alerts()
     self.assertEquals(0, len(collector.alerts()))
@@ -304,7 +346,7 @@ class TestAlerts(TestCase):
 
   @patch.object(MetricAlert, "_load_jmx")
   def test_alert_uri_structure(self, ma_load_jmx_mock):
-    json = {
+    definition_json = {
       "name": "cpu_check",
       "service": "HDFS",
       "component": "NAMENODE",
@@ -349,52 +391,81 @@ class TestAlerts(TestCase):
     # run the alert without specifying any keys; an exception should be thrown
     # indicating that there was no URI and the result is UNKNOWN
     collector = AlertCollector()
-    ma = MetricAlert(json, json['source'])
-    ma.set_helpers(collector, '')
-    ma.collect()
+    cluster_configuration = self.__get_cluster_configuration()
+    alert = MetricAlert(definition_json, definition_json['source'])
+    alert.set_helpers(collector, cluster_configuration)
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
+    alert.collect()
 
     self.assertEquals('UNKNOWN', collector.alerts()[0]['state'])
 
-    # set 2 properties that make no sense wihtout the main URI properties 
+    # set properties that make no sense without the main URI properties
+    configuration = {'hdfs-site' :
+      { 'dfs.http.policy' : 'HTTP_ONLY'}
+    }
+
     collector = AlertCollector()
-    ma = MetricAlert(json, json['source'])
-    ma.set_helpers(collector, {'hdfs-site/dfs.http.policy': 'HTTP_ONLY'})
-    ma.collect()
+    cluster_configuration = self.__get_cluster_configuration()
+    self.__update_cluster_configuration(cluster_configuration, configuration)
+
+    alert = MetricAlert(definition_json, definition_json['source'])
+    alert.set_helpers(collector, cluster_configuration)
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
+    alert.collect()
     
     self.assertEquals('UNKNOWN', collector.alerts()[0]['state'])
     
     # set an actual property key (http)
+    configuration = {'hdfs-site' :
+      { 'dfs.http.policy' : 'HTTP_ONLY', 'dfs.datanode.http.address' : '1.2.3.4:80' }
+    }
+
+    self.__update_cluster_configuration(cluster_configuration, configuration)
+
     collector = AlertCollector()
-    ma = MetricAlert(json, json['source'])
-    ma.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80', 
-        'hdfs-site/dfs.http.policy': 'HTTP_ONLY'})
-    ma.collect()
+    alert = MetricAlert(definition_json, definition_json['source'])
+    alert.set_helpers(collector, cluster_configuration)
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
+    alert.collect()
     
     self.assertEquals('OK', collector.alerts()[0]['state'])
     
     # set an actual property key (https)
+    configuration = {'hdfs-site' :
+      { 'dfs.http.policy' : 'HTTP_ONLY', 'dfs.datanode.https.address' : '1.2.3.4:443' }
+    }
+
+    self.__update_cluster_configuration(cluster_configuration, configuration)
+
     collector = AlertCollector()
-    ma = MetricAlert(json, json['source'])
-    ma.set_helpers(collector, {'hdfs-site/dfs.datanode.https.address': '1.2.3.4:443', 
-        'hdfs-site/dfs.http.policy': 'HTTP_ONLY'})
-    ma.collect()
+    alert = MetricAlert(definition_json, definition_json['source'])
+    alert.set_helpers(collector, cluster_configuration)
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
+    alert.collect()
     
     self.assertEquals('OK', collector.alerts()[0]['state'])    
 
     # set both (http and https)
+    configuration = {'hdfs-site' :
+      { 'dfs.http.policy' : 'HTTP_ONLY',
+        'dfs.datanode.http.address' : '1.2.3.4:80',
+        'dfs.datanode.https.address' : '1.2.3.4:443' }
+    }
+
+    self.__update_cluster_configuration(cluster_configuration, configuration)
+
     collector = AlertCollector()
-    ma = MetricAlert(json, json['source'])
-    ma.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80', 
-        'hdfs-site/dfs.datanode.https.address': '1.2.3.4:443', 
-        'hdfs-site/dfs.http.policy': 'HTTP_ONLY'})
-    ma.collect()
+    alert = MetricAlert(definition_json, definition_json['source'])
+    alert.set_helpers(collector, cluster_configuration)
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
+    alert.collect()
     
     self.assertEquals('OK', collector.alerts()[0]['state'])    
 
 
   @patch.object(WebAlert, "_make_web_request")
   def test_web_alert(self, wa_make_web_request_mock):
-    json = {
+    definition_json = {
       "name": "webalert_test",
       "service": "HDFS",
       "component": "DATANODE",
@@ -429,9 +500,17 @@ class TestAlerts(TestCase):
     wa_make_web_request_mock.return_value = WebResponse(200,1.234,None)
 
     # run the alert and check HTTP 200    
+    configuration = {'hdfs-site' :
+      { 'dfs.datanode.http.address' : '1.2.3.4:80' }
+    }
+
     collector = AlertCollector()
-    alert = WebAlert(json, json['source'])
-    alert.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'})
+    cluster_configuration = self.__get_cluster_configuration()
+    self.__update_cluster_configuration(cluster_configuration, configuration)
+
+    alert = WebAlert(definition_json, definition_json['source'])
+    alert.set_helpers(collector, cluster_configuration)
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
     alert.collect()
 
     alerts = collector.alerts()
@@ -441,10 +520,13 @@ class TestAlerts(TestCase):
     self.assertEquals('OK', alerts[0]['state'])
 
     # run the alert and check HTTP 500
+
+
     wa_make_web_request_mock.return_value = WebResponse(500,1.234,None)
     collector = AlertCollector()
-    alert = WebAlert(json, json['source'])
-    alert.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'})
+    alert = WebAlert(definition_json, definition_json['source'])
+    alert.set_helpers(collector, cluster_configuration)
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
     alert.collect()
     
     alerts = collector.alerts()
@@ -457,8 +539,9 @@ class TestAlerts(TestCase):
     wa_make_web_request_mock.return_value = WebResponse(0,0,'error message')
      
     collector = AlertCollector()
-    alert = WebAlert(json, json['source'])
-    alert.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'})
+    alert = WebAlert(definition_json, definition_json['source'])
+    alert.set_helpers(collector, cluster_configuration)
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
     alert.collect()
     
     alerts = collector.alerts()
@@ -467,13 +550,19 @@ class TestAlerts(TestCase):
     # http assertion indicating that we properly determined non-SSL
     self.assertEquals('CRITICAL', alerts[0]['state'])
     self.assertEquals('(Unit Tests) critical: http://1.2.3.4:80. error message', alerts[0]['text'])
-     
+
+    configuration = {'hdfs-site' :
+      { 'dfs.http.policy' : 'HTTPS_ONLY',
+        'dfs.datanode.http.address' : '1.2.3.4:80',
+        'dfs.datanode.https.address' : '1.2.3.4:443' }
+    }
+
+    self.__update_cluster_configuration(cluster_configuration, configuration)
+
     collector = AlertCollector()
-    alert = WebAlert(json, json['source'])
-    alert.set_helpers(collector, {
-        'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80',
-        'hdfs-site/dfs.datanode.https.address': '1.2.3.4:8443',
-        'hdfs-site/dfs.http.policy': 'HTTPS_ONLY'})
+    alert = WebAlert(definition_json, definition_json['source'])
+    alert.set_helpers(collector, cluster_configuration)
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
 
     alert.collect()
     
@@ -482,15 +571,20 @@ class TestAlerts(TestCase):
     
     # SSL assertion
     self.assertEquals('CRITICAL', alerts[0]['state'])
-    self.assertEquals('(Unit Tests) critical: https://1.2.3.4:8443. error message', alerts[0]['text'])
+    self.assertEquals('(Unit Tests) critical: https://1.2.3.4:443. error message', alerts[0]['text'])
 
   def test_reschedule(self):
     test_file_path = os.path.join('ambari_agent', 'dummy_files')
     test_stack_path = os.path.join('ambari_agent', 'dummy_files')
     test_common_services_path = os.path.join('ambari_agent', 'dummy_files')
     test_host_scripts_path = os.path.join('ambari_agent', 'dummy_files')
-    
-    ash = AlertSchedulerHandler(test_file_path, test_stack_path, test_common_services_path, test_host_scripts_path, None)
+
+    cluster_configuration = self.__get_cluster_configuration()
+
+    ash = AlertSchedulerHandler(test_file_path, test_stack_path,
+      test_common_services_path, test_host_scripts_path, cluster_configuration,
+      None)
+
     ash.start()
 
     self.assertEquals(1, ash.get_job_count())
@@ -499,7 +593,7 @@ class TestAlerts(TestCase):
 
 
   def test_alert_collector_purge(self):
-    json = { "name": "namenode_process",
+    definition_json = { "name": "namenode_process",
       "service": "HDFS",
       "component": "NAMENODE",
       "label": "NameNode process",
@@ -522,13 +616,20 @@ class TestAlerts(TestCase):
       }
     }
 
+    configuration = {'hdfs-site' :
+      { 'my-key': 'value1' }
+    }
+
     collector = AlertCollector()
+    cluster_configuration = self.__get_cluster_configuration()
+    self.__update_cluster_configuration(cluster_configuration, configuration)
 
-    pa = PortAlert(json, json['source'])
-    pa.set_helpers(collector, {'hdfs-site/my-key': 'value1'})
-    self.assertEquals(6, pa.interval())
+    alert = PortAlert(definition_json, definition_json['source'])
+    alert.set_helpers(collector, cluster_configuration)
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
+    self.assertEquals(6, alert.interval())
 
-    res = pa.collect()
+    res = alert.collect()
 
     alerts = collector.alerts()
     self.assertEquals(0, len(collector.alerts()))
@@ -546,12 +647,17 @@ class TestAlerts(TestCase):
     test_common_services_path = os.path.join('ambari_agent', 'dummy_files')
     test_host_scripts_path = os.path.join('ambari_agent', 'dummy_files')
 
-    ash = AlertSchedulerHandler(test_file_path, test_stack_path, test_common_services_path, test_host_scripts_path, None)
+    cluster_configuration = self.__get_cluster_configuration()
+
+    ash = AlertSchedulerHandler(test_file_path, test_stack_path,
+      test_common_services_path, test_host_scripts_path, cluster_configuration,
+      None)
+
     ash.start()
 
     self.assertEquals(1, ash.get_job_count())
 
-    json = { "name": "namenode_process",
+    definition_json = { "name": "namenode_process",
       "service": "HDFS",
       "component": "NAMENODE",
       "label": "NameNode process",
@@ -574,20 +680,20 @@ class TestAlerts(TestCase):
       }
     }
 
-    pa = PortAlert(json, json['source'])
-    ash.schedule_definition(pa)
+    alert = PortAlert(definition_json, definition_json['source'])
+    ash.schedule_definition(alert)
 
     self.assertEquals(2, ash.get_job_count())
 
-    json['enabled'] = False
-    pa = PortAlert(json, json['source'])
-    ash.schedule_definition(pa)
+    definition_json['enabled'] = False
+    alert = PortAlert(definition_json, definition_json['source'])
+    ash.schedule_definition(alert)
 
     # verify disabled alert not scheduled
     self.assertEquals(2, ash.get_job_count())
 
-    json['enabled'] = True
-    pa = PortAlert(json, json['source'])
+    definition_json['enabled'] = True
+    pa = PortAlert(definition_json, definition_json['source'])
     ash.schedule_definition(pa)
 
     # verify enabled alert was scheduled
@@ -599,9 +705,14 @@ class TestAlerts(TestCase):
     test_common_services_path = os.path.join('ambari_agent', 'dummy_files')
     test_host_scripts_path = os.path.join('ambari_agent', 'dummy_files')
 
-    ash = AlertSchedulerHandler(test_file_path, test_stack_path, test_common_services_path, test_host_scripts_path, None)
+    cluster_configuration = self.__get_cluster_configuration()
+    ash = AlertSchedulerHandler(test_file_path, test_stack_path,
+      test_common_services_path, test_host_scripts_path, cluster_configuration,
+      None)
+
     ash.start()
 
+
     self.assertEquals(1, ash.get_job_count())
     self.assertEquals(0, len(ash._collector.alerts()))
 
@@ -639,7 +750,7 @@ class TestAlerts(TestCase):
 
 
   def test_skipped_alert(self):
-    json = {
+    definition_json = {
       "name": "namenode_process",
       "service": "HDFS",
       "component": "NAMENODE",
@@ -655,27 +766,35 @@ class TestAlerts(TestCase):
     }
 
     # normally set by AlertSchedulerHandler
-    json['source']['stacks_directory'] = os.path.join('ambari_agent', 'dummy_files')
-    json['source']['common_services_directory'] = os.path.join('ambari_agent', 'common-services')
-    json['source']['host_scripts_directory'] = os.path.join('ambari_agent', 'host_scripts')
+    definition_json['source']['stacks_directory'] = os.path.join('ambari_agent', 'dummy_files')
+    definition_json['source']['common_services_directory'] = os.path.join('ambari_agent', 'common-services')
+    definition_json['source']['host_scripts_directory'] = os.path.join('ambari_agent', 'host_scripts')
+
+    configuration = {'foo-site' :
+      { 'skip': 'true' }
+    }
 
     collector = AlertCollector()
-    sa = ScriptAlert(json, json['source'], None)
+    cluster_configuration = self.__get_cluster_configuration()
+    self.__update_cluster_configuration(cluster_configuration, configuration)
+
+    alert = ScriptAlert(definition_json, definition_json['source'], None)
 
     # instruct the test alert script to be skipped
-    sa.set_helpers(collector, {'foo-site/skip': 'true'} )
+    alert.set_helpers(collector, cluster_configuration )
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
 
-    self.assertEquals(json['source']['path'], sa.path)
-    self.assertEquals(json['source']['stacks_directory'], sa.stacks_dir)
-    self.assertEquals(json['source']['common_services_directory'], sa.common_services_dir)
-    self.assertEquals(json['source']['host_scripts_directory'], sa.host_scripts_dir)
+    self.assertEquals(definition_json['source']['path'], alert.path)
+    self.assertEquals(definition_json['source']['stacks_directory'], alert.stacks_dir)
+    self.assertEquals(definition_json['source']['common_services_directory'], alert.common_services_dir)
+    self.assertEquals(definition_json['source']['host_scripts_directory'], alert.host_scripts_dir)
 
     # ensure that it was skipped
     self.assertEquals(0,len(collector.alerts()))
 
 
   def test_default_reporting_text(self):
-    json = {
+    definition_json = {
       "name": "namenode_process",
       "service": "HDFS",
       "component": "NAMENODE",
@@ -690,100 +809,116 @@ class TestAlerts(TestCase):
       }
     }
 
-    alert = ScriptAlert(json, json['source'], None)
+    alert = ScriptAlert(definition_json, definition_json['source'], None)
     self.assertEquals(alert._get_reporting_text(alert.RESULT_OK), '{0}')
     self.assertEquals(alert._get_reporting_text(alert.RESULT_WARNING), '{0}')
     self.assertEquals(alert._get_reporting_text(alert.RESULT_CRITICAL), '{0}')
 
-    json['source']['type'] = 'PORT'
-    alert = PortAlert(json, json['source'])
+    definition_json['source']['type'] = 'PORT'
+    alert = PortAlert(definition_json, definition_json['source'])
     self.assertEquals(alert._get_reporting_text(alert.RESULT_OK), 'TCP OK - {0:.4f} response on port {1}')
     self.assertEquals(alert._get_reporting_text(alert.RESULT_WARNING), 'TCP OK - {0:.4f} response on port {1}')
     self.assertEquals(alert._get_reporting_text(alert.RESULT_CRITICAL), 'Connection failed: {0} to {1}:{2}')
 
-    json['source']['type'] = 'WEB'
-    alert = WebAlert(json, json['source'])
+    definition_json['source']['type'] = 'WEB'
+    alert = WebAlert(definition_json, definition_json['source'])
     self.assertEquals(alert._get_reporting_text(alert.RESULT_OK), 'HTTP {0} response in {2:.4f} seconds')
     self.assertEquals(alert._get_reporting_text(alert.RESULT_WARNING), 'HTTP {0} response in {2:.4f} seconds')
     self.assertEquals(alert._get_reporting_text(alert.RESULT_CRITICAL), 'Connection failed to {1}')
 
-    json['source']['type'] = 'METRIC'
-    alert = MetricAlert(json, json['source'])
+    definition_json['source']['type'] = 'METRIC'
+    alert = MetricAlert(definition_json, definition_json['source'])
     self.assertEquals(alert._get_reporting_text(alert.RESULT_OK), '{0}')
     self.assertEquals(alert._get_reporting_text(alert.RESULT_WARNING), '{0}')
     self.assertEquals(alert._get_reporting_text(alert.RESULT_CRITICAL), '{0}')
-    
-  @patch("json.dump")
-  def test_update_configurations(self, json_mock):
 
-    def open_side_effect(file, mode):
-      if mode == 'w':
-        file_mock = MagicMock()
-        return file_mock
-      else:
-        return self.original_open(file, mode)
 
-    test_file_path = os.path.join('ambari_agent', 'dummy_files')
-    test_stack_path = os.path.join('ambari_agent', 'dummy_files')
-    test_common_services_path = os.path.join('ambari_agent', 'dummy_files')
-    test_host_scripts_path = os.path.join('ambari_agent', 'dummy_files')
+  def test_configuration_updates(self):
+    definition_json = {
+      "name": "namenode_process",
+      "service": "HDFS",
+      "component": "NAMENODE",
+      "label": "NameNode process",
+      "interval": 6,
+      "scope": "host",
+      "enabled": True,
+      "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
+      "source": {
+        "type": "SCRIPT",
+        "path": "test_script.py",
+      }
+    }
 
-    commands = [{"clusterName": "c1",
-                 "configurations": {
-                   "hdfs-site": {
-                     "dfs.namenode.http-address": "c6401.ambari.apache.org:50071"
-                   }
-                 }}]
-    with open(os.path.join(test_stack_path, "definitions.json"),"r") as fp:
-      all_commands = json.load(fp)
-    all_commands[0]['configurations']['hdfs-site'].update({"dfs.namenode.http-address": "c6401.ambari.apache.org:50071"})
-
-    ash = AlertSchedulerHandler(test_file_path, test_stack_path, test_common_services_path, test_host_scripts_path, None)
-    ash.start()
+    # normally set by AlertSchedulerHandler
+    definition_json['source']['stacks_directory'] = os.path.join('ambari_agent', 'dummy_files')
+    definition_json['source']['common_services_directory'] = os.path.join('ambari_agent', 'common-services')
+    definition_json['source']['host_scripts_directory'] = os.path.join('ambari_agent', 'host_scripts')
 
-    with patch("__builtin__.open") as open_mock:
-      open_mock.side_effect = open_side_effect
-      ash.update_configurations(commands)
+    configuration = {'foo-site' :
+      { 'bar': 'rendered-bar', 'baz' : 'rendered-baz' }
+    }
 
-    self.assertTrue(json_mock.called)
-    self.assertTrue(json_mock.called_with(all_commands))
+    # populate the configuration cache with the initial configs
+    collector = AlertCollector()
+    cluster_configuration = self.__get_cluster_configuration()
+    self.__update_cluster_configuration(cluster_configuration, configuration)
 
+    # run the alert and verify the output
+    alert = ScriptAlert(definition_json, definition_json['source'], MagicMock())
+    alert.set_helpers(collector, cluster_configuration )
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
+    alert.collect()
 
-  @patch.object(AlertSchedulerHandler,"reschedule_all")
-  @patch("json.dump")
-  def test_update_configurations_without_reschedule(self, json_mock, reschedule_mock):
+    alerts = collector.alerts()
+    self.assertEquals(0, len(collector.alerts()))
 
-    def open_side_effect(file, mode):
-      if mode == 'w':
-        file_mock = MagicMock()
-        return file_mock
-      else:
-        return self.original_open(file, mode)
+    self.assertEquals('WARNING', alerts[0]['state'])
+    self.assertEquals('bar is rendered-bar, baz is rendered-baz', alerts[0]['text'])
 
-    test_file_path = os.path.join('ambari_agent', 'dummy_files')
-    test_stack_path = os.path.join('ambari_agent', 'dummy_files')
-    test_common_services_path = os.path.join('ambari_agent', 'dummy_files')
-    test_host_scripts_path = os.path.join('ambari_agent', 'dummy_files')
+    # now update only the configs and run the same alert again and check
+    # for different output
+    configuration = {'foo-site' :
+      { 'bar': 'rendered-bar2', 'baz' : 'rendered-baz2' }
+    }
 
-    with open(os.path.join(test_stack_path, "definitions.json"),"r") as fp:
-      all_commands = json.load(fp)
+    # populate the configuration cache with the initial configs
+    self.__update_cluster_configuration(cluster_configuration, configuration)
 
-    # create a copy of the configurations from definitions.json, then add
-    # a brand new property - this should not cause a restart since there are
-    # no alerts that use this new property
-    commands = [{"clusterName": "c1" }]
-    commands[0]['configurations'] = all_commands[0]['configurations']
-    commands[0]['configurations'].update({ "foo" : "bar" })
+    alert.collect()
 
-    ash = AlertSchedulerHandler(test_file_path, test_stack_path,
-      test_common_services_path, test_host_scripts_path, None)
+    alerts = collector.alerts()
+    self.assertEquals(0, len(collector.alerts()))
 
-    ash.start()
+    self.assertEquals('WARNING', alerts[0]['state'])
+    self.assertEquals('bar is rendered-bar2, baz is rendered-baz2', alerts[0]['text'])
 
+
+  def __get_cluster_configuration(self):
+    """
+    Gets an instance of the cluster cache where the file read and write
+    operations have been mocked out
+    :return:
+    """
     with patch("__builtin__.open") as open_mock:
-      open_mock.side_effect = open_side_effect
-      ash.update_configurations(commands)
+      open_mock.side_effect = self.open_side_effect
+      cluster_configuration = ClusterConfiguration("")
+      return cluster_configuration
+
+
+  def __update_cluster_configuration(self, cluster_configuration, configuration):
+    """
+    Updates the configuration cache, using a mock file as the disk-based
+    cache so that no file is created during tests.
+    :return:
+    """
+    with patch("__builtin__.open") as open_mock:
+      open_mock.side_effect = self.open_side_effect
+      cluster_configuration._update_configurations("c1", configuration)
+
 
-    self.assertTrue(json_mock.called)
-    self.assertTrue(json_mock.called_with(all_commands))
-    self.assertFalse(reschedule_mock.called)
+  def open_side_effect(self, file, mode):
+    if mode == 'w':
+      file_mock = MagicMock()
+      return file_mock
+    else:
+      return self.original_open(file, mode)
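
Both test classes lean on the same open() interception trick: writes are swallowed by a MagicMock so the tests never touch the disk-backed cache, while reads fall through to the real built-in. The standalone sketch below illustrates that pattern outside the test harness; it assumes Python 2 (__builtin__) and the mock package, as used above, and the file path is purely illustrative.

# A minimal sketch of the open() mocking pattern used by the tests above.
# Assumes Python 2 (__builtin__) and the mock package; the path below is
# illustrative only and is never actually created on disk.
from mock.mock import MagicMock, patch

original_open = open

def open_side_effect(file, mode):
  # swallow writes in an in-memory mock; delegate reads to the real open()
  if mode == 'w':
    return MagicMock()
  return original_open(file, mode)

with patch("__builtin__.open") as open_mock:
  open_mock.side_effect = open_side_effect
  with open("/tmp/never-created.json", 'w') as f:
    f.write("captured by the MagicMock, not written to disk")

The same helper appears as open_side_effect() in both TestAlerts and TestClusterConfigurationCache.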

+ 100 - 0
ambari-agent/src/test/python/ambari_agent/TestClusterConfigurationCache.py

@@ -0,0 +1,100 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import sys
+
+from ambari_agent.ClusterConfiguration import ClusterConfiguration
+
+from mock.mock import MagicMock, patch, mock_open, ANY
+from unittest import TestCase
+
+class TestClusterConfigurationCache(TestCase):
+
+  def setUp(self):
+    # save original open() method for later use
+    self.original_open = open
+
+  def tearDown(self):
+    sys.stdout = sys.__stdout__
+
+
+  @patch("os.path.exists", new = MagicMock(return_value=True))
+  @patch("os.path.isfile", new = MagicMock(return_value=True))
+  def test_cluster_configuration_cache_initialization(self):
+    configuration_json = '{ "c1" : { "foo-site" : { "foo" : "bar", "foobar" : "baz" } } }'
+    open_mock = mock_open(read_data=configuration_json)
+
+    with patch("__builtin__.open", open_mock):
+      cluster_configuration = ClusterConfiguration("/foo/bar/baz")
+
+    open_mock.assert_called_with("/foo/bar/baz/configurations.json", 'r')
+
+    self.assertEqual('bar', cluster_configuration.get_configuration_value('c1', 'foo-site/foo') )
+    self.assertEqual('baz', cluster_configuration.get_configuration_value('c1', 'foo-site/foobar') )
+    self.assertEqual(None, cluster_configuration.get_configuration_value('c1', 'INVALID') )
+    self.assertEqual(None, cluster_configuration.get_configuration_value('c1', 'INVALID/INVALID') )
+    self.assertEqual(None, cluster_configuration.get_configuration_value('INVALID', 'foo-site/foo') )
+    self.assertEqual(None, cluster_configuration.get_configuration_value('INVALID', 'foo-site/foobar') )
+
+
+  @patch("json.dump")
+  def test_cluster_configuration_update(self, json_dump_mock):
+    cluster_configuration = self.__get_cluster_configuration()
+
+    configuration = {'foo-site' :
+      { 'bar': 'rendered-bar', 'baz' : 'rendered-baz' }
+    }
+
+    file_mock = self.__update_cluster_configuration(cluster_configuration, configuration)
+    file_mock.assert_called_with('/foo/bar/baz/configurations.json', 'w')
+
+    json_dump_mock.assert_called_with({'c1': {'foo-site': {'baz': 'rendered-baz', 'bar': 'rendered-bar'}}}, ANY, indent=2)
+
+  def __get_cluster_configuration(self):
+    """
+    Gets an instance of the cluster cache where the file read and write
+    operations have been mocked out
+    :return:
+    """
+    with patch("__builtin__.open") as open_mock:
+      open_mock.side_effect = self.open_side_effect
+      cluster_configuration = ClusterConfiguration("/foo/bar/baz")
+      return cluster_configuration
+
+
+  def __update_cluster_configuration(self, cluster_configuration, configuration):
+    """
+    Updates the configuration cache, using a mock file as the disk-based
+    cache so that no file is created during tests.
+    :return: the mock used in place of the built-in open()
+    """
+    with patch("__builtin__.open") as open_mock:
+      open_mock.side_effect = self.open_side_effect
+      cluster_configuration._update_configurations("c1", configuration)
+
+    return open_mock
+
+
+  def open_side_effect(self, file, mode):
+    if mode == 'w':
+      file_mock = MagicMock()
+      return file_mock
+    else:
+      return self.original_open(file, mode)
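
For reference, the ClusterConfiguration API exercised by these tests boils down to three calls: the constructor, which loads <cache_dir>/configurations.json when it exists; get_configuration_value(cluster, 'config-type/property'), which returns None for unknown clusters or keys; and the internal _update_configurations(cluster, configs), which rewrites the JSON cache. The sketch below strings them together; the cache directory and cluster name are placeholders, and ClusterConfiguration's own error handling is not shown.

# Illustrative use of the ClusterConfiguration cache, limited to the calls
# exercised by the tests above; the cache directory and cluster name are
# placeholders, not paths mandated by the agent.
from ambari_agent.ClusterConfiguration import ClusterConfiguration

# loads <cache_dir>/configurations.json if it already exists
cluster_configuration = ClusterConfiguration("/path/to/agent/cache")

# values are addressed as "<config-type>/<property>" within a named cluster
http_address = cluster_configuration.get_configuration_value(
  "c1", "hdfs-site/dfs.namenode.http-address")

# unknown clusters or keys yield None instead of raising
assert cluster_configuration.get_configuration_value("c1", "INVALID/INVALID") is None

# the agent refreshes the cache from heartbeat data via the internal updater,
# e.g. cluster_configuration._update_configurations("c1", {...})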

+ 8 - 0
ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/alerts/alert_nodemanagers_summary.py

@@ -28,6 +28,14 @@ NODEMANAGER_HTTP_ADDRESS_KEY = '{{yarn-site/yarn.resourcemanager.webapp.address}
 NODEMANAGER_HTTPS_ADDRESS_KEY = '{{yarn-site/yarn.resourcemanager.webapp.https.address}}'
 YARN_HTTP_POLICY_KEY = '{{yarn-site/yarn.http.policy}}'
   
+def get_tokens():
+  """
+  Returns a tuple of tokens in the format {{site/property}} that will be used
+  to build the dictionary passed into execute
+  """
+  return NODEMANAGER_HTTP_ADDRESS_KEY, NODEMANAGER_HTTPS_ADDRESS_KEY, \
+    YARN_HTTP_POLICY_KEY
+
 
 def execute(parameters=None, host_name=None):
   """