ActionQueue.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. #!/usr/bin/env python
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. import Queue
  18. import logging
  19. import traceback
  20. import threading
  21. import pprint
  22. import os
  23. import json
  24. from LiveStatus import LiveStatus
  25. from shell import shellRunner
  26. from ActualConfigHandler import ActualConfigHandler
  27. from CommandStatusDict import CommandStatusDict
  28. from CustomServiceOrchestrator import CustomServiceOrchestrator
# Module-wide logger. getLogger() is called without a name, so this is the
# root logger rather than a per-module one.
logger = logging.getLogger()
# Not referenced anywhere else in this file as shown.
# NOTE(review): presumably read or updated by other agent modules -- confirm
# before changing or removing it.
installScriptHash = -1
  31. class ActionQueue(threading.Thread):
  32. """ Action Queue for the agent. We pick one command at a time from the queue
  33. and execute it
  34. Note: Action and command terms in this and related classes are used interchangeably
  35. """
  36. # How many actions can be performed in parallel. Feel free to change
  37. MAX_CONCURRENT_ACTIONS = 5
  38. #How much time(in seconds) we need wait for new incoming execution command before checking
  39. #status command queue
  40. EXECUTION_COMMAND_WAIT_TIME = 2
  41. STATUS_COMMAND = 'STATUS_COMMAND'
  42. EXECUTION_COMMAND = 'EXECUTION_COMMAND'
  43. ROLE_COMMAND_INSTALL = 'INSTALL'
  44. ROLE_COMMAND_START = 'START'
  45. ROLE_COMMAND_STOP = 'STOP'
  46. ROLE_COMMAND_CUSTOM_COMMAND = 'CUSTOM_COMMAND'
  47. CUSTOM_COMMAND_RESTART = 'RESTART'
  48. IN_PROGRESS_STATUS = 'IN_PROGRESS'
  49. COMPLETED_STATUS = 'COMPLETED'
  50. FAILED_STATUS = 'FAILED'
  51. def __init__(self, config, controller):
  52. super(ActionQueue, self).__init__()
  53. self.commandQueue = Queue.Queue()
  54. self.statusCommandQueue = Queue.Queue()
  55. self.commandStatuses = CommandStatusDict(callback_action =
  56. self.status_update_callback)
  57. self.config = config
  58. self.controller = controller
  59. self.sh = shellRunner()
  60. self.configTags = {}
  61. self._stop = threading.Event()
  62. self.tmpdir = config.get('agent', 'prefix')
  63. self.customServiceOrchestrator = CustomServiceOrchestrator(config,
  64. controller)
  65. def stop(self):
  66. self._stop.set()
  67. def stopped(self):
  68. return self._stop.isSet()
  69. def put_status(self, commands):
  70. #Was supposed that we got all set of statuses, we don't need to keep old ones
  71. self.statusCommandQueue.queue.clear()
  72. for command in commands:
  73. logger.info("Adding " + command['commandType'] + " for service " + \
  74. command['serviceName'] + " of cluster " + \
  75. command['clusterName'] + " to the queue.")
  76. logger.debug(pprint.pformat(command))
  77. self.statusCommandQueue.put(command)
  78. def put(self, commands):
  79. for command in commands:
  80. if not command.has_key('serviceName'):
  81. command['serviceName'] = "null"
  82. if not command.has_key('clusterName'):
  83. command['clusterName'] = 'null'
  84. logger.info("Adding " + command['commandType'] + " for service " + \
  85. command['serviceName'] + " of cluster " + \
  86. command['clusterName'] + " to the queue.")
  87. logger.debug(pprint.pformat(command))
  88. self.commandQueue.put(command)
  89. def run(self):
  90. while not self.stopped():
  91. while not self.statusCommandQueue.empty():
  92. try:
  93. command = self.statusCommandQueue.get(False)
  94. self.process_command(command)
  95. except (Queue.Empty):
  96. pass
  97. try:
  98. command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
  99. self.process_command(command)
  100. except (Queue.Empty):
  101. pass
  102. def process_command(self, command):
  103. logger.debug("Took an element of Queue: " + pprint.pformat(command))
  104. # make sure we log failures
  105. try:
  106. if command['commandType'] == self.EXECUTION_COMMAND:
  107. self.execute_command(command)
  108. elif command['commandType'] == self.STATUS_COMMAND:
  109. self.execute_status_command(command)
  110. else:
  111. logger.error("Unrecognized command " + pprint.pformat(command))
  112. except Exception, err:
  113. # Should not happen
  114. traceback.print_exc()
  115. logger.warn(err)
  116. def execute_command(self, command):
  117. '''
  118. Executes commands of type EXECUTION_COMMAND
  119. '''
  120. clusterName = command['clusterName']
  121. commandId = command['commandId']
  122. message = "Executing command with id = {commandId} for role = {role} of " \
  123. "cluster {cluster}.".format(
  124. commandId = str(commandId), role=command['role'],
  125. cluster=clusterName)
  126. logger.info(message)
  127. logger.debug(pprint.pformat(command))
  128. taskId = command['taskId']
  129. # Preparing 'IN_PROGRESS' report
  130. in_progress_status = self.commandStatuses.generate_report_template(command)
  131. in_progress_status.update({
  132. 'tmpout': self.tmpdir + os.sep + 'output-' + str(taskId) + '.txt',
  133. 'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt',
  134. 'structuredOut' : self.tmpdir + os.sep + 'structured-out-' + str(taskId) + '.json',
  135. 'status': self.IN_PROGRESS_STATUS
  136. })
  137. self.commandStatuses.put_command_status(command, in_progress_status)
  138. # running command
  139. commandresult = self.customServiceOrchestrator.runCommand(command,
  140. in_progress_status['tmpout'], in_progress_status['tmperr'])
  141. # dumping results
  142. status = self.COMPLETED_STATUS
  143. if commandresult['exitcode'] != 0:
  144. status = self.FAILED_STATUS
  145. roleResult = self.commandStatuses.generate_report_template(command)
  146. roleResult.update({
  147. 'stdout': commandresult['stdout'],
  148. 'stderr': commandresult['stderr'],
  149. 'exitCode': commandresult['exitcode'],
  150. 'status': status,
  151. })
  152. if roleResult['stdout'] == '':
  153. roleResult['stdout'] = 'None'
  154. if roleResult['stderr'] == '':
  155. roleResult['stderr'] = 'None'
  156. # let ambari know name of custom command
  157. if command['hostLevelParams'].has_key('custom_command'):
  158. roleResult['customCommand'] = command['hostLevelParams']['custom_command']
  159. if 'structuredOut' in commandresult:
  160. roleResult['structuredOut'] = str(json.dumps(commandresult['structuredOut']))
  161. else:
  162. roleResult['structuredOut'] = ''
  163. # let ambari know that configuration tags were applied
  164. if status == self.COMPLETED_STATUS:
  165. configHandler = ActualConfigHandler(self.config, self.configTags)
  166. if command.has_key('configurationTags'):
  167. configHandler.write_actual(command['configurationTags'])
  168. roleResult['configurationTags'] = command['configurationTags']
  169. component = {'serviceName':command['serviceName'],'componentName':command['role']}
  170. if command.has_key('roleCommand') and \
  171. (command['roleCommand'] == self.ROLE_COMMAND_START or \
  172. (command['roleCommand'] == self.ROLE_COMMAND_INSTALL \
  173. and component in LiveStatus.CLIENT_COMPONENTS) or \
  174. (command['roleCommand'] == self.ROLE_COMMAND_CUSTOM_COMMAND and \
  175. command['hostLevelParams'].has_key('custom_command') and \
  176. command['hostLevelParams']['custom_command'] == self.CUSTOM_COMMAND_RESTART)):
  177. configHandler.write_actual_component(command['role'], command['configurationTags'])
  178. configHandler.write_client_components(command['serviceName'], command['configurationTags'])
  179. roleResult['configurationTags'] = configHandler.read_actual_component(command['role'])
  180. self.commandStatuses.put_command_status(command, roleResult)
  181. def execute_status_command(self, command):
  182. '''
  183. Executes commands of type STATUS_COMMAND
  184. '''
  185. try:
  186. cluster = command['clusterName']
  187. service = command['serviceName']
  188. component = command['componentName']
  189. configurations = command['configurations']
  190. if configurations.has_key('global'):
  191. globalConfig = configurations['global']
  192. else:
  193. globalConfig = {}
  194. livestatus = LiveStatus(cluster, service, component,
  195. globalConfig, self.config, self.configTags)
  196. component_extra = None
  197. # For custom services, responsibility to determine service status is
  198. # delegated to python scripts
  199. component_status_result = self.customServiceOrchestrator.requestComponentStatus(command)
  200. if component_status_result['exitcode'] == 0:
  201. component_status = LiveStatus.LIVE_STATUS
  202. else:
  203. component_status = LiveStatus.DEAD_STATUS
  204. if component_status_result.has_key('structuredOut'):
  205. component_extra = component_status_result['structuredOut']
  206. result = livestatus.build(forsed_component_status= component_status)
  207. if component_extra is not None and len(component_extra) != 0:
  208. result['extra'] = component_extra
  209. logger.debug("Got live status for component " + component + \
  210. " of service " + str(service) + \
  211. " of cluster " + str(cluster))
  212. logger.debug(pprint.pformat(result))
  213. if result is not None:
  214. self.commandStatuses.put_command_status(command, result)
  215. except Exception, err:
  216. traceback.print_exc()
  217. logger.warn(err)
  218. pass
  219. # Store action result to agent response queue
  220. def result(self):
  221. return self.commandStatuses.generate_report()
  222. def status_update_callback(self):
  223. """
  224. Actions that are executed every time when command status changes
  225. """
  226. self.controller.heartbeat_wait_event.set()