RecoveryManager.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563
  1. #!/usr/bin/env python
  2. # Licensed to the Apache Software Foundation (ASF) under one or more
  3. # contributor license agreements. See the NOTICE file distributed with
  4. # this work for additional information regarding copyright ownership.
  5. # The ASF licenses this file to You under the Apache License, Version 2.0
  6. # (the "License"); you may not use this file except in compliance with
  7. # the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import logging
  17. import copy
  18. import time
  19. import threading
  20. import pprint
  21. from ambari_agent.ActionQueue import ActionQueue
  22. from ambari_agent.LiveStatus import LiveStatus
  23. logger = logging.getLogger()
  24. """
  25. RecoveryManager has the following capabilities:
  26. * Store data needed for execution commands extracted from STATUS command
  27. * Generate INSTALL command
  28. * Generate START command
  29. """
  30. class RecoveryManager:
  31. COMMAND_TYPE = "commandType"
  32. PAYLOAD_LEVEL = "payloadLevel"
  33. COMPONENT_NAME = "componentName"
  34. ROLE = "role"
  35. TASK_ID = "taskId"
  36. DESIRED_STATE = "desiredState"
  37. EXECUTION_COMMAND_DETAILS = "executionCommandDetails"
  38. ROLE_COMMAND = "roleCommand"
  39. PAYLOAD_LEVEL_DEFAULT = "DEFAULT"
  40. PAYLOAD_LEVEL_MINIMAL = "MINIMAL"
  41. PAYLOAD_LEVEL_EXECUTION_COMMAND = "EXECUTION_COMMAND"
  42. STARTED = "STARTED"
  43. INSTALLED = "INSTALLED"
  44. INIT = "INIT" # TODO: What is the state when machine is reset
  45. default_action_counter = {
  46. "lastAttempt": 0,
  47. "count": 0,
  48. "lastReset": 0,
  49. "lifetimeCount" : 0,
  50. "warnedLastAttempt": False,
  51. "warnedLastReset": False,
  52. "warnedThresholdReached": False
  53. }
  54. def __init__(self, recovery_enabled=False, auto_start_only=False):
  55. self.recovery_enabled = recovery_enabled
  56. self.auto_start_only = auto_start_only
  57. self.max_count = 6
  58. self.window_in_min = 60
  59. self.retry_gap = 5
  60. self.max_lifetime_count = 12
  61. self.stored_exec_commands = {}
  62. self.id = int(time.time())
  63. self.allowed_desired_states = [self.STARTED, self.INSTALLED]
  64. self.allowed_current_states = [self.INIT, self.INSTALLED]
  65. self.actions = {}
  66. self.statuses = {}
  67. self.__status_lock = threading.RLock()
  68. self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only)
  69. pass
  70. def enabled(self):
  71. return self.recovery_enabled
  72. def update_current_status(self, component, state):
  73. """
  74. Updates the current status of a host component managed by the agent
  75. """
  76. if component not in self.statuses:
  77. self.__status_lock.acquire()
  78. try:
  79. if component not in self.statuses:
  80. self.statuses[component] = {
  81. "current": "",
  82. "desired": ""
  83. }
  84. finally:
  85. self.__status_lock.release()
  86. pass
  87. self.statuses[component]["current"] = state
  88. pass
  89. def update_desired_status(self, component, state):
  90. """
  91. Updates the desired status of a host component managed by the agent
  92. """
  93. if component not in self.statuses:
  94. self.__status_lock.acquire()
  95. try:
  96. if component not in self.statuses:
  97. self.statuses[component] = {
  98. "current": "",
  99. "desired": ""
  100. }
  101. finally:
  102. self.__status_lock.release()
  103. pass
  104. self.statuses[component]["desired"] = state
  105. pass
  106. def requires_recovery(self, component):
  107. """
  108. Recovery is allowed for:
  109. INISTALLED --> STARTED
  110. INIT --> INSTALLED --> STARTED
  111. CLIENTs may be RE-INSTALLED (TODO)
  112. """
  113. if not self.enabled():
  114. return False
  115. if component not in self.statuses:
  116. return False
  117. status = self.statuses[component]
  118. if status["current"] == status["desired"]:
  119. return False
  120. if status["desired"] not in self.allowed_desired_states or status["current"] not in self.allowed_current_states:
  121. return False
  122. ### No recovery to INSTALLED or INIT states
  123. if status["current"] == self.STARTED:
  124. return False
  125. logger.info("%s needs recovery.", component)
  126. return True
  127. pass
  128. def get_recovery_status(self):
  129. """
  130. Creates a status in the form
  131. {
  132. "summary" : "RECOVERABLE|DISABLED|PARTIALLY_RECOVERABLE|UNRECOVERABLE",
  133. "component_reports" : [
  134. {
  135. "name": "component_name",
  136. "numAttempts" : "x",
  137. "limitReached" : "true|false"
  138. "status" : "REQUIRES_RECOVERY|RECOVERY_COMMAND_REQUESTED|RECOVERY_COMMAND_ISSUED|NO_RECOVERY_NEEDED"
  139. }
  140. ]
  141. }
  142. """
  143. report = {}
  144. report["summary"] = "DISABLED"
  145. if self.enabled():
  146. report["summary"] = "RECOVERABLE"
  147. num_limits_reached = 0
  148. recovery_states = []
  149. report["componentReports"] = recovery_states
  150. self.__status_lock.acquire()
  151. try:
  152. for component in self.actions.keys():
  153. action = self.actions[component]
  154. recovery_state = {}
  155. recovery_state["name"] = component
  156. recovery_state["numAttempts"] = action["lifetimeCount"]
  157. recovery_state["limitReached"] = self.max_lifetime_count <= action["lifetimeCount"]
  158. recovery_states.append(recovery_state)
  159. if recovery_state["limitReached"] == True:
  160. num_limits_reached += 1
  161. pass
  162. finally:
  163. self.__status_lock.release()
  164. if num_limits_reached > 0:
  165. report["summary"] = "PARTIALLY_RECOVERABLE"
  166. if num_limits_reached == len(recovery_states):
  167. report["summary"] = "UNRECOVERABLE"
  168. return report
  169. pass
  170. def get_recovery_commands(self):
  171. """
  172. This method computes the recovery commands for the following transitions
  173. INSTALLED --> STARTED
  174. INIT --> INSTALLED
  175. """
  176. commands = []
  177. for component in self.statuses.keys():
  178. if self.requires_recovery(component) and self.may_execute(component):
  179. status = copy.deepcopy(self.statuses[component])
  180. if status["desired"] == self.STARTED:
  181. if status["current"] == self.INSTALLED:
  182. command = self.get_start_command(component)
  183. elif status["current"] == self.INIT:
  184. command = self.get_install_command(component)
  185. elif status["desired"] == self.INSTALLED:
  186. if status["current"] == self.INIT:
  187. command = self.get_install_command(component)
  188. if command:
  189. self.execute(component)
  190. commands.append(command)
  191. return commands
  192. pass
  193. def may_execute(self, action):
  194. """
  195. Check if an action can be executed
  196. """
  197. if not action or action.strip() == "":
  198. return False
  199. if action not in self.actions:
  200. self.__status_lock.acquire()
  201. try:
  202. self.actions[action] = copy.deepcopy(self.default_action_counter)
  203. finally:
  204. self.__status_lock.release()
  205. return self._execute_action_chk_only(action)
  206. pass
  207. def execute(self, action):
  208. """
  209. Executed an action
  210. """
  211. if not action or action.strip() == "":
  212. return False
  213. if action not in self.actions:
  214. self.__status_lock.acquire()
  215. try:
  216. self.actions[action] = copy.deepcopy(self.default_action_counter)
  217. finally:
  218. self.__status_lock.release()
  219. return self._execute_action_(action)
  220. pass
  221. def _execute_action_(self, action_name):
  222. """
  223. _private_ implementation of [may] execute
  224. """
  225. action_counter = self.actions[action_name]
  226. now = self._now_()
  227. seconds_since_last_attempt = now - action_counter["lastAttempt"]
  228. if action_counter["lifetimeCount"] < self.max_lifetime_count:
  229. if action_counter["count"] < self.max_count:
  230. if seconds_since_last_attempt > self.retry_gap_in_sec:
  231. action_counter["count"] += 1
  232. action_counter["lifetimeCount"] +=1
  233. if self.retry_gap > 0:
  234. action_counter["lastAttempt"] = now
  235. action_counter["warnedLastAttempt"] = False
  236. if action_counter["count"] == 1:
  237. action_counter["lastReset"] = now
  238. return True
  239. else:
  240. if action_counter["warnedLastAttempt"] == False:
  241. action_counter["warnedLastAttempt"] = True
  242. logger.warn(
  243. "%s seconds has not passed since last occurrence %s seconds back for %s. " +
  244. "Will silently skip execution without warning till retry gap is passed",
  245. self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
  246. else:
  247. logger.debug(
  248. "%s seconds has not passed since last occurrence %s seconds back for %s",
  249. self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
  250. else:
  251. sec_since_last_reset = now - action_counter["lastReset"]
  252. if sec_since_last_reset > self.window_in_sec:
  253. action_counter["count"] = 1
  254. action_counter["lifetimeCount"] +=1
  255. if self.retry_gap > 0:
  256. action_counter["lastAttempt"] = now
  257. action_counter["lastReset"] = now
  258. action_counter["warnedLastReset"] = False
  259. return True
  260. else:
  261. if action_counter["warnedLastReset"] == False:
  262. action_counter["warnedLastReset"] = True
  263. logger.warn("%s occurrences in %s minutes reached the limit for %s. " +
  264. "Will silently skip execution without warning till window is reset",
  265. action_counter["count"], self.window_in_min, action_name)
  266. else:
  267. logger.debug("%s occurrences in %s minutes reached the limit for %s",
  268. action_counter["count"], self.window_in_min, action_name)
  269. else:
  270. if action_counter["warnedThresholdReached"] == False:
  271. action_counter["warnedThresholdReached"] = True
  272. logger.warn("%s occurrences in agent life time reached the limit for %s. " +
  273. "Will silently skip execution without warning till window is reset",
  274. action_counter["lifetimeCount"], action_name)
  275. else:
  276. logger.debug("%s occurrences in agent life time reached the limit for %s",
  277. action_counter["lifetimeCount"], action_name)
  278. return False
  279. pass
  280. def _execute_action_chk_only(self, action_name):
  281. """
  282. _private_ implementation of [may] execute check only
  283. """
  284. action_counter = self.actions[action_name]
  285. now = self._now_()
  286. seconds_since_last_attempt = now - action_counter["lastAttempt"]
  287. if action_counter["lifetimeCount"] < self.max_lifetime_count:
  288. if action_counter["count"] < self.max_count:
  289. if seconds_since_last_attempt > self.retry_gap_in_sec:
  290. return True
  291. else:
  292. sec_since_last_reset = now - action_counter["lastReset"]
  293. if sec_since_last_reset > self.window_in_sec:
  294. return True
  295. return False
  296. pass
  297. def _now_(self):
  298. return int(time.time())
  299. pass
  300. def update_configuration_from_registration(self, reg_resp):
  301. """
  302. TODO: Server sends the recovery configuration - call update_config after parsing
  303. "recovery_config": {
  304. "type" : "DEFAULT|AUTO_START|FULL",
  305. "maxCount" : 10,
  306. "windowInMinutes" : 60,
  307. "retryGap" : 0 }
  308. """
  309. recovery_enabled = False
  310. auto_start_only = True
  311. max_count = 6
  312. window_in_min = 60
  313. retry_gap = 5
  314. max_lifetime_count = 12
  315. if reg_resp and "recoveryConfig" in reg_resp:
  316. config = reg_resp["recoveryConfig"]
  317. if "type" in config:
  318. if config["type"] in ["AUTO_START", "FULL"]:
  319. recovery_enabled = True
  320. if config["type"] == "FULL":
  321. auto_start_only = False
  322. if "maxCount" in config:
  323. max_count = self._read_int_(config["maxCount"], max_count)
  324. if "windowInMinutes" in config:
  325. window_in_min = self._read_int_(config["windowInMinutes"], window_in_min)
  326. if "retryGap" in config:
  327. retry_gap = self._read_int_(config["retryGap"], retry_gap)
  328. if 'maxLifetimeCount' in config:
  329. max_lifetime_count = self._read_int_(config['maxLifetimeCount'], max_lifetime_count)
  330. self.update_config(max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only)
  331. pass
  332. def update_config(self, max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only):
  333. """
  334. Update recovery configuration, recovery is disabled if configuration values
  335. are not correct
  336. """
  337. self.recovery_enabled = False;
  338. if max_count <= 0:
  339. logger.warn("Recovery disabled: max_count must be a non-negative number")
  340. return
  341. if window_in_min <= 0:
  342. logger.warn("Recovery disabled: window_in_min must be a non-negative number")
  343. return
  344. if retry_gap < 1:
  345. logger.warn("Recovery disabled: retry_gap must be a positive number and at least 1")
  346. return
  347. if retry_gap >= window_in_min:
  348. logger.warn("Recovery disabled: retry_gap must be smaller than window_in_min")
  349. return
  350. if max_lifetime_count < 0 or max_lifetime_count < max_count:
  351. logger.warn("Recovery disabled: max_lifetime_count must more than 0 and >= max_count")
  352. return
  353. self.max_count = max_count
  354. self.window_in_min = window_in_min
  355. self.retry_gap = retry_gap
  356. self.window_in_sec = window_in_min * 60
  357. self.retry_gap_in_sec = retry_gap * 60
  358. self.auto_start_only = auto_start_only
  359. self.max_lifetime_count = max_lifetime_count
  360. self.allowed_desired_states = [self.STARTED, self.INSTALLED]
  361. self.allowed_current_states = [self.INIT, self.INSTALLED]
  362. if self.auto_start_only:
  363. self.allowed_desired_states = [self.STARTED]
  364. self.allowed_current_states = [self.INSTALLED]
  365. self.recovery_enabled = recovery_enabled
  366. if self.recovery_enabled:
  367. logger.info(
  368. "==> Auto recovery is enabled with maximum %s in %s minutes with gap of %s minutes between and lifetime max being %s.",
  369. self.max_count, self.window_in_min, self.retry_gap, self.max_lifetime_count)
  370. pass
  371. def get_unique_task_id(self):
  372. self.id += 1
  373. return self.id
  374. pass
  375. def process_status_commands(self, commands):
  376. if not self.enabled():
  377. return
  378. if commands and len(commands) > 0:
  379. for command in commands:
  380. self.store_or_update_command(command)
  381. if self.EXECUTION_COMMAND_DETAILS in command:
  382. logger.debug("Details to construct exec commands: " + pprint.pformat(command[self.EXECUTION_COMMAND_DETAILS]))
  383. pass
  384. def process_execution_commands(self, commands):
  385. if not self.enabled():
  386. return
  387. if commands and len(commands) > 0:
  388. for command in commands:
  389. if self.COMMAND_TYPE in command and command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND:
  390. if self.ROLE in command:
  391. if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL:
  392. self.update_desired_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
  393. if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START:
  394. self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
  395. pass
  396. def store_or_update_command(self, command):
  397. """
  398. Stores command details by reading them from the STATUS_COMMAND
  399. Update desired state as well
  400. """
  401. if not self.enabled():
  402. return
  403. logger.debug("Inspecting command to store/update details")
  404. if self.COMMAND_TYPE in command and command[self.COMMAND_TYPE] == ActionQueue.STATUS_COMMAND:
  405. payloadLevel = self.PAYLOAD_LEVEL_DEFAULT
  406. if self.PAYLOAD_LEVEL in command:
  407. payloadLevel = command[self.PAYLOAD_LEVEL]
  408. component = command[self.COMPONENT_NAME]
  409. self.update_desired_status(component, command[self.DESIRED_STATE])
  410. if payloadLevel == self.PAYLOAD_LEVEL_EXECUTION_COMMAND:
  411. if self.EXECUTION_COMMAND_DETAILS in command:
  412. # Store the execution command details
  413. self.remove_command(component)
  414. self.stored_exec_commands[component] = command[self.EXECUTION_COMMAND_DETAILS]
  415. logger.debug("Stored command details for " + component)
  416. else:
  417. logger.warn("Expected field " + self.EXECUTION_COMMAND_DETAILS + " unavailable.")
  418. pass
  419. pass
  420. def get_install_command(self, component):
  421. if self.enabled():
  422. logger.debug("Using stored INSTALL command for %s", component)
  423. if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
  424. command = copy.deepcopy(self.stored_exec_commands[component])
  425. command[self.ROLE_COMMAND] = "INSTALL"
  426. command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
  427. command[self.TASK_ID] = self.get_unique_task_id()
  428. return command
  429. else:
  430. logger.info("INSTALL command cannot be computed.")
  431. else:
  432. logger.info("Recovery is not enabled. INSTALL command will not be computed.")
  433. return None
  434. pass
  435. def get_start_command(self, component):
  436. if self.enabled():
  437. logger.debug("Using stored START command for %s", component)
  438. if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
  439. command = copy.deepcopy(self.stored_exec_commands[component])
  440. command[self.ROLE_COMMAND] = "START"
  441. command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
  442. command[self.TASK_ID] = self.get_unique_task_id()
  443. return command
  444. else:
  445. logger.info("START command cannot be computed.")
  446. else:
  447. logger.info("Recovery is not enabled. START command will not be computed.")
  448. return None
  449. pass
  450. def command_exists(self, component, command_type):
  451. if command_type == ActionQueue.EXECUTION_COMMAND:
  452. if component in self.stored_exec_commands:
  453. return True
  454. return False
  455. pass
  456. def remove_command(self, component):
  457. if component in self.stored_exec_commands:
  458. del self.stored_exec_commands[component]
  459. return True
  460. return False
  461. def _read_int_(self, value, default_value=0):
  462. int_value = default_value
  463. try:
  464. int_value = int(value)
  465. except ValueError:
  466. pass
  467. return int_value
  468. def main(argv=None):
  469. cmd_mgr = RecoveryManager()
  470. pass
  471. if __name__ == '__main__':
  472. main()