Browse source

AMBARI-8885 - Agent requires restart for alerts to work (Andrew Onischuk via jonathanhurley)

Jonathan Hurley 10 years ago
parent
commit
be2adc4d2a

+ 71 - 3
ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py

@@ -81,6 +81,33 @@ class AlertSchedulerHandler():
     if reschedule_jobs:
       self.reschedule()
 
+  def __update_definition_configs(self):
+    """ updates the persisted alert definitions with the current config values and reschedules all jobs """
+
+    all_commands = None
+    # load the cached definitions from JSON
+    try:
+      with open(os.path.join(self.cachedir, self.FILENAME), 'r') as fp:
+        all_commands = json.load(fp)
+    except (IOError, ValueError):
+      if logger.isEnabledFor(logging.DEBUG):
+        logger.exception("Failed to load definitions. {0}".format(traceback.format_exc()))
+      return
+
+    # update each definition's configurations with the current config values
+    for command_json in all_commands:
+      clusterName = command_json.get('clusterName', '')
+
+      self.__update_config_values(command_json['configurations'], self.__config_maps[clusterName])
+
+    # Save definitions to file
+    with open(os.path.join(self.cachedir, self.FILENAME), 'w') as f:
+      json.dump(all_commands, f, indent=2)
+
+    self.reschedule_all()
 
   def __make_function(self, alert_def):
     return lambda: alert_def.collect()
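
Note: the patch as committed wrote "except IOError, ValueError:", which in Python 2 catches only IOError and binds the exception instance to the name ValueError; the parenthesized tuple form shown above catches either type. A minimal standalone snippet illustrating the difference (hypothetical, not part of the patch):

    # Python 2 'except' semantics: comma form vs. tuple form
    def comma_form():
      try:
        raise IOError("disk error")
      except IOError, ValueError:
        # only IOError is caught; the instance shadows the builtin ValueError
        print type(ValueError)  # <type 'exceptions.IOError'>

    def tuple_form():
      try:
        raise ValueError("bad json")
      except (IOError, ValueError) as e:
        # either exception type is caught
        print type(e)  # <type 'exceptions.ValueError'>

    comma_form()
    tuple_form()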
@@ -134,7 +161,7 @@ class AlertSchedulerHandler():
         if scheduled_job.name == definition_uuid:
           uuid_valid = True
           break
-      
+
       # jobs without valid UUIDs should be unscheduled
       if uuid_valid == False:
         jobs_removed += 1
@@ -159,6 +186,33 @@ class AlertSchedulerHandler():
     logger.info("Alert Reschedule Summary: {0} rescheduled, {1} unscheduled".format(
         str(jobs_scheduled), str(jobs_removed)))
 
+  def reschedule_all(self):
+    """
+    Unschedules every job that is currently scheduled, then schedules
+    a new job for every definition. Used when configuration values
+    change, since scheduled jobs were built from the old config values.
+    """
+    jobs_scheduled = 0
+    jobs_removed = 0
+
+    definitions = self.__load_definitions()
+    scheduled_jobs = self.__scheduler.get_jobs()
+
+    # unschedule all scheduled jobs
+    for scheduled_job in scheduled_jobs:
+      jobs_removed += 1
+      logger.info("Unscheduling {0}".format(scheduled_job.name))
+      self._collector.remove_by_uuid(scheduled_job.name)
+      self.__scheduler.unschedule_job(scheduled_job)
+
+    # for every definition, schedule a job
+    for definition in definitions:
+      jobs_scheduled += 1
+      self.schedule_definition(definition)
+
+    logger.info("Alert Reschedule Summary: {0} rescheduled, {1} unscheduled".format(
+      str(jobs_scheduled), str(jobs_removed)))
+
 
   def collector(self):
     """ gets the collector for reporting to the server """
@@ -259,7 +313,18 @@ class AlertSchedulerHandler():
         
     return result
 
- 
+  def __update_config_values(self, configs, actual_configs):
+    """ writes each value from the flattened slash-key config map back into the nested configs dictionary """
+    for slashkey in actual_configs.keys():
+      # a key such as 'hdfs-site/dfs.namenode.http-address' names a nested path
+      keys = slashkey.split('/')
+      current_dict = configs
+      for i in range(len(keys)):
+        if i + 1 >= len(keys):
+          # final path segment: assign the updated value
+          current_dict[keys[i]] = actual_configs[slashkey]
+        else:
+          # intermediate segment: descend, creating nested dicts as needed
+          if keys[i] not in current_dict:
+            current_dict[keys[i]] = {}
+          current_dict = current_dict[keys[i]]
+
   def update_configurations(self, commands):
     """
     when an execution command comes in, update any necessary values.
@@ -274,7 +339,10 @@ class AlertSchedulerHandler():
         configmap = command['configurations']
         keylist = self.__config_maps[clusterName].keys()
         vals = self.__find_config_values(configmap, keylist)
-        self.__config_maps[clusterName].update(vals)
+        # if any values changed, push them into the config map and reschedule
+        if vals != self.__config_maps[clusterName]:
+          self.__config_maps[clusterName].update(vals)
+          self.__update_definition_configs()
         
 
   def schedule_definition(self,definition):
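
For reference, __update_config_values() walks each slash-delimited key of the agent's flattened config map and writes its value into the nested command configuration, creating intermediate dictionaries as needed. A standalone sketch of the traversal with made-up sample data:

    # Standalone version of the slash-key traversal (sample data is hypothetical).
    def update_config_values(configs, actual_configs):
      for slashkey in actual_configs.keys():
        keys = slashkey.split('/')
        current_dict = configs
        for i, key in enumerate(keys):
          if i + 1 >= len(keys):
            current_dict[key] = actual_configs[slashkey]      # leaf: assign value
          else:
            current_dict = current_dict.setdefault(key, {})   # descend

    configs = {'hdfs-site': {'dfs.namenode.http-address': 'c6401.ambari.apache.org:50070'}}
    actual = {'hdfs-site/dfs.namenode.http-address': 'c6401.ambari.apache.org:50071'}
    update_config_values(configs, actual)
    print configs
    # {'hdfs-site': {'dfs.namenode.http-address': 'c6401.ambari.apache.org:50071'}}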

+ 38 - 1
ambari-agent/src/test/python/ambari_agent/TestAlerts.py

@@ -22,6 +22,7 @@ import os
 import socket
 import sys
 import re
+import json
 
 from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
 from ambari_agent.alerts.collector import AlertCollector
@@ -38,7 +39,8 @@ from unittest import TestCase
 class TestAlerts(TestCase):
 
   def setUp(self):
-    pass
+    # save original open() method for later use
+    self.original_open = open
 
 
   def tearDown(self):
@@ -710,3 +712,38 @@ class TestAlerts(TestCase):
     self.assertEquals(alert._get_reporting_text(alert.RESULT_OK), '{0}')
     self.assertEquals(alert._get_reporting_text(alert.RESULT_WARNING), '{0}')
     self.assertEquals(alert._get_reporting_text(alert.RESULT_CRITICAL), '{0}')
+    
+  @patch("json.dump")
+  def test_update_configurations(self, json_mock):
+
+    def open_side_effect(file, mode):
+      if mode == 'w':
+        # return a mock on writes so the test never touches the real file
+        file_mock = MagicMock()
+        return file_mock
+      else:
+        return self.original_open(file, mode)
+
+    test_file_path = os.path.join('ambari_agent', 'dummy_files')
+    test_stack_path = os.path.join('ambari_agent', 'dummy_files')
+    test_common_services_path = os.path.join('ambari_agent', 'dummy_files')
+    test_host_scripts_path = os.path.join('ambari_agent', 'dummy_files')
+
+    commands = [{"clusterName": "c1",
+                 "configurations": {
+                   "hdfs-site": {
+                     "dfs.namenode.http-address": "c6401.ambari.apache.org:50071"
+                   }
+                 }}]
+    with open(os.path.join(test_stack_path, "definitions.json"), "r") as fp:
+      all_commands = json.load(fp)
+    all_commands[0]['configurations']['hdfs-site'].update({"dfs.namenode.http-address": "c6401.ambari.apache.org:50071"})
+
+    ash = AlertSchedulerHandler(test_file_path, test_stack_path, test_common_services_path, test_host_scripts_path)
+    ash.start()
+
+    with patch("__builtin__.open") as open_mock:
+      open_mock.side_effect = open_side_effect
+      ash.update_configurations(commands)
+    self.assertTrue(json_mock.called)
+    # json.dump is invoked as json.dump(all_commands, f, indent=2); verify the payload
+    self.assertEquals(json_mock.call_args[0][0], all_commands)
+
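
A note on the assertion fix above: Mock objects have no called_with() method, so the original self.assertTrue(json_mock.called_with(all_commands)) only created an auto-generated child mock, which is always truthy, meaning the assertion could never fail. Inspecting call_args (or using assert_called_with) actually checks the recorded arguments. A minimal illustration (hypothetical snippet):

    from mock import MagicMock

    m = MagicMock()
    m(1, 2)

    print bool(m.called_with(3, 4))    # True: just a child mock, not a check
    print m.call_args == ((1, 2), {})  # True: the real recorded arguments
    m.assert_called_with(1, 2)         # raises AssertionError on a mismatch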