RecoveryManager.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  16. import logging
  17. import copy
  18. import time
  19. import threading
  20. import pprint
  21. from ambari_agent.ActionQueue import ActionQueue
  22. from ambari_agent.LiveStatus import LiveStatus
  23. logger = logging.getLogger()
"""
RecoveryManager has the following capabilities:
* Store the data needed to build execution commands, extracted from STATUS commands
* Generate INSTALL commands
* Generate START commands
* Generate RESTART commands
"""
  30. class RecoveryManager:
  31. COMMAND_TYPE = "commandType"
  32. PAYLOAD_LEVEL = "payloadLevel"
  33. COMPONENT_NAME = "componentName"
  34. ROLE = "role"
  35. TASK_ID = "taskId"
  36. DESIRED_STATE = "desiredState"
  37. HAS_STALE_CONFIG = "hasStaleConfigs"
  38. EXECUTION_COMMAND_DETAILS = "executionCommandDetails"
  39. ROLE_COMMAND = "roleCommand"
  40. PAYLOAD_LEVEL_DEFAULT = "DEFAULT"
  41. PAYLOAD_LEVEL_MINIMAL = "MINIMAL"
  42. PAYLOAD_LEVEL_EXECUTION_COMMAND = "EXECUTION_COMMAND"
  43. STARTED = "STARTED"
  44. INSTALLED = "INSTALLED"
  45. INIT = "INIT" # TODO: What is the state when machine is reset
  46. COMPONENT_UPDATE_KEY_FORMAT = "{0}_UPDATE_TIME"
  47. COMMAND_REFRESH_DELAY_SEC = 600 #10 minutes
  48. default_action_counter = {
  49. "lastAttempt": 0,
  50. "count": 0,
  51. "lastReset": 0,
  52. "lifetimeCount" : 0,
  53. "warnedLastAttempt": False,
  54. "warnedLastReset": False,
  55. "warnedThresholdReached": False
  56. }
  57. default_component_status = {
  58. "current": "",
  59. "desired": "",
  60. "stale_config": False
  61. }
  62. def __init__(self, recovery_enabled=False, auto_start_only=False):
  63. self.recovery_enabled = recovery_enabled
  64. self.auto_start_only = auto_start_only
  65. self.max_count = 6
  66. self.window_in_min = 60
  67. self.retry_gap = 5
  68. self.max_lifetime_count = 12
  69. self.stored_exec_commands = {}
  70. self.id = int(time.time())
  71. self.allowed_desired_states = [self.STARTED, self.INSTALLED]
  72. self.allowed_current_states = [self.INIT, self.INSTALLED]
  73. self.actions = {}
  74. self.statuses = {}
  75. self.__status_lock = threading.RLock()
  76. self.__command_lock = threading.RLock()
  77. self.__active_command_lock = threading.RLock()
  78. self.active_command_count = 0
  79. self.paused = False
  80. self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only)
  81. pass
  82. def start_execution_command(self):
  83. with self.__active_command_lock:
  84. self.active_command_count += 1
  85. pass
  86. def stop_execution_command(self):
  87. with self.__active_command_lock:
  88. self.active_command_count -= 1
  89. pass
  90. def has_active_command(self):
  91. return self.active_command_count > 0
  92. def set_paused(self, paused):
  93. if self.paused != paused:
  94. logger.debug("RecoveryManager is transitioning from isPaused = " + str(self.paused) + " to " + str(paused))
  95. self.paused = paused
  96. def enabled(self):
  97. return self.recovery_enabled
  98. def update_config_staleness(self, component, is_config_stale):
  99. """
  100. Updates staleness of config
  101. """
  102. if component not in self.statuses:
  103. self.__status_lock.acquire()
  104. try:
  105. if component not in self.statuses:
  106. self.statuses[component] = copy.deepcopy(self.default_component_status)
  107. finally:
  108. self.__status_lock.release()
  109. pass
  110. self.statuses[component]["stale_config"] = is_config_stale
  111. if self.statuses[component]["current"] == self.statuses[component]["desired"] and \
  112. self.statuses[component]["stale_config"] == False:
  113. self.remove_command(component)
  114. pass
  115. def update_current_status(self, component, state):
  116. """
  117. Updates the current status of a host component managed by the agent
  118. """
  119. if component not in self.statuses:
  120. self.__status_lock.acquire()
  121. try:
  122. if component not in self.statuses:
  123. self.statuses[component] = copy.deepcopy(self.default_component_status)
  124. finally:
  125. self.__status_lock.release()
  126. pass
  127. self.statuses[component]["current"] = state
  128. if self.statuses[component]["current"] == self.statuses[component]["desired"] and \
  129. self.statuses[component]["stale_config"] == False:
  130. self.remove_command(component)
  131. pass
  132. def update_desired_status(self, component, state):
  133. """
  134. Updates the desired status of a host component managed by the agent
  135. """
  136. if component not in self.statuses:
  137. self.__status_lock.acquire()
  138. try:
  139. if component not in self.statuses:
  140. self.statuses[component] = copy.deepcopy(self.default_component_status)
  141. finally:
  142. self.__status_lock.release()
  143. pass
  144. self.statuses[component]["desired"] = state
  145. if self.statuses[component]["current"] == self.statuses[component]["desired"] and \
  146. self.statuses[component]["stale_config"] == False:
  147. self.remove_command(component)
  148. pass
  149. def requires_recovery(self, component):
  150. """
  151. Recovery is allowed for:
  152. INISTALLED --> STARTED
  153. INIT --> INSTALLED --> STARTED
  154. RE-INSTALLED (if configs do not match)
  155. """
  156. if not self.enabled():
  157. return False
  158. if component not in self.statuses:
  159. return False
  160. if self.auto_start_only:
  161. status = self.statuses[component]
  162. if status["current"] == status["desired"]:
  163. return False
  164. else:
  165. status = self.statuses[component]
  166. if status["current"] == status["desired"] and status['stale_config'] == False:
  167. return False
  168. if status["desired"] not in self.allowed_desired_states or status["current"] not in self.allowed_current_states:
  169. return False
  170. logger.info("%s needs recovery.", component)
  171. return True
  172. pass
  173. def get_recovery_status(self):
  174. """
  175. Creates a status in the form
  176. {
  177. "summary" : "RECOVERABLE|DISABLED|PARTIALLY_RECOVERABLE|UNRECOVERABLE",
  178. "component_reports" : [
  179. {
  180. "name": "component_name",
  181. "numAttempts" : "x",
  182. "limitReached" : "true|false"
  183. "status" : "REQUIRES_RECOVERY|RECOVERY_COMMAND_REQUESTED|RECOVERY_COMMAND_ISSUED|NO_RECOVERY_NEEDED"
  184. }
  185. ]
  186. }
  187. """
  188. report = {}
  189. report["summary"] = "DISABLED"
  190. if self.enabled():
  191. report["summary"] = "RECOVERABLE"
  192. num_limits_reached = 0
  193. recovery_states = []
  194. report["componentReports"] = recovery_states
  195. self.__status_lock.acquire()
  196. try:
  197. for component in self.actions.keys():
  198. action = self.actions[component]
  199. recovery_state = {}
  200. recovery_state["name"] = component
  201. recovery_state["numAttempts"] = action["lifetimeCount"]
  202. recovery_state["limitReached"] = self.max_lifetime_count <= action["lifetimeCount"]
  203. recovery_states.append(recovery_state)
  204. if recovery_state["limitReached"] == True:
  205. num_limits_reached += 1
  206. pass
  207. finally:
  208. self.__status_lock.release()
  209. if num_limits_reached > 0:
  210. report["summary"] = "PARTIALLY_RECOVERABLE"
  211. if num_limits_reached == len(recovery_states):
  212. report["summary"] = "UNRECOVERABLE"
  213. return report
  214. pass
  215. def get_recovery_commands(self):
  216. """
  217. This method computes the recovery commands for the following transitions
  218. INSTALLED --> STARTED
  219. INIT --> INSTALLED
  220. """
  221. commands = []
  222. for component in self.statuses.keys():
  223. if self.requires_recovery(component) and self.may_execute(component):
  224. status = copy.deepcopy(self.statuses[component])
  225. command = None
  226. if self.auto_start_only:
  227. if status["desired"] == self.STARTED:
  228. if status["current"] == self.INSTALLED:
  229. command = self.get_start_command(component)
  230. else:
  231. # START, INSTALL, RESTART
  232. if status["desired"] != status["current"]:
  233. if status["desired"] == self.STARTED:
  234. if status["current"] == self.INSTALLED:
  235. command = self.get_start_command(component)
  236. elif status["current"] == self.INIT:
  237. command = self.get_install_command(component)
  238. elif status["desired"] == self.INSTALLED:
  239. if status["current"] == self.INIT:
  240. command = self.get_install_command(component)
  241. # else issue a STOP command
  242. else:
  243. if status["current"] == self.INSTALLED:
  244. command = self.get_install_command(component)
  245. elif status["current"] == self.STARTED:
  246. command = self.get_restart_command(component)
  247. if command:
  248. self.execute(component)
  249. commands.append(command)
  250. return commands
  251. pass
  252. def may_execute(self, action):
  253. """
  254. Check if an action can be executed
  255. """
  256. if not action or action.strip() == "":
  257. return False
  258. if action not in self.actions:
  259. self.__status_lock.acquire()
  260. try:
  261. self.actions[action] = copy.deepcopy(self.default_action_counter)
  262. finally:
  263. self.__status_lock.release()
  264. return self._execute_action_chk_only(action)
  265. pass
  266. def execute(self, action):
  267. """
  268. Executed an action
  269. """
  270. if not action or action.strip() == "":
  271. return False
  272. if action not in self.actions:
  273. self.__status_lock.acquire()
  274. try:
  275. self.actions[action] = copy.deepcopy(self.default_action_counter)
  276. finally:
  277. self.__status_lock.release()
  278. return self._execute_action_(action)
  279. pass
  280. def _execute_action_(self, action_name):
  281. """
  282. _private_ implementation of [may] execute
  283. """
  284. action_counter = self.actions[action_name]
  285. now = self._now_()
  286. seconds_since_last_attempt = now - action_counter["lastAttempt"]
  287. if action_counter["lifetimeCount"] < self.max_lifetime_count:
  288. if action_counter["count"] < self.max_count:
  289. if seconds_since_last_attempt > self.retry_gap_in_sec:
  290. action_counter["count"] += 1
  291. action_counter["lifetimeCount"] +=1
  292. if self.retry_gap > 0:
  293. action_counter["lastAttempt"] = now
  294. action_counter["warnedLastAttempt"] = False
  295. if action_counter["count"] == 1:
  296. action_counter["lastReset"] = now
  297. return True
  298. else:
  299. if action_counter["warnedLastAttempt"] == False:
  300. action_counter["warnedLastAttempt"] = True
  301. logger.warn(
  302. "%s seconds has not passed since last occurrence %s seconds back for %s. " +
  303. "Will silently skip execution without warning till retry gap is passed",
  304. self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
  305. else:
  306. logger.debug(
  307. "%s seconds has not passed since last occurrence %s seconds back for %s",
  308. self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
  309. else:
  310. sec_since_last_reset = now - action_counter["lastReset"]
  311. if sec_since_last_reset > self.window_in_sec:
  312. action_counter["count"] = 1
  313. action_counter["lifetimeCount"] +=1
  314. if self.retry_gap > 0:
  315. action_counter["lastAttempt"] = now
  316. action_counter["lastReset"] = now
  317. action_counter["warnedLastReset"] = False
  318. return True
  319. else:
  320. if action_counter["warnedLastReset"] == False:
  321. action_counter["warnedLastReset"] = True
  322. logger.warn("%s occurrences in %s minutes reached the limit for %s. " +
  323. "Will silently skip execution without warning till window is reset",
  324. action_counter["count"], self.window_in_min, action_name)
  325. else:
  326. logger.debug("%s occurrences in %s minutes reached the limit for %s",
  327. action_counter["count"], self.window_in_min, action_name)
  328. else:
  329. if action_counter["warnedThresholdReached"] == False:
  330. action_counter["warnedThresholdReached"] = True
  331. logger.warn("%s occurrences in agent life time reached the limit for %s. " +
  332. "Will silently skip execution without warning till window is reset",
  333. action_counter["lifetimeCount"], action_name)
  334. else:
  335. logger.debug("%s occurrences in agent life time reached the limit for %s",
  336. action_counter["lifetimeCount"], action_name)
  337. return False
  338. pass
  339. def _execute_action_chk_only(self, action_name):
  340. """
  341. _private_ implementation of [may] execute check only
  342. """
  343. action_counter = self.actions[action_name]
  344. now = self._now_()
  345. seconds_since_last_attempt = now - action_counter["lastAttempt"]
  346. if action_counter["lifetimeCount"] < self.max_lifetime_count:
  347. if action_counter["count"] < self.max_count:
  348. if seconds_since_last_attempt > self.retry_gap_in_sec:
  349. return True
  350. else:
  351. sec_since_last_reset = now - action_counter["lastReset"]
  352. if sec_since_last_reset > self.window_in_sec:
  353. return True
  354. return False
  355. pass
  356. def _now_(self):
  357. return int(time.time())
  358. pass
  359. def update_configuration_from_registration(self, reg_resp):
  360. """
  361. TODO: Server sends the recovery configuration - call update_config after parsing
  362. "recoveryConfig": {
  363. "type" : "DEFAULT|AUTO_START|FULL",
  364. "maxCount" : 10,
  365. "windowInMinutes" : 60,
  366. "retryGap" : 0 }
  367. """
  368. recovery_enabled = False
  369. auto_start_only = True
  370. max_count = 6
  371. window_in_min = 60
  372. retry_gap = 5
  373. max_lifetime_count = 12
  374. if reg_resp and "recoveryConfig" in reg_resp:
  375. config = reg_resp["recoveryConfig"]
  376. if "type" in config:
  377. if config["type"] in ["AUTO_START", "FULL"]:
  378. recovery_enabled = True
  379. if config["type"] == "FULL":
  380. auto_start_only = False
  381. if "maxCount" in config:
  382. max_count = self._read_int_(config["maxCount"], max_count)
  383. if "windowInMinutes" in config:
  384. window_in_min = self._read_int_(config["windowInMinutes"], window_in_min)
  385. if "retryGap" in config:
  386. retry_gap = self._read_int_(config["retryGap"], retry_gap)
  387. if 'maxLifetimeCount' in config:
  388. max_lifetime_count = self._read_int_(config['maxLifetimeCount'], max_lifetime_count)
  389. self.update_config(max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only)
  390. pass
  391. def update_config(self, max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only):
  392. """
  393. Update recovery configuration, recovery is disabled if configuration values
  394. are not correct
  395. """
  396. self.recovery_enabled = False;
  397. if max_count <= 0:
  398. logger.warn("Recovery disabled: max_count must be a non-negative number")
  399. return
  400. if window_in_min <= 0:
  401. logger.warn("Recovery disabled: window_in_min must be a non-negative number")
  402. return
  403. if retry_gap < 1:
  404. logger.warn("Recovery disabled: retry_gap must be a positive number and at least 1")
  405. return
  406. if retry_gap >= window_in_min:
  407. logger.warn("Recovery disabled: retry_gap must be smaller than window_in_min")
  408. return
  409. if max_lifetime_count < 0 or max_lifetime_count < max_count:
  410. logger.warn("Recovery disabled: max_lifetime_count must more than 0 and >= max_count")
  411. return
  412. self.max_count = max_count
  413. self.window_in_min = window_in_min
  414. self.retry_gap = retry_gap
  415. self.window_in_sec = window_in_min * 60
  416. self.retry_gap_in_sec = retry_gap * 60
  417. self.auto_start_only = auto_start_only
  418. self.max_lifetime_count = max_lifetime_count
  419. self.allowed_desired_states = [self.STARTED, self.INSTALLED]
  420. self.allowed_current_states = [self.INIT, self.INSTALLED, self.STARTED]
  421. if self.auto_start_only:
  422. self.allowed_desired_states = [self.STARTED]
  423. self.allowed_current_states = [self.INSTALLED]
  424. self.recovery_enabled = recovery_enabled
  425. if self.recovery_enabled:
  426. logger.info(
  427. "==> Auto recovery is enabled with maximum %s in %s minutes with gap of %s minutes between and lifetime max being %s.",
  428. self.max_count, self.window_in_min, self.retry_gap, self.max_lifetime_count)
  429. pass
  430. def get_unique_task_id(self):
  431. self.id += 1
  432. return self.id
  433. pass
  434. def process_status_commands(self, commands):
  435. if not self.enabled():
  436. return
  437. if commands and len(commands) > 0:
  438. for command in commands:
  439. self.store_or_update_command(command)
  440. if self.EXECUTION_COMMAND_DETAILS in command:
  441. logger.debug("Details to construct exec commands: " + pprint.pformat(command[self.EXECUTION_COMMAND_DETAILS]))
  442. pass
  443. def process_execution_commands(self, commands):
  444. if not self.enabled():
  445. return
  446. if commands and len(commands) > 0:
  447. for command in commands:
  448. if self.COMMAND_TYPE in command and command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND:
  449. if self.ROLE in command:
  450. if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL:
  451. self.update_desired_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
  452. if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START:
  453. self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
  454. pass
  455. def store_or_update_command(self, command):
  456. """
  457. Stores command details by reading them from the STATUS_COMMAND
  458. Update desired state as well
  459. """
  460. if not self.enabled():
  461. return
  462. logger.debug("Inspecting command to store/update details")
  463. if self.COMMAND_TYPE in command and command[self.COMMAND_TYPE] == ActionQueue.STATUS_COMMAND:
  464. payloadLevel = self.PAYLOAD_LEVEL_DEFAULT
  465. if self.PAYLOAD_LEVEL in command:
  466. payloadLevel = command[self.PAYLOAD_LEVEL]
  467. component = command[self.COMPONENT_NAME]
  468. self.update_desired_status(component, command[self.DESIRED_STATE])
  469. self.update_config_staleness(component, command[self.HAS_STALE_CONFIG])
  470. if payloadLevel == self.PAYLOAD_LEVEL_EXECUTION_COMMAND:
  471. if self.EXECUTION_COMMAND_DETAILS in command:
  472. # Store the execution command details
  473. self.remove_command(component)
  474. self.add_command(component, command[self.EXECUTION_COMMAND_DETAILS])
  475. logger.debug("Stored command details for " + component)
  476. else:
  477. logger.warn("Expected field " + self.EXECUTION_COMMAND_DETAILS + " unavailable.")
  478. pass
  479. pass
  480. def get_install_command(self, component):
  481. if self.paused:
  482. logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.")
  483. return None
  484. if self.enabled():
  485. logger.debug("Using stored INSTALL command for %s", component)
  486. if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
  487. command = copy.deepcopy(self.stored_exec_commands[component])
  488. command[self.ROLE_COMMAND] = "INSTALL"
  489. command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
  490. command[self.TASK_ID] = self.get_unique_task_id()
  491. return command
  492. else:
  493. logger.info("INSTALL command cannot be computed as details are not received from Server.")
  494. else:
  495. logger.info("Recovery is not enabled. INSTALL command will not be computed.")
  496. return None
  497. pass
  498. def get_restart_command(self, component):
  499. if self.paused:
  500. logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.")
  501. return None
  502. if self.enabled():
  503. logger.debug("Using stored INSTALL command for %s", component)
  504. if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
  505. command = copy.deepcopy(self.stored_exec_commands[component])
  506. command[self.ROLE_COMMAND] = "CUSTOM_COMMAND"
  507. command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
  508. command[self.TASK_ID] = self.get_unique_task_id()
  509. command['hostLevelParams']['custom_command'] = 'RESTART'
  510. return command
  511. else:
  512. logger.info("RESTART command cannot be computed as details are not received from Server.")
  513. else:
  514. logger.info("Recovery is not enabled. RESTART command will not be computed.")
  515. return None
  516. pass
  517. def get_start_command(self, component):
  518. if self.paused:
  519. logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.")
  520. return None
  521. if self.enabled():
  522. logger.debug("Using stored START command for %s", component)
  523. if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
  524. command = copy.deepcopy(self.stored_exec_commands[component])
  525. command[self.ROLE_COMMAND] = "START"
  526. command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
  527. command[self.TASK_ID] = self.get_unique_task_id()
  528. return command
  529. else:
  530. logger.info("START command cannot be computed as details are not received from Server.")
  531. else:
  532. logger.info("Recovery is not enabled. START command will not be computed.")
  533. return None
  534. pass
  535. def command_exists(self, component, command_type):
  536. if command_type == ActionQueue.EXECUTION_COMMAND:
  537. self.remove_stale_command(component)
  538. if component in self.stored_exec_commands:
  539. return True
  540. return False
  541. pass
  542. def remove_stale_command(self, component):
  543. component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component)
  544. if component in self.stored_exec_commands:
  545. insert_time = self.stored_exec_commands[component_update_key]
  546. age = self._now_() - insert_time
  547. if self.COMMAND_REFRESH_DELAY_SEC < age:
  548. logger.debug("Removing stored command for component : " + str(component) + " as its " + str(age) + " sec old")
  549. self.remove_command(component)
  550. pass
  551. def remove_command(self, component):
  552. if component in self.stored_exec_commands:
  553. self.__status_lock.acquire()
  554. try:
  555. component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component)
  556. del self.stored_exec_commands[component]
  557. del self.stored_exec_commands[component_update_key]
  558. logger.debug("Removed stored command for component : " + str(component))
  559. return True
  560. finally:
  561. self.__status_lock.release()
  562. return False
  563. def add_command(self, component, command):
  564. self.__status_lock.acquire()
  565. try:
  566. component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component)
  567. self.stored_exec_commands[component] = command
  568. self.stored_exec_commands[component_update_key] = self._now_()
  569. logger.debug("Added command for component : " + str(component))
  570. finally:
  571. self.__status_lock.release()
  572. def _read_int_(self, value, default_value=0):
  573. int_value = default_value
  574. try:
  575. int_value = int(value)
  576. except ValueError:
  577. pass
  578. return int_value
  579. def main(argv=None):
  580. cmd_mgr = RecoveryManager()
  581. pass
  582. if __name__ == '__main__':
  583. main()