CustomServiceOrchestrator.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. #!/usr/bin/env python
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. import logging
  18. import os
  19. import json
  20. import sys
  21. from ambari_commons import shell
  22. import threading
  23. from FileCache import FileCache
  24. from AgentException import AgentException
  25. from PythonExecutor import PythonExecutor
  26. import hostname
  27. logger = logging.getLogger()
  class CustomServiceOrchestrator():
    """
    Executes a command for custom service. stdout and stderr are written to
    tmpoutfile and to tmperrfile respectively.

    The orchestrator resolves the script (and optional before/after hooks)
    for a command, dumps the command to a json file and hands everything to
    PythonExecutor. It also keeps a task_id -> pid map so that running
    commands can be canceled.
    """

    SCRIPT_TYPE_PYTHON = "PYTHON"
    COMMAND_NAME_STATUS = "STATUS"
    COMMAND_NAME_SECURITY_STATUS = "SECURITY_STATUS"
    CUSTOM_ACTION_COMMAND = 'ACTIONEXECUTE'
    CUSTOM_COMMAND_COMMAND = 'CUSTOM_COMMAND'

    # Prefixes of hook directory names ("<prefix>-<command_name>")
    PRE_HOOK_PREFIX = "before"
    POST_HOOK_PREFIX = "after"

    # Keys of clusterHostInfo that are handled separately from host role entries
    HOSTS_LIST_KEY = "all_hosts"
    PING_PORTS_KEY = "all_ping_ports"
    RACKS_KEY = "all_racks"
    IPV4_ADDRESSES_KEY = "all_ipv4_ips"
    AMBARI_SERVER_HOST = "ambari_server_host"

    # For these commands run_file() is called with log_info_on_failure=False
    DONT_DEBUG_FAILURES_FOR_COMMANDS = [COMMAND_NAME_SECURITY_STATUS, COMMAND_NAME_STATUS]

    def __init__(self, config, controller):
      """
      config is read for the 'agent' section keys 'prefix', 'tmp_dir' (and
      later 'cache_dir'); controller must expose a registration_listeners list.
      """
      self.config = config
      self.tmp_dir = config.get('agent', 'prefix')
      self.exec_tmp_dir = config.get('agent', 'tmp_dir')
      self.file_cache = FileCache(config)
      self.python_executor = PythonExecutor(self.tmp_dir, config)
      self.status_commands_stdout = os.path.join(self.tmp_dir,
                                                 'status_command_stdout.txt')
      self.status_commands_stderr = os.path.join(self.tmp_dir,
                                                 'status_command_stderr.txt')
      self.public_fqdn = hostname.public_hostname(config)
      # cache reset will be called on every agent registration
      controller.registration_listeners.append(self.file_cache.reset)

      # Clean up old status command files if any. Every file is removed in
      # its own try block, so a missing stdout file does not prevent the
      # removal of a stale stderr file.
      for stale_file in (self.status_commands_stdout, self.status_commands_stderr):
        try:
          os.unlink(stale_file)
        except OSError:
          pass # Ignore fail

      # Guards commands_in_progress: task_id -> pid (int) while the command is
      # running, or -> cancel reason (string) once cancel_command() was called
      self.commands_in_progress_lock = threading.RLock()
      self.commands_in_progress = {}


    def map_task_to_process(self, task_id, processId):
      """
      Remembers the pid of the process that executes the given task.
      """
      with self.commands_in_progress_lock:
        logger.debug('Maps taskId=%s to pid=%s' % (task_id, processId))
        self.commands_in_progress[task_id] = processId


    def cancel_command(self, task_id, reason):
      """
      Kills the process (with children) mapped to task_id and stores reason in
      place of the pid, so that command_canceled_reason() can report it later.
      If the task is unknown, only a warning is logged.
      """
      with self.commands_in_progress_lock:
        if task_id not in self.commands_in_progress:
          logger.warning("Unable to find pid by taskId = %s" % task_id)
          return

        pid = self.commands_in_progress[task_id]
        self.commands_in_progress[task_id] = reason

        if not isinstance(pid, int):
          # The entry already holds a cancel reason from a previous call, the
          # process has been killed then. Never pass a reason string to kill.
          logger.info("Command with task_id - {tid} is already being canceled, "
                      "new reason - {reason}"
                      .format(tid=str(task_id), reason=reason))
          return

        logger.info("Canceling command with task_id - {tid}, " \
                    "reason - {reason} . Killing process {pid}"
                    .format(tid=str(task_id), reason=reason, pid=pid))
        shell.kill_process_with_children(pid)


    def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name=None,
                   override_output_files = True):
      """
      Runs pre-hook, script and post-hook of the command (those that exist)
      one by one and stops on the first non-zero exit code.

      forced_command_name may be specified manually. In this case, value, defined at
      command json, is ignored.

      override_output_files is passed to the first run_file() call only, all
      subsequent calls append to the output files.

      Returns the dictionary returned by the last run_file() call. Never
      raises: on any exception a dictionary with 'stdout', 'stderr',
      'structuredOut' and 'exitcode' == 1 is returned instead.
      Note: the '__handle' key, if present, is removed from command.
      """
      try:
        command_params = command['commandParams']
        script_type = command_params['script_type']
        script = command_params['script']
        timeout = int(command_params['command_timeout'])

        if 'hostLevelParams' in command and 'jdk_location' in command['hostLevelParams']:
          server_url_prefix = command['hostLevelParams']['jdk_location']
        else:
          server_url_prefix = command_params['jdk_location']

        task_id = "status"
        command_name = None
        try:
          task_id = command['taskId']
          command_name = command['roleCommand']
        except KeyError:
          pass # Status commands have no taskId

        if forced_command_name is not None: # Explicitly supplied name wins
          command_name = forced_command_name

        if command_name is None:
          raise AgentException("Command name is neither forced nor defined at command json")

        if command_name == self.CUSTOM_ACTION_COMMAND:
          base_dir = self.file_cache.get_custom_actions_base_dir(server_url_prefix)
          script_tuple = (os.path.join(base_dir, 'scripts', script), base_dir)
          hook_dir = None
        else:
          if command_name == self.CUSTOM_COMMAND_COMMAND:
            command_name = command['hostLevelParams']['custom_command']

          # forces a hash challenge on the directories to keep them updated, even
          # if the return type is not used
          self.file_cache.get_host_scripts_base_dir(server_url_prefix)
          hook_dir = self.file_cache.get_hook_base_dir(command, server_url_prefix)
          base_dir = self.file_cache.get_service_base_dir(command, server_url_prefix)
          script_path = self.resolve_script_path(base_dir, script)
          script_tuple = (script_path, base_dir)

        tmpstrucoutfile = os.path.join(self.tmp_dir,
                                       "structured-out-{0}.json".format(task_id))

        # We don't support anything else yet
        if script_type.upper() != self.SCRIPT_TYPE_PYTHON:
          message = "Unknown script type {0}".format(script_type)
          raise AgentException(message)

        # Execute command using proper interpreter
        handle = None
        if '__handle' in command:
          handle = command['__handle']
          handle.on_background_command_started = self.map_task_to_process
          # The handle must not get into the json dump below
          del command['__handle']

        json_path = self.dump_command_to_json(command)
        pre_hook_tuple = self.resolve_hook_script_path(hook_dir,
            self.PRE_HOOK_PREFIX, command_name, script_type)
        post_hook_tuple = self.resolve_hook_script_path(hook_dir,
            self.POST_HOOK_PREFIX, command_name, script_type)
        py_file_list = [pre_hook_tuple, script_tuple, post_hook_tuple]
        # filter None values
        filtered_py_file_list = [i for i in py_file_list if i]

        logger_level = logging.getLevelName(logger.level)

        # Executing hooks and script
        ret = None

        from ActionQueue import ActionQueue # To avoid cyclic dependency
        if 'commandType' in command and command['commandType'] == ActionQueue.BACKGROUND_EXECUTION_COMMAND and len(filtered_py_file_list) > 1:
          raise AgentException("Background commands are supported without hooks only")

        # Does not depend on the script being run, so it is computed once
        log_info_on_failure = command_name not in self.DONT_DEBUG_FAILURES_FOR_COMMANDS

        for py_file, current_base_dir in filtered_py_file_list:
          script_params = [command_name, json_path, current_base_dir]
          ret = self.python_executor.run_file(py_file, script_params,
              self.exec_tmp_dir, tmpoutfile, tmperrfile, timeout,
              tmpstrucoutfile, logger_level, self.map_task_to_process,
              task_id, override_output_files, handle = handle, log_info_on_failure=log_info_on_failure)
          # Next run_file() invocations should always append to current output
          override_output_files = False
          if ret['exitcode'] != 0:
            break

        if not ret: # Something went wrong
          raise AgentException("No script has been executed")

        # if canceled and not background command
        if handle is None:
          cancel_reason = self.command_canceled_reason(task_id)
          if cancel_reason:
            ret['stdout'] += cancel_reason
            ret['stderr'] += cancel_reason

            with open(tmpoutfile, "a") as f:
              f.write(cancel_reason)
            with open(tmperrfile, "a") as f:
              f.write(cancel_reason)

      except Exception as e: # We do not want to let agent fail completely
        # The traceback is deliberately not bound to a local variable:
        # logger.exception() below logs it anyway
        exc_type, exc_obj = sys.exc_info()[:2]
        message = "Caught an exception while executing "\
          "custom service command: {0}: {1}; {2}".format(exc_type, exc_obj, str(e))
        logger.exception(message)
        ret = {
          'stdout' : message,
          'stderr' : message,
          'structuredOut' : '{}',
          'exitcode': 1,
        }
      return ret


    def command_canceled_reason(self, task_id):
      """
      Removes task_id from the commands in progress. Returns the cancel
      message if the command has been canceled (the stored value is a reason
      and not an int pid), otherwise None.
      """
      with self.commands_in_progress_lock:
        if task_id in self.commands_in_progress: # Background command do not push in this collection (TODO)
          logger.debug('Pop with taskId %s' % task_id)
          pid = self.commands_in_progress.pop(task_id)
          if not isinstance(pid, int):
            return '\nCommand aborted. ' + pid
      return None


    def requestComponentStatus(self, command):
      """
      Component status is determined by exit code, returned by runCommand().
      Exit code 0 means that component is running and any other exit code means that
      component is not running
      """
      # by default, we override status command output; at DEBUG level the
      # output is appended instead
      override_output_files = logger.level != logging.DEBUG
      res = self.runCommand(command, self.status_commands_stdout,
                            self.status_commands_stderr, self.COMMAND_NAME_STATUS,
                            override_output_files=override_output_files)
      return res


    def requestComponentSecurityState(self, command):
      """
      Determines the current security state of the component

      A command will be issued to trigger the security_status check and the result of this check will
      returned to the caller. If the component lifecycle script has no security_status method the
      check will return non zero exit code and "UNKNOWN" will be returned.
      """
      # by default, we override status command output; at DEBUG level the
      # output is appended instead
      override_output_files = logger.level != logging.DEBUG
      security_check_res = self.runCommand(command, self.status_commands_stdout,
                                           self.status_commands_stderr, self.COMMAND_NAME_SECURITY_STATUS,
                                           override_output_files=override_output_files)
      result = 'UNKNOWN'

      if security_check_res is None:
        logger.warning("The return value of the security_status check was empty, the security status is unknown")
      elif 'exitcode' not in security_check_res:
        logger.warning("Missing 'exitcode' value from the security_status check result, the security status is unknown")
      elif security_check_res['exitcode'] != 0:
        logger.debug("The 'exitcode' value from the security_status check result indicated the check routine failed to properly execute, the security status is unknown")
      elif 'structuredOut' not in security_check_res:
        logger.warning("Missing 'structuredOut' value from the security_status check result, the security status is unknown")
      elif 'securityState' not in security_check_res['structuredOut']:
        logger.warning("Missing 'securityState' value from the security_status check structuredOut data set, the security status is unknown")
      else:
        result = security_check_res['structuredOut']['securityState']

      return result


    def resolve_script_path(self, base_dir, script):
      """
      Encapsulates logic of script location determination.
      Returns base_dir joined with script, raises AgentException if this path
      does not exist.
      """
      path = os.path.join(base_dir, script)
      if not os.path.exists(path):
        message = "Script {0} does not exist".format(path)
        raise AgentException(message)
      return path


    def resolve_hook_script_path(self, stack_hooks_dir, prefix, command_name, script_type):
      """
      Returns a tuple(path to hook script, hook base dir) according to string prefix
      and command name. If script does not exist, returns None.
      None is returned as well if stack_hooks_dir is empty.
      script_type is not used, the hook script is always "scripts/hook.py".
      """
      if not stack_hooks_dir:
        return None
      hook_dir = "{0}-{1}".format(prefix, command_name)
      hook_base_dir = os.path.join(stack_hooks_dir, hook_dir)
      hook_script_path = os.path.join(hook_base_dir, "scripts", "hook.py")
      if not os.path.isfile(hook_script_path):
        logger.debug("Hook script {0} not found, skipping".format(hook_script_path))
        return None
      return hook_script_path, hook_base_dir


    def dump_command_to_json(self, command):
      """
      Converts command to json file and returns file path

      The command dictionary is modified in place: 'public_hostname' and
      hostLevelParams/agentCacheDir are added, and for non-status commands a
      non-empty 'clusterHostInfo' is replaced with its decompressed form.
      """
      # Add few values that have to be visible for the executed scripts
      public_fqdn = self.public_fqdn
      command['public_hostname'] = public_fqdn
      # Add cache dir to make it visible for commands
      command["hostLevelParams"]["agentCacheDir"] = self.config.get('agent', 'cache_dir')
      # Now, dump the json file
      command_type = command['commandType']
      from ActionQueue import ActionQueue # To avoid cyclic dependency
      if command_type == ActionQueue.STATUS_COMMAND:
        # These files are frequently created, that's why we don't
        # store them all, but only the latest one
        file_path = os.path.join(self.tmp_dir, "status_command.json")
      else:
        task_id = command['taskId']
        if 'clusterHostInfo' in command and command['clusterHostInfo']:
          command['clusterHostInfo'] = self.decompressClusterHostInfo(command['clusterHostInfo'])
        file_path = os.path.join(self.tmp_dir, "command-{0}.json".format(task_id))
        if command_type == ActionQueue.AUTO_EXECUTION_COMMAND:
          file_path = os.path.join(self.tmp_dir, "auto_command-{0}.json".format(task_id))
      # Json may contain passwords, that's why we need proper permissions
      if os.path.isfile(file_path):
        os.unlink(file_path)
      # O_TRUNC: if the file reappears between unlink and open, old content
      # must not survive after the end of the new one
      open_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
      with os.fdopen(os.open(file_path, open_flags, 0o600), 'w') as f:
        content = json.dumps(command, sort_keys = False, indent = 4)
        f.write(content)
      return file_path


    def decompressClusterHostInfo(self, clusterHostInfo):
      """
      Returns a new dictionary with decompressed cluster host info, the
      given dictionary is not modified.

      Every entry except the hosts list, ping ports, racks, ipv4 addresses
      and ambari server host is treated as a list of index ranges into the
      hosts list and is replaced with the list of corresponding hosts.
      Ping ports, racks and ipv4 addresses are converted from the
      'value:ranges' form to plain lists; ping ports are converted to strings.
      Raises KeyError if one of the five special keys is absent.
      """
      info = clusterHostInfo.copy()
      #Pop info not related to host roles
      hostsList = info.pop(self.HOSTS_LIST_KEY)
      pingPorts = info.pop(self.PING_PORTS_KEY)
      racks = info.pop(self.RACKS_KEY)
      ipv4_addresses = info.pop(self.IPV4_ADDRESSES_KEY)
      ambariServerHost = info.pop(self.AMBARI_SERVER_HOST)

      decompressedMap = {}

      for k, v in info.items():
        # Convert from 1-3,5,6-8 to [1,2,3,5,6,7,8]
        indexes = self.convertRangeToList(v)
        # Convert from [1,2,3,5,6,7,8] to [host1,host2,host3...]
        decompressedMap[k] = [hostsList[i] for i in indexes]

      #Convert from ['1:0-2,4', '42:3,5-7'] to [1,1,1,42,1,42,42,42]
      # Real lists are built here, because the result is dumped to json
      racks = [rack for rack in self.convertMappedRangeToList(racks)]
      ipv4_addresses = [ip for ip in self.convertMappedRangeToList(ipv4_addresses)]
      #Convert all elements to str
      pingPorts = [str(port) for port in self.convertMappedRangeToList(pingPorts)]

      #Add ping ports to result
      decompressedMap[self.PING_PORTS_KEY] = pingPorts
      #Add hosts list to result
      decompressedMap[self.HOSTS_LIST_KEY] = hostsList
      #Add racks list to result
      decompressedMap[self.RACKS_KEY] = racks
      #Add ips list to result
      decompressedMap[self.IPV4_ADDRESSES_KEY] = ipv4_addresses
      #Add ambari-server host to result
      decompressedMap[self.AMBARI_SERVER_HOST] = ambariServerHost

      return decompressedMap


    # Converts from 1-3,5,6-8 to [1,2,3,5,6,7,8]
    def convertRangeToList(self, list):
      """
      list is an iterable of strings like '1-3,5,6-8'. Returns the flat list
      of integers, both range bounds are included.
      Raises AgentException for an item that is not 'm-n' or 'm'
      (like '1-' or '1-2-3').
      """
      resultList = []

      for i in list:
        for r in i.split(','):
          rangeBounds = r.split('-')
          if len(rangeBounds) == 2:
            if not rangeBounds[0] or not rangeBounds[1]:
              raise AgentException("Broken data in given range, expected - m-n or m, got : " + str(r))
            resultList.extend(range(int(rangeBounds[0]), int(rangeBounds[1]) + 1))
          elif len(rangeBounds) == 1:
            resultList.append(int(rangeBounds[0]))
          else:
            raise AgentException("Broken data in given range, expected - m-n or m, got : " + str(r))

      return resultList


    #Converts from ['1:0-2,4', '42:3,5-7'] to [1,1,1,42,1,42,42,42]
    def convertMappedRangeToList(self, list):
      """
      list is an iterable of 'value:ranges' strings, where ranges are
      comma separated indexes 'm' or inclusive index ranges 'm-n'.
      Returns the list of values ordered by index. Values that consist of
      digits only are converted to int. If an index is mentioned more than
      once, the last value wins.
      Raises AgentException on data that does not match the format.
      """
      resultDict = {}

      for i in list:
        valueToRanges = i.split(":")
        if len(valueToRanges) != 2:
          raise AgentException("Broken data in given value to range, expected format - value:m-n, got - " + str(i))

        rawValue, rangesToken = valueToRanges
        # The same value is assigned to every index of this item
        value = int(rawValue) if rawValue.isdigit() else rawValue

        for r in rangesToken.split(','):
          rangeIndexes = r.split('-')

          if len(rangeIndexes) == 2:
            if not rangeIndexes[0] or not rangeIndexes[1]:
              raise AgentException("Broken data in given value to range, expected format - value:m-n, got - " + str(r))
            start = int(rangeIndexes[0])
            end = int(rangeIndexes[1])
            for k in range(start, end + 1):
              resultDict[k] = value
          elif len(rangeIndexes) == 1:
            resultDict[int(rangeIndexes[0])] = value
          else:
            # Same handling of broken ranges as in convertRangeToList()
            raise AgentException("Broken data in given value to range, expected format - value:m-n, got - " + str(r))

      # Order by index explicitly, a dictionary built from sorted items does
      # not have to keep their order
      resultList = [resultDict[index] for index in sorted(resultDict)]

      return resultList