AlertSchedulerHandler.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. #!/usr/bin/env python
  2. """
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. """
  17. """
  18. http://apscheduler.readthedocs.org/en/v2.1.2
  19. """
  20. import json
  21. import logging
  22. import os
  23. import sys
  24. import time
  25. import atexit
  26. from apscheduler.scheduler import Scheduler
  27. from alerts.collector import AlertCollector
  28. from alerts.metric_alert import MetricAlert
  29. from alerts.port_alert import PortAlert
  30. from alerts.script_alert import ScriptAlert
  31. from alerts.web_alert import WebAlert
  32. logger = logging.getLogger()
  33. class AlertSchedulerHandler():
  34. FILENAME = 'definitions.json'
  35. TYPE_PORT = 'PORT'
  36. TYPE_METRIC = 'METRIC'
  37. TYPE_SCRIPT = 'SCRIPT'
  38. TYPE_WEB = 'WEB'
  39. APS_CONFIG = {
  40. 'threadpool.core_threads': 3,
  41. 'coalesce': True,
  42. 'standalone': False
  43. }
  44. def __init__(self, cachedir, stacks_dir, common_services_dir, host_scripts_dir,
  45. cluster_configuration, config, in_minutes=True):
  46. self.cachedir = cachedir
  47. self.stacks_dir = stacks_dir
  48. self.common_services_dir = common_services_dir
  49. self.host_scripts_dir = host_scripts_dir
  50. self._cluster_configuration = cluster_configuration
  51. if not os.path.exists(cachedir):
  52. try:
  53. os.makedirs(cachedir)
  54. except:
  55. logger.critical("[AlertScheduler] Could not create the cache directory {0}".format(cachedir))
  56. self._collector = AlertCollector()
  57. self.__scheduler = Scheduler(AlertSchedulerHandler.APS_CONFIG)
  58. self.__in_minutes = in_minutes
  59. self.config = config
  60. # register python exit handler
  61. atexit.register(self.exit_handler)
  62. def exit_handler(self):
  63. """
  64. Exit handler
  65. """
  66. self.stop()
  67. def update_definitions(self, heartbeat):
  68. """
  69. Updates the persisted alert definitions JSON.
  70. :param heartbeat:
  71. :return:
  72. """
  73. if 'alertDefinitionCommands' not in heartbeat:
  74. logger.warning("There are no alert definition commands in the heartbeat; unable to update definitions")
  75. return
  76. # prune out things we don't want to store
  77. alert_definitions = []
  78. for command in heartbeat['alertDefinitionCommands']:
  79. command_copy = command.copy()
  80. # no need to store these since we always use the in-memory cached values
  81. if 'configurations' in command_copy:
  82. del command_copy['configurations']
  83. alert_definitions.append(command_copy)
  84. # write out the new definitions
  85. with open(os.path.join(self.cachedir, self.FILENAME), 'w') as f:
  86. json.dump(alert_definitions, f, indent=2)
  87. # reschedule only the jobs that have changed
  88. self.reschedule()
  89. def __make_function(self, alert_def):
  90. return lambda: alert_def.collect()
  91. def start(self):
  92. """ loads definitions from file and starts the scheduler """
  93. if self.__scheduler is None:
  94. return
  95. if self.__scheduler.running:
  96. self.__scheduler.shutdown(wait=False)
  97. self.__scheduler = Scheduler(AlertSchedulerHandler.APS_CONFIG)
  98. alert_callables = self.__load_definitions()
  99. # schedule each definition
  100. for _callable in alert_callables:
  101. self.schedule_definition(_callable)
  102. logger.info("[AlertScheduler] Starting {0}; currently running: {1}".format(
  103. str(self.__scheduler), str(self.__scheduler.running)))
  104. self.__scheduler.start()
  105. def stop(self):
  106. if not self.__scheduler is None:
  107. self.__scheduler.shutdown(wait=False)
  108. self.__scheduler = Scheduler(AlertSchedulerHandler.APS_CONFIG)
  109. logger.info("[AlertScheduler] Stopped the alert scheduler.")
  110. def reschedule(self):
  111. """
  112. Removes jobs that are scheduled where their UUID no longer is valid.
  113. Schedules jobs where the definition UUID is not currently scheduled.
  114. """
  115. jobs_scheduled = 0
  116. jobs_removed = 0
  117. definitions = self.__load_definitions()
  118. scheduled_jobs = self.__scheduler.get_jobs()
  119. # for every scheduled job, see if its UUID is still valid
  120. for scheduled_job in scheduled_jobs:
  121. uuid_valid = False
  122. for definition in definitions:
  123. definition_uuid = definition.get_uuid()
  124. if scheduled_job.name == definition_uuid:
  125. uuid_valid = True
  126. break
  127. # jobs without valid UUIDs should be unscheduled
  128. if uuid_valid == False:
  129. jobs_removed += 1
  130. logger.info("[AlertScheduler] Unscheduling {0}".format(scheduled_job.name))
  131. self._collector.remove_by_uuid(scheduled_job.name)
  132. self.__scheduler.unschedule_job(scheduled_job)
  133. # for every definition, determine if there is a scheduled job
  134. for definition in definitions:
  135. definition_scheduled = False
  136. for scheduled_job in scheduled_jobs:
  137. definition_uuid = definition.get_uuid()
  138. if definition_uuid == scheduled_job.name:
  139. definition_scheduled = True
  140. break
  141. # if no jobs are found with the definitions UUID, schedule it
  142. if definition_scheduled == False:
  143. jobs_scheduled += 1
  144. self.schedule_definition(definition)
  145. logger.info("[AlertScheduler] Reschedule Summary: {0} rescheduled, {1} unscheduled".format(
  146. str(jobs_scheduled), str(jobs_removed)))
  147. def reschedule_all(self):
  148. """
  149. Removes jobs that are scheduled where their UUID no longer is valid.
  150. Schedules jobs where the definition UUID is not currently scheduled.
  151. """
  152. jobs_scheduled = 0
  153. jobs_removed = 0
  154. definitions = self.__load_definitions()
  155. scheduled_jobs = self.__scheduler.get_jobs()
  156. # unschedule all scheduled jobs
  157. for scheduled_job in scheduled_jobs:
  158. jobs_removed += 1
  159. logger.info("[AlertScheduler] Unscheduling {0}".format(scheduled_job.name))
  160. self._collector.remove_by_uuid(scheduled_job.name)
  161. self.__scheduler.unschedule_job(scheduled_job)
  162. # for every definition, schedule a job
  163. for definition in definitions:
  164. jobs_scheduled += 1
  165. self.schedule_definition(definition)
  166. logger.info("[AlertScheduler] Reschedule Summary: {0} rescheduled, {1} unscheduled".format(
  167. str(jobs_scheduled), str(jobs_removed)))
  168. def collector(self):
  169. """ gets the collector for reporting to the server """
  170. return self._collector
  171. def __load_definitions(self):
  172. """
  173. Loads all alert definitions from a file. All clusters are stored in
  174. a single file.
  175. :return:
  176. """
  177. definitions = []
  178. all_commands = None
  179. alerts_definitions_path = os.path.join(self.cachedir, self.FILENAME)
  180. try:
  181. with open(alerts_definitions_path) as fp:
  182. all_commands = json.load(fp)
  183. except:
  184. logger.warning('[AlertScheduler] {0} not found or invalid. No alerts will be scheduled until registration occurs.'.format(alerts_definitions_path))
  185. return definitions
  186. for command_json in all_commands:
  187. clusterName = '' if not 'clusterName' in command_json else command_json['clusterName']
  188. hostName = '' if not 'hostName' in command_json else command_json['hostName']
  189. for definition in command_json['alertDefinitions']:
  190. alert = self.__json_to_callable(clusterName, hostName, definition)
  191. if alert is None:
  192. continue
  193. alert.set_helpers(self._collector, self._cluster_configuration)
  194. definitions.append(alert)
  195. return definitions
  196. def __json_to_callable(self, clusterName, hostName, json_definition):
  197. """
  198. converts the json that represents all aspects of a definition
  199. and makes an object that extends BaseAlert that is used for individual
  200. """
  201. source = json_definition['source']
  202. source_type = source.get('type', '')
  203. if logger.isEnabledFor(logging.DEBUG):
  204. logger.debug("[AlertScheduler] Creating job type {0} with {1}".format(source_type, str(json_definition)))
  205. alert = None
  206. if source_type == AlertSchedulerHandler.TYPE_METRIC:
  207. alert = MetricAlert(json_definition, source)
  208. elif source_type == AlertSchedulerHandler.TYPE_PORT:
  209. alert = PortAlert(json_definition, source)
  210. elif source_type == AlertSchedulerHandler.TYPE_SCRIPT:
  211. source['stacks_directory'] = self.stacks_dir
  212. source['common_services_directory'] = self.common_services_dir
  213. source['host_scripts_directory'] = self.host_scripts_dir
  214. alert = ScriptAlert(json_definition, source, self.config)
  215. elif source_type == AlertSchedulerHandler.TYPE_WEB:
  216. alert = WebAlert(json_definition, source, self.config)
  217. if alert is not None:
  218. alert.set_cluster(clusterName, hostName)
  219. return alert
  220. def schedule_definition(self,definition):
  221. """
  222. Schedule a definition (callable). Scheduled jobs are given the UUID
  223. as their name so that they can be identified later on.
  224. <p/>
  225. This function can be called with a definition that is disabled; it will
  226. simply NOOP.
  227. """
  228. # NOOP if the definition is disabled; don't schedule it
  229. if not definition.is_enabled():
  230. logger.info("[AlertScheduler] The alert {0} with UUID {1} is disabled and will not be scheduled".format(
  231. definition.get_name(),definition.get_uuid()))
  232. return
  233. job = None
  234. if self.__in_minutes:
  235. job = self.__scheduler.add_interval_job(self.__make_function(definition),
  236. minutes=definition.interval())
  237. else:
  238. job = self.__scheduler.add_interval_job(self.__make_function(definition),
  239. seconds=definition.interval())
  240. # although the documentation states that Job(kwargs) takes a name
  241. # key/value pair, it does not actually set the name; do it manually
  242. if job is not None:
  243. job.name = definition.get_uuid()
  244. logger.info("[AlertScheduler] Scheduling {0} with UUID {1}".format(
  245. definition.get_name(), definition.get_uuid()))
  246. def get_job_count(self):
  247. """
  248. Gets the number of jobs currently scheduled. This is mainly used for
  249. test verification of scheduling.
  250. """
  251. if self.__scheduler is None:
  252. return 0
  253. return len(self.__scheduler.get_jobs())
  254. def execute_alert(self, execution_commands):
  255. """
  256. Executes an alert immediately, ignoring any scheduled jobs. The existing
  257. jobs remain untouched. The result of this is stored in the alert
  258. collector for tranmission during the next heartbeat
  259. """
  260. if self.__scheduler is None or execution_commands is None:
  261. return
  262. for execution_command in execution_commands:
  263. try:
  264. alert_definition = execution_command['alertDefinition']
  265. clusterName = '' if not 'clusterName' in execution_command else execution_command['clusterName']
  266. hostName = '' if not 'hostName' in execution_command else execution_command['hostName']
  267. alert = self.__json_to_callable(clusterName, hostName, alert_definition)
  268. if alert is None:
  269. continue
  270. logger.info("[AlertScheduler] Executing on-demand alert {0} ({1})".format(alert.get_name(),
  271. alert.get_uuid()))
  272. alert.set_helpers(self._collector, self._cluster_configuration)
  273. alert.collect()
  274. except:
  275. logger.exception("[AlertScheduler] Unable to execute the alert outside of the job scheduler")
  276. def main():
  277. args = list(sys.argv)
  278. del args[0]
  279. try:
  280. logger.setLevel(logging.DEBUG)
  281. except TypeError:
  282. logger.setLevel(12)
  283. ch = logging.StreamHandler()
  284. ch.setLevel(logger.level)
  285. logger.addHandler(ch)
  286. ash = AlertSchedulerHandler(args[0], args[1], args[2], False)
  287. ash.start()
  288. i = 0
  289. try:
  290. while i < 10:
  291. time.sleep(1)
  292. i += 1
  293. except KeyboardInterrupt:
  294. pass
  295. print str(ash.collector().alerts())
  296. ash.stop()
  297. if __name__ == "__main__":
  298. main()