
AMBARI-9615 - Agents Do Not Update Cached Alert JSON On New Configurations (jonathanhurley)

Jonathan Hurley, 10 years ago
parent commit 12ed8b5538

+ 62 - 35
ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py

@@ -63,8 +63,7 @@ class AlertSchedulerHandler():
       try:
         os.makedirs(cachedir)
       except:
-        logger.critical("Could not create the cache directory {0}".format(cachedir))
-        pass
+        logger.critical("[AlertScheduler] Could not create the cache directory {0}".format(cachedir))
 
     self._collector = AlertCollector()
     self.__scheduler = Scheduler(AlertSchedulerHandler.APS_CONFIG)
@@ -82,33 +81,53 @@ class AlertSchedulerHandler():
     if reschedule_jobs:
       self.reschedule()
 
-  def __update_definition_configs(self):
-    """ updates the persisted configs and restarts the scheduler """
 
-    definitions = []
+  def __update_definition_configs(self, newConfigurations, reschedule_jobs=False):
+    """
+    Updates the definitions and configurations stored on disk. Optionally
+    can reschedule jobs. Job rescheduling is only necessary when data that
+    an existing job uses has changed. In many cases, configuration values
+    have changed, yet no jobs need rescheduling.
+
+    :param reschedule_jobs:
+    :return:
+    """
+
+    if reschedule_jobs:
+      logger.info("[AlertScheduler] Updating {0} with the latest configuration values and rescheduling alert jobs".format(self.FILENAME))
+    else:
+      logger.info("[AlertScheduler] Updating {0} with the latest configuration values".format(self.FILENAME))
 
-    all_commands = None
     # Load definitions from json
     try:
       with open(os.path.join(self.cachedir, self.FILENAME), 'r') as fp:
         all_commands = json.load(fp)
     except IOError, ValueError:
-      if (logger.isEnabledFor(logging.DEBUG)):
-        logger.exception("Failed to load definitions. {0}".format(traceback.format_exc()))
+      if logger.isEnabledFor(logging.DEBUG):
+        logger.exception("[AlertScheduler] Failed to load definitions. {0}".format(traceback.format_exc()))
       return
 
     # Update definitions with current config
     for command_json in all_commands:
-      clusterName = '' if not 'clusterName' in command_json else command_json['clusterName']
-      hostName = '' if not 'hostName' in command_json else command_json['hostName']
+      if 'clusterName' in command_json:
+        clusterName = command_json['clusterName']
+      else:
+        clusterName = ''
 
-      self.__update_config_values(command_json['configurations'],self.__config_maps[clusterName])
+      self.__update_config_values(command_json['configurations'],
+        self.__config_maps[clusterName])
+
+      # update the configurations before writing the file back out
+      command_json['configurations'] = newConfigurations
 
     # Save definitions to file
     with open(os.path.join(self.cachedir, self.FILENAME), 'w') as f:
       json.dump(all_commands, f, indent=2)
 
-    self.reschedule_all()
+    # only reschedule jobs if instructed to
+    if reschedule_jobs:
+      self.reschedule_all()
+
 
   def __make_function(self, alert_def):
     return lambda: alert_def.collect()
@@ -130,7 +149,7 @@ class AlertSchedulerHandler():
     for _callable in alert_callables:
       self.schedule_definition(_callable)
       
-    logger.debug("Starting scheduler {0}; currently running: {1}".format(
+    logger.debug("[AlertScheduler] Starting {0}; currently running: {1}".format(
       str(self.__scheduler), str(self.__scheduler.running)))
 
     self.__scheduler.start()
@@ -166,7 +185,7 @@ class AlertSchedulerHandler():
       # jobs without valid UUIDs should be unscheduled
       if uuid_valid == False:
         jobs_removed += 1
-        logger.info("Unscheduling {0}".format(scheduled_job.name))
+        logger.info("[AlertScheduler] Unscheduling {0}".format(scheduled_job.name))
         self._collector.remove_by_uuid(scheduled_job.name)
         self.__scheduler.unschedule_job(scheduled_job)
       
@@ -184,7 +203,7 @@ class AlertSchedulerHandler():
         jobs_scheduled += 1
         self.schedule_definition(definition)
   
-    logger.info("Alert Reschedule Summary: {0} rescheduled, {1} unscheduled".format(
+    logger.info("[AlertScheduler] Reschedule Summary: {0} rescheduled, {1} unscheduled".format(
         str(jobs_scheduled), str(jobs_removed)))
 
   def reschedule_all(self):
@@ -200,9 +219,8 @@ class AlertSchedulerHandler():
 
     # unschedule all scheduled jobs
     for scheduled_job in scheduled_jobs:
-
         jobs_removed += 1
-        logger.info("Unscheduling {0}".format(scheduled_job.name))
+        logger.info("[AlertScheduler] Unscheduling {0}".format(scheduled_job.name))
         self._collector.remove_by_uuid(scheduled_job.name)
         self.__scheduler.unschedule_job(scheduled_job)
 
@@ -211,7 +229,7 @@ class AlertSchedulerHandler():
         jobs_scheduled += 1
         self.schedule_definition(definition)
 
-    logger.info("Alert Reschedule Summary: {0} rescheduled, {1} unscheduled".format(
+    logger.info("[AlertScheduler] Reschedule Summary: {0} rescheduled, {1} unscheduled".format(
       str(jobs_scheduled), str(jobs_removed)))
 
 
@@ -230,7 +248,7 @@ class AlertSchedulerHandler():
       with open(alerts_definitions_path) as fp:
         all_commands = json.load(fp)
     except:
-      logger.warning('Alert definitions file was not found under "{0}". No alerts will be scheduled.'.format(alerts_definitions_path))
+      logger.warning('[AlertScheduler] {0} not found. No alerts will be scheduled.'.format(alerts_definitions_path))
       return definitions
     
     for command_json in all_commands:
@@ -270,7 +288,7 @@ class AlertSchedulerHandler():
     source_type = source.get('type', '')
 
     if logger.isEnabledFor(logging.DEBUG):
-      logger.debug("Creating job type {0} with {1}".format(source_type, str(json_definition)))
+      logger.debug("[AlertScheduler] Creating job type {0} with {1}".format(source_type, str(json_definition)))
     
     alert = None
 
@@ -328,22 +346,31 @@ class AlertSchedulerHandler():
 
   def update_configurations(self, commands):
     """
-    when an execution command comes in, update any necessary values.
-    status commands do not contain useful configurations
+    Checks the execution command's configurations against those stored in
+    memory. If there are differences, this will reschedule alerts. The
+    on-disk JSON file is always updated so that it reflects the correct state
+    of configurations
     """
     for command in commands:
       clusterName = command['clusterName']
       if not clusterName in self.__config_maps:
         continue
-        
-      if 'configurations' in command:
-        configmap = command['configurations']
-        keylist = self.__config_maps[clusterName].keys()
-        vals = self.__find_config_values(configmap, keylist)
-        # if we have updated values push them to config_maps and reschedule
-        if vals != self.__config_maps[clusterName]:
-          self.__config_maps[clusterName].update(vals)
-          self.__update_definition_configs()
+
+      if not 'configurations' in command:
+        continue
+
+      existingConfigurationKeys = self.__config_maps[clusterName].keys()
+      newConfigurations = command['configurations']
+      newConfigurationValues = self.__find_config_values(newConfigurations,
+        existingConfigurationKeys)
+
+      # if we have updated values push them to config_maps and reschedule
+      rescheduleJobs = False
+      if newConfigurationValues != self.__config_maps[clusterName]:
+        rescheduleJobs = True
+        self.__config_maps[clusterName].update(newConfigurationValues)
+
+      self.__update_definition_configs(newConfigurations, rescheduleJobs)
         
 
   def schedule_definition(self,definition):
@@ -356,7 +383,7 @@ class AlertSchedulerHandler():
     """
     # NOOP if the definition is disabled; don't schedule it
     if definition.is_enabled() == False:
-      logger.info("The alert {0} with UUID {1} is disabled and will not be scheduled".format(
+      logger.info("[AlertScheduler] The alert {0} with UUID {1} is disabled and will not be scheduled".format(
           definition.get_name(),definition.get_uuid()))
       return
     
@@ -374,7 +401,7 @@ class AlertSchedulerHandler():
     if job is not None:
       job.name = definition.get_uuid()
       
-    logger.info("Scheduling {0} with UUID {1}".format(
+    logger.info("[AlertScheduler] Scheduling {0} with UUID {1}".format(
       definition.get_name(), definition.get_uuid()))
   
 
@@ -410,13 +437,13 @@ class AlertSchedulerHandler():
         if alert is None:
           continue
   
-        logger.info("Executing on-demand alert {0} ({1})".format(alert.get_name(), 
+        logger.info("[AlertScheduler] Executing on-demand alert {0} ({1})".format(alert.get_name(),
             alert.get_uuid()))
         
         alert.set_helpers(self._collector, self.__config_maps[clusterName])
         alert.collect()
       except:
-        logger.exception("Unable to execute the alert outside of the job scheduler")
+        logger.exception("[AlertScheduler] Unable to execute the alert outside of the job scheduler")
 
 def main():
   args = list(sys.argv)
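
In short, the scheduler-side change above means the agent always rewrites its cached definitions.json with the incoming configurations, but only reschedules alert jobs when a configuration value that a scheduled alert actually references has changed. The following is a minimal sketch of that decision, not the module's code: the names apply_new_configurations, write_cache, and reschedule_all are illustrative, and a flat dict stands in for Ambari's nested configuration structure that __find_config_values walks.

    def apply_new_configurations(config_map, command, write_cache, reschedule_all):
        """Decide whether one cluster's new configurations require rescheduling.

        config_map     -- values currently referenced by scheduled alerts
        command        -- an execution command carrying a 'configurations' dict
        write_cache    -- callable that persists configurations to definitions.json
        reschedule_all -- callable that unschedules and reschedules every alert job
        """
        new_configurations = command.get('configurations', {})

        # Only keys that existing alerts reference matter for the reschedule decision.
        new_values = {key: value for key, value in new_configurations.items()
                      if key in config_map}

        # The cached JSON is always refreshed so it reflects the latest configurations.
        write_cache(new_configurations)

        # Jobs are rescheduled only when a referenced value actually changed.
        if new_values != config_map:
            config_map.update(new_values)
            reschedule_all()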

+ 10 - 14
ambari-agent/src/main/python/ambari_agent/Controller.py

@@ -258,33 +258,29 @@ class Controller(threading.Thread):
         else:
           self.responseId = serverId
 
-        if 'cancelCommands' in response.keys():
+        response_keys = response.keys()
+        if 'cancelCommands' in response_keys:
           self.cancelCommandInQueue(response['cancelCommands'])
-          pass
 
-        if 'executionCommands' in response.keys():
-          self.addToQueue(response['executionCommands'])
-          self.alert_scheduler_handler.update_configurations(response['executionCommands'])
-          pass
+        if 'executionCommands' in response_keys:
+          execution_commands = response['executionCommands']
+          self.addToQueue(execution_commands)
+          self.alert_scheduler_handler.update_configurations(execution_commands)
 
-        if 'statusCommands' in response.keys():
+        if 'statusCommands' in response_keys:
           self.addToStatusQueue(response['statusCommands'])
-          pass
 
-        if 'alertDefinitionCommands' in response.keys():
+        if 'alertDefinitionCommands' in response_keys:
           self.alert_scheduler_handler.update_definitions(response['alertDefinitionCommands'], True)
-          pass
-        
-        if 'alertExecutionCommands' in response.keys():
+
+        if 'alertExecutionCommands' in response_keys:
           self.alert_scheduler_handler.execute_alert(response['alertExecutionCommands'])
-          pass
 
         if "true" == response['restartAgent']:
           logger.error("Received the restartAgent command")
           self.restartAgent()
         else:
           logger.info("No commands sent from %s", self.serverHostname)
-          pass
 
         if retry:
           logger.info("Reconnected to %s", self.heartbeatUrl)

+ 6 - 6
ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py

@@ -97,7 +97,7 @@ class BaseAlert(object):
       # this is useful for cases where the alert might run on multiple hosts
       # but only 1 host should report the data
       if result_state == BaseAlert.RESULT_SKIPPED:
-        logger.debug('Alert {0} with UUID {1} was skipped.'.format(self.get_name(),
+        logger.debug('[Alert][{0}] Skipping UUID {1}.'.format(self.get_name(),
           self.get_uuid()))
 
         return
@@ -113,7 +113,7 @@ class BaseAlert(object):
         res_base_text = self._get_reporting_text(result_state)
 
     except Exception as e:
-      message = "Unable to run alert {0}".format(str(self.alert_meta['name']))
+      message = "[Alert][{0}] Unable to run the alert".format(self.get_name())
       
       # print the exception if in DEBUG, otherwise just log the warning
       if logger.isEnabledFor(logging.DEBUG):
@@ -126,7 +126,7 @@ class BaseAlert(object):
     
     
     if logger.isEnabledFor(logging.DEBUG):
-      logger.debug("debug alert result: {0}".format(str(res)))
+      logger.debug("[Alert][{0}] result = {1}".format(self.get_name(), str(res)))
       
     data = {}
     data['name'] = self._find_value('name')
@@ -141,7 +141,7 @@ class BaseAlert(object):
     data['enabled'] = self._find_value('enabled')
 
     if logger.isEnabledFor(logging.DEBUG):
-      logger.debug("debug alert text: {0}".format(data['text']))
+      logger.debug("[Alert][{0}] text = {1}".format(self.get_name(), data['text']))
     
     self.collector.put(self.cluster, data)
 
@@ -168,8 +168,8 @@ class BaseAlert(object):
     
     if len(keys) > 0:
       if logger.isEnabledFor(logging.DEBUG):
-        logger.debug("Found parameterized key {0} for {1}".format(
-          str(keys), str(self)))
+        logger.debug("[Alert][{0}] Found parameterized key {1} for {2}".format(
+          self.get_name(), str(keys), str(self)))
 
       self._lookup_keys.append(keys[0])
       return keys[0]

+ 4 - 3
ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py

@@ -55,8 +55,8 @@ class MetricAlert(BaseAlert):
     # use the URI lookup keys to get a final URI value to query
     alert_uri = self._get_uri_from_structure(self.uri_property_keys)      
     
-    logger.debug("Calculated metric URI to be {0} (ssl={1})".format(alert_uri.uri, 
-        str(alert_uri.is_ssl_enabled)))
+    logger.debug("[Alert][{0}] Calculated metric URI to be {1} (ssl={2})".format(
+        self.get_name(), alert_uri.uri, str(alert_uri.is_ssl_enabled)))
 
     host = BaseAlert.get_host_from_url(alert_uri.uri)
     if host is None:
@@ -79,7 +79,8 @@ class MetricAlert(BaseAlert):
       
       collect_result = self.__get_result(value_list[0] if check_value is None else check_value)
 
-    logger.debug("Resolved value list is: {0}".format(str(value_list)))
+    logger.debug("[Alert][{0}] Resolved values = {1}".format(
+      self.get_name(), str(value_list)))
     
     return (collect_result, value_list)
 

+ 17 - 8
ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py

@@ -65,16 +65,16 @@ class PortAlert(BaseAlert):
 
     # check warning threshold for sanity
     if self.warning_timeout >= 30:
-      logger.warn("The alert warning threshold of {0}s is too large, resetting to {1}s".format(
-        str(self.warning_timeout), str(DEFAULT_WARNING_TIMEOUT)))
+      logger.warn("[Alert][{0}] The warning threshold of {1}s is too large, resetting to {2}s".format(
+        self.get_name(), str(self.warning_timeout), str(DEFAULT_WARNING_TIMEOUT)))
 
       self.warning_timeout = DEFAULT_WARNING_TIMEOUT
 
 
     # check critical threshold for sanity
     if self.critical_timeout >= 30:
-      logger.warn("The alert critical threshold of {0}s is too large, resetting to {1}s".format(
-        str(self.critical_timeout), str(DEFAULT_CRITICAL_TIMEOUT)))
+      logger.warn("[Alert][{0}] The critical threshold of {1}s is too large, resetting to {2}s".format(
+        self.get_name(), str(self.critical_timeout), str(DEFAULT_CRITICAL_TIMEOUT)))
 
       self.critical_timeout = DEFAULT_CRITICAL_TIMEOUT
 
@@ -84,6 +84,8 @@ class PortAlert(BaseAlert):
     uri_value = self._lookup_property_value(self.uri)
     if uri_value is None:
       uri_value = self.host_name
+      logger.debug("[Alert][{0}] Setting the URI to this host since it wasn't specified".format(
+        self.get_name()))
 
     # in some cases, a single property is a comma-separated list like
     # host1:8080,host2:8081,host3:8083
@@ -92,6 +94,11 @@ class PortAlert(BaseAlert):
       for item in uri_value_array:
         if self.host_name in item:
           uri_value = item
+          if logger.isEnabledFor(logging.DEBUG):
+            logger.debug("[Alert][{0}] Extracted {1} as the host name while parsing the CSV URI {2}".format(
+              self.get_name(), uri_value, str(uri_value_array)))
+          break
+
 
     host = BaseAlert.get_host_from_url(uri_value)
     if host is None:
@@ -108,19 +115,21 @@ class PortAlert(BaseAlert):
 
 
     if logger.isEnabledFor(logging.DEBUG):
-      logger.debug("checking {0} listening on port {1}".format(host, str(port)))
+      logger.debug("[Alert][{0}] Checking {1} on port {2}".format(
+        self.get_name(), host, str(port)))
     
     try:
       s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       s.settimeout(self.critical_timeout)
 
-      t = time.time()
       if OSCheck.is_windows_family():
         # on windows 0.0.0.0 is invalid address to connect but on linux it resolved to 127.0.0.1
         host = resolve_address(host)
+
+      start_time = time.time()
       s.connect((host, port))
-      milliseconds = time.time() - t
-      seconds = milliseconds/1000.0
+      milliseconds = time.time() - start_time
+      seconds = milliseconds / 1000.0
 
       # not sure why this happens sometimes, but we don't always get a
       # socket exception if the connect() is > than the critical threshold
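
For context on the timing change above (the clock now starts after any Windows address resolution), here is a minimal, self-contained sketch of this kind of connect-latency probe. The function name check_tcp_latency is illustrative and not part of the module; elapsed time is reported in seconds, since that is what time.time() returns.

    import socket
    import time

    def check_tcp_latency(host, port, timeout_seconds):
        """Return the elapsed time, in seconds, to open a TCP connection."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(timeout_seconds)
        try:
            start_time = time.time()
            s.connect((host, port))  # raises socket.timeout / socket.error on failure
            return time.time() - start_time
        finally:
            s.close()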

+ 8 - 5
ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py

@@ -72,8 +72,8 @@ class ScriptAlert(BaseAlert):
           # append the key to the list of keys for this alert
           self._find_lookup_property(token)
     except:
-      logger.exception("Unable to parameterize tokens for script {0}".format(self.path))
-      pass
+      logger.exception("[Alert][{0}] Unable to parameterize tokens for script {1}".format(
+        self.get_name(), self.path))
               
     
   def _collect(self):
@@ -91,7 +91,7 @@ class ScriptAlert(BaseAlert):
       matchObj = re.match( r'((.*)services\/(.*)\/package\/)', self.path_to_script)
       if matchObj:
         basedir = matchObj.group(1)
-        with Environment(basedir, tmp_dir=self.config.get.get('agent', 'tmp_dir')) as env:
+        with Environment(basedir, tmp_dir=self.config.get('agent', 'tmp_dir')) as env:
           return cmd_module.execute(parameters, self.host_name)
       else:
         return cmd_module.execute(parameters, self.host_name)
@@ -125,11 +125,14 @@ class ScriptAlert(BaseAlert):
           self.stacks_dir, self.host_scripts_dir))
 
     if logger.isEnabledFor(logging.DEBUG):
-      logger.debug("Executing script check {0}".format(self.path_to_script))
+      logger.debug("[Alert][{0}] Executing script check {1}".format(
+        self.get_name(), self.path_to_script))
 
           
     if (not self.path_to_script.endswith('.py')):
-      logger.error("Unable to execute script {0}".format(self.path_to_script))
+      logger.error("[Alert][{0}] Unable to execute script {1}".format(
+        self.get_name(), self.path_to_script))
+
       return None
 
     return imp.load_source(self._find_value('name'), self.path_to_script)

+ 3 - 3
ambari-agent/src/main/python/ambari_agent/alerts/web_alert.py

@@ -48,8 +48,8 @@ class WebAlert(BaseAlert):
     # use the URI lookup keys to get a final URI value to query
     alert_uri = self._get_uri_from_structure(self.uri_property_keys)      
 
-    logger.debug("Calculated web URI to be {0} (ssl={1})".format(alert_uri.uri, 
-        str(alert_uri.is_ssl_enabled)))
+    logger.debug("[Alert][{0}] Calculated web URI to be {1} (ssl={2})".format(
+      self.get_name(), alert_uri.uri, str(alert_uri.is_ssl_enabled)))
 
     url = self._build_web_query(alert_uri)
     web_response = self._make_web_request(url)
@@ -116,7 +116,7 @@ class WebAlert(BaseAlert):
       time_millis = time.time() - start_time
     except:
       if logger.isEnabledFor(logging.DEBUG):
-        logger.exception("Unable to make a web request.")
+        logger.exception("[Alert][{0}] Failed to make a web request".format(self.get_name()))
       
       return WebResponse(status_code=0, time_millis=0)
     

+ 40 - 0
ambari-agent/src/test/python/ambari_agent/TestAlerts.py

@@ -744,6 +744,46 @@ class TestAlerts(TestCase):
     with patch("__builtin__.open") as open_mock:
       open_mock.side_effect = open_side_effect
       ash.update_configurations(commands)
+
     self.assertTrue(json_mock.called)
     self.assertTrue(json_mock.called_with(all_commands))
 
+
+  @patch.object(AlertSchedulerHandler,"reschedule_all")
+  @patch("json.dump")
+  def test_update_configurations_without_reschedule(self, json_mock, reschedule_mock):
+
+    def open_side_effect(file, mode):
+      if mode == 'w':
+        file_mock = MagicMock()
+        return file_mock
+      else:
+        return self.original_open(file, mode)
+
+    test_file_path = os.path.join('ambari_agent', 'dummy_files')
+    test_stack_path = os.path.join('ambari_agent', 'dummy_files')
+    test_common_services_path = os.path.join('ambari_agent', 'dummy_files')
+    test_host_scripts_path = os.path.join('ambari_agent', 'dummy_files')
+
+    with open(os.path.join(test_stack_path, "definitions.json"),"r") as fp:
+      all_commands = json.load(fp)
+
+    # create a copy of the configurations from definitions.json, then add
+    # a brand new property - this should not cause a restart since there are
+    # no alerts that use this new property
+    commands = [{"clusterName": "c1" }]
+    commands[0]['configurations'] = all_commands[0]['configurations']
+    commands[0]['configurations'].update({ "foo" : "bar" })
+
+    ash = AlertSchedulerHandler(test_file_path, test_stack_path,
+      test_common_services_path, test_host_scripts_path, None)
+
+    ash.start()
+
+    with patch("__builtin__.open") as open_mock:
+      open_mock.side_effect = open_side_effect
+      ash.update_configurations(commands)
+
+    self.assertTrue(json_mock.called)
+    self.assertTrue(json_mock.called_with(all_commands))
+    self.assertFalse(reschedule_mock.called)

+ 1 - 1
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/alerts.json

@@ -81,7 +81,7 @@
       },
       {
         "name": "ams_metrics_collector_hbase_master_cpu",
-        "label": "Metrics Collector HBase Maser CPU Utilization",
+        "label": "Metrics Collector - HBase Master CPU Utilization",
         "description": "This host-level alert is triggered if CPU utilization of the Metrics Collector's HBase Master exceeds certain warning and critical thresholds. It checks the HBase Master JMX Servlet for the SystemCPULoad property. The threshold values are in percent.",
         "interval": 5,
         "scope": "ANY",