ActionQueue.py

#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''

import Queue
import logging
import traceback
import threading
import pprint
import os
import ambari_simplejson as json
import time

from AgentException import AgentException
from LiveStatus import LiveStatus
from ActualConfigHandler import ActualConfigHandler
from CommandStatusDict import CommandStatusDict
from CustomServiceOrchestrator import CustomServiceOrchestrator
from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle

logger = logging.getLogger()
installScriptHash = -1

class ActionQueue(threading.Thread):
  """ Action Queue for the agent. We pick one command at a time from the queue
  and execute it.
  Note: the terms "action" and "command" are used interchangeably in this and
  related classes.
  """

  # How many actions can be performed in parallel. Feel free to change
  MAX_CONCURRENT_ACTIONS = 5

  # How long (in seconds) to wait for a new incoming execution command before
  # checking the status command queue
  EXECUTION_COMMAND_WAIT_TIME = 2

  STATUS_COMMAND = 'STATUS_COMMAND'
  EXECUTION_COMMAND = 'EXECUTION_COMMAND'
  AUTO_EXECUTION_COMMAND = 'AUTO_EXECUTION_COMMAND'
  BACKGROUND_EXECUTION_COMMAND = 'BACKGROUND_EXECUTION_COMMAND'

  ROLE_COMMAND_INSTALL = 'INSTALL'
  ROLE_COMMAND_START = 'START'
  ROLE_COMMAND_STOP = 'STOP'
  ROLE_COMMAND_CUSTOM_COMMAND = 'CUSTOM_COMMAND'
  CUSTOM_COMMAND_RESTART = 'RESTART'

  IN_PROGRESS_STATUS = 'IN_PROGRESS'
  COMPLETED_STATUS = 'COMPLETED'
  FAILED_STATUS = 'FAILED'
  def __init__(self, config, controller):
    super(ActionQueue, self).__init__()
    self.commandQueue = Queue.Queue()
    self.statusCommandQueue = Queue.Queue()
    self.backgroundCommandQueue = Queue.Queue()
    self.commandStatuses = CommandStatusDict(callback_action =
      self.status_update_callback)
    self.config = config
    self.controller = controller
    self.configTags = {}
    self._stop = threading.Event()
    self.tmpdir = config.get('agent', 'prefix')
    self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller)
    self.parallel_execution = config.get_parallel_exec_option()
    if self.parallel_execution == 1:
      logger.info("Parallel execution is enabled, will start Agent commands in parallel")

  def stop(self):
    self._stop.set()

  def stopped(self):
    return self._stop.isSet()
  def put_status(self, commands):
    # We are supposed to receive the full set of statuses here, so there is no
    # need to keep the old ones around.
    self.statusCommandQueue.queue.clear()

    for command in commands:
      logger.info("Adding " + command['commandType'] + " for service " + \
                  command['serviceName'] + " of cluster " + \
                  command['clusterName'] + " to the queue.")
      self.statusCommandQueue.put(command)
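
  # Illustrative sketch only (field values are hypothetical, not taken from a
  # real server payload): based on the keys read above and in
  # execute_status_command(), a status command is assumed to carry at least:
  #
  #   {
  #     'commandType': 'STATUS_COMMAND',
  #     'clusterName': 'c1',
  #     'serviceName': 'HDFS',
  #     'componentName': 'DATANODE',
  #     'configurations': {'global': {...}}
  #   }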
  def put(self, commands):
    for command in commands:
      if not command.has_key('serviceName'):
        command['serviceName'] = "null"
      if not command.has_key('clusterName'):
        command['clusterName'] = 'null'
      logger.info("Adding " + command['commandType'] + " for role " + \
                  command['role'] + " for service " + \
                  command['serviceName'] + " of cluster " + \
                  command['clusterName'] + " to the queue.")
      if command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND:
        self.backgroundCommandQueue.put(self.createCommandHandle(command))
      else:
        self.commandQueue.put(command)
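
  # Illustrative sketch only (field values are hypothetical): based on the
  # keys read in execute_command(), an execution command routed above is
  # assumed to look roughly like:
  #
  #   {
  #     'commandType': 'EXECUTION_COMMAND',
  #     'commandId': '17-1',
  #     'taskId': 123,
  #     'clusterName': 'c1',
  #     'serviceName': 'HDFS',
  #     'role': 'DATANODE',
  #     'roleCommand': 'START',
  #     'commandParams': {...},
  #     'hostLevelParams': {...},
  #     'configurationTags': {...}
  #   }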
  def cancel(self, commands):
    for command in commands:
      logger.info("Canceling command {tid}".format(tid = str(command['target_task_id'])))
      logger.debug(pprint.pformat(command))

      task_id = command['target_task_id']
      reason = command['reason']

      # Remove from the command queue by task_id
      queue = self.commandQueue
      self.commandQueue = Queue.Queue()

      while not queue.empty():
        queued_command = queue.get(False)
        if queued_command['task_id'] != task_id:
          self.commandQueue.put(queued_command)
        else:
          logger.info("Canceling " + queued_command['commandType'] + \
                      " for service " + queued_command['serviceName'] + \
                      " of cluster " + queued_command['clusterName'] + \
                      " from the queue.")

      # Kill if in progress
      self.customServiceOrchestrator.cancel_command(task_id, reason)
  def run(self):
    while not self.stopped():
      self.processBackgroundQueueSafeEmpty()
      self.processStatusCommandQueueSafeEmpty()
      try:
        if self.parallel_execution == 0:
          command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
          self.process_command(command)
        else:
          # If parallel execution is enabled, just kick off all available
          # commands using separate threads
          while True:
            command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
            logger.info("Kicking off a thread for the command, id=" +
                        str(command['commandId']) + " taskId=" + str(command['taskId']))
            t = threading.Thread(target=self.process_command, args=(command,))
            t.daemon = True
            t.start()
      except Queue.Empty:
        pass
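
  # Note on loop timing: commandQueue.get() above blocks for at most
  # EXECUTION_COMMAND_WAIT_TIME seconds and then raises Queue.Empty, which is
  # swallowed, so the background and status queues are re-checked roughly
  # every EXECUTION_COMMAND_WAIT_TIME seconds even when no execution commands
  # arrive.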
  def processBackgroundQueueSafeEmpty(self):
    while not self.backgroundCommandQueue.empty():
      try:
        command = self.backgroundCommandQueue.get(False)
        if command.has_key('__handle') and command['__handle'].status is None:
          self.process_command(command)
      except Queue.Empty:
        pass

  def processStatusCommandQueueSafeEmpty(self):
    while not self.statusCommandQueue.empty():
      try:
        command = self.statusCommandQueue.get(False)
        self.process_command(command)
      except Queue.Empty:
        pass
  def createCommandHandle(self, command):
    if command.has_key('__handle'):
      raise AgentException("Command already has __handle")

    command['__handle'] = BackgroundCommandExecutionHandle(command, command['commandId'],
                                                           None, self.on_background_command_complete_callback)
    return command
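
  # Background command lifecycle (as wired up in this class):
  #  - put() wraps BACKGROUND_EXECUTION_COMMANDs via createCommandHandle();
  #  - processBackgroundQueueSafeEmpty() only dispatches commands whose handle
  #    has no status yet;
  #  - execute_command() returns early for background commands, and the final
  #    report is produced by on_background_command_complete_callback().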
  def process_command(self, command):
    # make sure we log failures
    commandType = command['commandType']
    logger.debug("Took an element of Queue (command type = %s)." % commandType)
    try:
      if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND, self.AUTO_EXECUTION_COMMAND]:
        try:
          if self.controller.recovery_manager.enabled():
            self.controller.recovery_manager.start_execution_command()
          self.execute_command(command)
        finally:
          if self.controller.recovery_manager.enabled():
            self.controller.recovery_manager.stop_execution_command()
      elif commandType == self.STATUS_COMMAND:
        self.execute_status_command(command)
      else:
        logger.error("Unrecognized command " + pprint.pformat(command))
    except Exception, err:
      # Should not happen
      traceback.print_exc()
      logger.warn(err)
  def tasks_in_progress_or_pending(self):
    return_val = False
    if not self.commandQueue.empty():
      return_val = True
    if self.controller.recovery_manager.has_active_command():
      return_val = True
    return return_val
  def execute_command(self, command):
    '''
    Executes commands of type EXECUTION_COMMAND
    '''
    clusterName = command['clusterName']
    commandId = command['commandId']
    isCommandBackground = command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND
    isAutoExecuteCommand = command['commandType'] == self.AUTO_EXECUTION_COMMAND
    message = "Executing command with id = {commandId} for role = {role} of " \
              "cluster {cluster}.".format(
              commandId = str(commandId), role=command['role'],
              cluster=clusterName)
    logger.info(message)

    taskId = command['taskId']
    # Preparing 'IN_PROGRESS' report
    in_progress_status = self.commandStatuses.generate_report_template(command)
    # The paths of the files that contain the output log and error log use a prefix
    # that the agent advertises to the server. The prefix is defined in agent-config.ini
    if not isAutoExecuteCommand:
      in_progress_status.update({
        'tmpout': self.tmpdir + os.sep + 'output-' + str(taskId) + '.txt',
        'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt',
        'structuredOut': self.tmpdir + os.sep + 'structured-out-' + str(taskId) + '.json',
        'status': self.IN_PROGRESS_STATUS
      })
    else:
      in_progress_status.update({
        'tmpout': self.tmpdir + os.sep + 'auto_output-' + str(taskId) + '.txt',
        'tmperr': self.tmpdir + os.sep + 'auto_errors-' + str(taskId) + '.txt',
        'structuredOut': self.tmpdir + os.sep + 'auto_structured-out-' + str(taskId) + '.json',
        'status': self.IN_PROGRESS_STATUS
      })
    self.commandStatuses.put_command_status(command, in_progress_status)

    numAttempts = 0
    maxAttempts = 1
    retryAble = False
    delay = 1

    if 'commandParams' in command:
      if 'command_retry_max_attempt_count' in command['commandParams']:
        maxAttempts = int(command['commandParams']['command_retry_max_attempt_count'])
      if 'command_retry_enabled' in command['commandParams']:
        retryAble = command['commandParams']['command_retry_enabled'] == "true"
    if isAutoExecuteCommand:
      retryAble = False

    logger.debug("Command execution metadata - retry enabled = {retryAble}, max attempt count = {maxAttemptCount}".
                 format(retryAble = retryAble, maxAttemptCount = maxAttempts))
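
    # Illustrative example (values hypothetical): with
    #   command['commandParams'] = {'command_retry_enabled': 'true',
    #                               'command_retry_max_attempt_count': '3'}
    # a failing command is attempted up to three times in total (two retries)
    # before a FAILED status is reported.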
    while numAttempts < maxAttempts:
      numAttempts += 1
      # running command
      commandresult = self.customServiceOrchestrator.runCommand(command,
        in_progress_status['tmpout'], in_progress_status['tmperr'],
        override_output_files=numAttempts == 1, retry=numAttempts > 1)

      # dumping results
      if isCommandBackground:
        return
      else:
        status = self.COMPLETED_STATUS if commandresult['exitcode'] == 0 else self.FAILED_STATUS

      if status != self.COMPLETED_STATUS and retryAble and maxAttempts > numAttempts:
        delay = self.get_retry_delay(delay)
        logger.info("Retrying command id {cid} after a wait of {delay}".format(cid = taskId, delay = delay))
        time.sleep(delay)
        continue
      else:
        break

    roleResult = self.commandStatuses.generate_report_template(command)
    roleResult.update({
      'stdout': commandresult['stdout'],
      'stderr': commandresult['stderr'],
      'exitCode': commandresult['exitcode'],
      'status': status,
    })

    if roleResult['stdout'] == '':
      roleResult['stdout'] = 'None'
    if roleResult['stderr'] == '':
      roleResult['stderr'] = 'None'

    # let ambari know name of custom command
    if command['hostLevelParams'].has_key('custom_command'):
      roleResult['customCommand'] = command['hostLevelParams']['custom_command']

    if 'structuredOut' in commandresult:
      roleResult['structuredOut'] = str(json.dumps(commandresult['structuredOut']))
    else:
      roleResult['structuredOut'] = ''

    # let ambari know that configuration tags were applied
    if status == self.COMPLETED_STATUS:
      if self.controller.recovery_manager.enabled() and command.has_key('roleCommand'):
        if command['roleCommand'] == self.ROLE_COMMAND_START:
          self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.LIVE_STATUS)
        elif command['roleCommand'] == self.ROLE_COMMAND_STOP or command['roleCommand'] == self.ROLE_COMMAND_INSTALL:
          self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.DEAD_STATUS)
        elif command['roleCommand'] == self.ROLE_COMMAND_CUSTOM_COMMAND:
          if command['hostLevelParams'].has_key('custom_command') and \
              command['hostLevelParams']['custom_command'] == self.CUSTOM_COMMAND_RESTART:
            self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.LIVE_STATUS)

      configHandler = ActualConfigHandler(self.config, self.configTags)
      # update component config tags that the server asked to force-refresh
      if command.has_key('forceRefreshConfigTags') and len(command['forceRefreshConfigTags']) > 0:
        forceRefreshConfigTags = command['forceRefreshConfigTags']
        logger.info("Got refresh additional component tags command")

        for configTag in forceRefreshConfigTags:
          configHandler.update_component_tag(command['role'], configTag, command['configurationTags'][configTag])

        roleResult['customCommand'] = self.CUSTOM_COMMAND_RESTART  # force restart for component to evict stale_config on server side
        command['configurationTags'] = configHandler.read_actual_component(command['role'])

      if command.has_key('configurationTags'):
        configHandler.write_actual(command['configurationTags'])
        roleResult['configurationTags'] = command['configurationTags']

      component = {'serviceName': command['serviceName'], 'componentName': command['role']}
      if command.has_key('roleCommand') and \
          (command['roleCommand'] == self.ROLE_COMMAND_START or \
          (command['roleCommand'] == self.ROLE_COMMAND_INSTALL \
          and component in LiveStatus.CLIENT_COMPONENTS) or \
          (command['roleCommand'] == self.ROLE_COMMAND_CUSTOM_COMMAND and \
          command['hostLevelParams'].has_key('custom_command') and \
          command['hostLevelParams']['custom_command'] == self.CUSTOM_COMMAND_RESTART)):
        configHandler.write_actual_component(command['role'], command['configurationTags'])
        if command['hostLevelParams'].has_key('clientsToUpdateConfigs') and \
            command['hostLevelParams']['clientsToUpdateConfigs']:
          configHandler.write_client_components(command['serviceName'], command['configurationTags'],
                                                command['hostLevelParams']['clientsToUpdateConfigs'])
        roleResult['configurationTags'] = configHandler.read_actual_component(command['role'])

    self.commandStatuses.put_command_status(command, roleResult)
  def get_retry_delay(self, last_delay):
    """
    Returns an exponentially growing delay. The idea is that if the number of
    retries is high, the reason for retrying is probably a host- or
    environment-specific issue that requires longer waits.
    """
    return last_delay * 2
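
  # Worked example: execute_command() starts with delay = 1 and calls
  # get_retry_delay() before each sleep, so successive retry waits are
  # 2, 4, 8, ... seconds (doubling each time).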
  def command_was_canceled(self):
    self.customServiceOrchestrator

  def on_background_command_complete_callback(self, process_condensed_result, handle):
    logger.debug('Start callback: %s' % process_condensed_result)
    logger.debug('The handle is: %s' % handle)
    status = self.COMPLETED_STATUS if handle.exitCode == 0 else self.FAILED_STATUS

    aborted_postfix = self.customServiceOrchestrator.command_canceled_reason(handle.command['taskId'])
    if aborted_postfix:
      status = self.FAILED_STATUS
      logger.debug('Set status to: %s , reason = %s' % (status, aborted_postfix))
    else:
      aborted_postfix = ''

    roleResult = self.commandStatuses.generate_report_template(handle.command)
    roleResult.update({
      'stdout': process_condensed_result['stdout'] + aborted_postfix,
      'stderr': process_condensed_result['stderr'] + aborted_postfix,
      'exitCode': process_condensed_result['exitcode'],
      'structuredOut': str(json.dumps(process_condensed_result['structuredOut'])) if 'structuredOut' in process_condensed_result else '',
      'status': status,
    })

    self.commandStatuses.put_command_status(handle.command, roleResult)
  def execute_status_command(self, command):
    '''
    Executes commands of type STATUS_COMMAND
    '''
    try:
      cluster = command['clusterName']
      service = command['serviceName']
      component = command['componentName']
      configurations = command['configurations']
      if configurations.has_key('global'):
        globalConfig = configurations['global']
      else:
        globalConfig = {}

      livestatus = LiveStatus(cluster, service, component,
                              globalConfig, self.config, self.configTags)

      component_extra = None
      request_execution_cmd = False

      # For custom services, responsibility to determine service status is
      # delegated to python scripts
      component_status_result = self.customServiceOrchestrator.requestComponentStatus(command)
      component_security_status_result = self.customServiceOrchestrator.requestComponentSecurityState(command)

      if component_status_result['exitcode'] == 0:
        component_status = LiveStatus.LIVE_STATUS
        self.controller.recovery_manager.update_current_status(component, component_status)
      else:
        component_status = LiveStatus.DEAD_STATUS
        self.controller.recovery_manager.update_current_status(component, component_status)
        request_execution_cmd = self.controller.recovery_manager.requires_recovery(component)

      if component_status_result.has_key('structuredOut'):
        component_extra = component_status_result['structuredOut']

      result = livestatus.build(forced_component_status=component_status)
      if self.controller.recovery_manager.enabled():
        result['sendExecCmdDet'] = str(request_execution_cmd)

      # Add security state to the result
      result['securityState'] = component_security_status_result

      if component_extra is not None and len(component_extra) != 0:
        if component_extra.has_key('alerts'):
          result['alerts'] = component_extra['alerts']
          del component_extra['alerts']
        result['extra'] = component_extra

      logger.debug("Got live status for component " + component + \
                   " of service " + str(service) + \
                   " of cluster " + str(cluster))
      logger.debug(pprint.pformat(result))

      if result is not None:
        self.commandStatuses.put_command_status(command, result)
    except Exception, err:
      traceback.print_exc()
      logger.warn(err)
  # Store action result to agent response queue
  def result(self):
    return self.commandStatuses.generate_report()

  def status_update_callback(self):
    """
    Actions that are executed every time the command status changes
    """
    self.controller.trigger_heartbeat()

  # Removes all commands from the queue
  def reset(self):
    queue = self.commandQueue
    with queue.mutex:
      queue.queue.clear()
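
# Minimal usage sketch (assumed wiring, not part of this module): the agent's
# Controller is expected to own an ActionQueue roughly like this. The `config`
# and `controller` objects below are hypothetical stand-ins.
#
#   actionQueue = ActionQueue(config, controller)
#   actionQueue.start()                  # launches run() on a worker thread
#   actionQueue.put(execution_commands)  # queue EXECUTION_COMMANDs from a heartbeat response
#   actionQueue.put_status(status_commands)
#   report = actionQueue.result()        # collected into the next heartbeat
#   actionQueue.stop()                   # ask the worker loop to exit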