CustomServiceOrchestrator.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644
  1. #!/usr/bin/env python
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. import logging
  18. import os
  19. import ambari_simplejson as json
  20. import sys
  21. from ambari_commons import shell
  22. import threading
  23. from FileCache import FileCache
  24. from AgentException import AgentException
  25. from PythonExecutor import PythonExecutor
  26. from PythonReflectiveExecutor import PythonReflectiveExecutor
  27. from resource_management.libraries.functions.log_process_information import log_process_information
  28. from resource_management.core.utils import PasswordString
  29. import subprocess
  30. import Constants
  31. import hostname
  32. logger = logging.getLogger()
  33. class CustomServiceOrchestrator():
  34. """
  35. Executes a command for custom service. stdout and stderr are written to
  36. tmpoutfile and to tmperrfile respectively.
  37. """
  38. SCRIPT_TYPE_PYTHON = "PYTHON"
  39. COMMAND_TYPE = "commandType"
  40. COMMAND_NAME_STATUS = "STATUS"
  41. CUSTOM_ACTION_COMMAND = 'ACTIONEXECUTE'
  42. CUSTOM_COMMAND_COMMAND = 'CUSTOM_COMMAND'
  43. PRE_HOOK_PREFIX="before"
  44. POST_HOOK_PREFIX="after"
  45. HOSTS_LIST_KEY = "all_hosts"
  46. PING_PORTS_KEY = "all_ping_ports"
  47. RACKS_KEY = "all_racks"
  48. IPV4_ADDRESSES_KEY = "all_ipv4_ips"
  49. AMBARI_SERVER_HOST = "ambari_server_host"
  50. AMBARI_SERVER_PORT = "ambari_server_port"
  51. AMBARI_SERVER_USE_SSL = "ambari_server_use_ssl"
  52. FREQUENT_COMMANDS = [COMMAND_NAME_STATUS]
  53. DONT_DEBUG_FAILURES_FOR_COMMANDS = FREQUENT_COMMANDS
  54. REFLECTIVELY_RUN_COMMANDS = FREQUENT_COMMANDS # -- commands which run a lot and often (this increases their speed)
  55. DONT_BACKUP_LOGS_FOR_COMMANDS = FREQUENT_COMMANDS
  56. # Path where hadoop credential JARS will be available
  57. DEFAULT_CREDENTIAL_SHELL_LIB_PATH = '/var/lib/ambari-agent/cred/lib'
  58. DEFAULT_CREDENTIAL_CONF_DIR = '/var/lib/ambari-agent/cred/conf'
  59. DEFAULT_CREDENTIAL_SHELL_CMD = 'org.apache.hadoop.security.alias.CredentialShell'
  60. # The property name used by the hadoop credential provider
  61. CREDENTIAL_PROVIDER_PROPERTY_NAME = 'hadoop.security.credential.provider.path'
  62. # Property name for credential store class path
  63. CREDENTIAL_STORE_CLASS_PATH_NAME = 'credentialStoreClassPath'
  64. def __init__(self, config, controller):
  65. self.config = config
  66. self.tmp_dir = config.get('agent', 'prefix')
  67. self.force_https_protocol = config.get_force_https_protocol()
  68. self.exec_tmp_dir = Constants.AGENT_TMP_DIR
  69. self.file_cache = FileCache(config)
  70. self.status_commands_stdout = os.path.join(self.tmp_dir,
  71. 'status_command_stdout.txt')
  72. self.status_commands_stderr = os.path.join(self.tmp_dir,
  73. 'status_command_stderr.txt')
  74. self.public_fqdn = hostname.public_hostname(config)
  75. # cache reset will be called on every agent registration
  76. controller.registration_listeners.append(self.file_cache.reset)
  77. # Construct the hadoop credential lib JARs path
  78. self.credential_shell_lib_path = os.path.join(config.get('security', 'credential_lib_dir',
  79. self.DEFAULT_CREDENTIAL_SHELL_LIB_PATH), '*')
  80. self.credential_conf_dir = config.get('security', 'credential_conf_dir', self.DEFAULT_CREDENTIAL_CONF_DIR)
  81. self.credential_shell_cmd = config.get('security', 'credential_shell_cmd', self.DEFAULT_CREDENTIAL_SHELL_CMD)
  82. # Clean up old status command files if any
  83. try:
  84. os.unlink(self.status_commands_stdout)
  85. os.unlink(self.status_commands_stderr)
  86. except OSError:
  87. pass # Ignore fail
  88. self.commands_in_progress_lock = threading.RLock()
  89. self.commands_in_progress = {}
  90. def map_task_to_process(self, task_id, processId):
  91. with self.commands_in_progress_lock:
  92. logger.debug('Maps taskId=%s to pid=%s' % (task_id, processId))
  93. self.commands_in_progress[task_id] = processId
  94. def cancel_command(self, task_id, reason):
  95. with self.commands_in_progress_lock:
  96. if task_id in self.commands_in_progress.keys():
  97. pid = self.commands_in_progress.get(task_id)
  98. self.commands_in_progress[task_id] = reason
  99. logger.info("Canceling command with taskId = {tid}, " \
  100. "reason - {reason} . Killing process {pid}"
  101. .format(tid=str(task_id), reason=reason, pid=pid))
  102. log_process_information(logger)
  103. shell.kill_process_with_children(pid)
  104. else:
  105. logger.warn("Unable to find process associated with taskId = %s" % task_id)
  106. def get_py_executor(self, forced_command_name):
  107. """
  108. Wrapper for unit testing
  109. :return:
  110. """
  111. if forced_command_name in self.REFLECTIVELY_RUN_COMMANDS:
  112. return PythonReflectiveExecutor(self.tmp_dir, self.config)
  113. else:
  114. return PythonExecutor(self.tmp_dir, self.config)
  115. def getProviderDirectory(self, service_name):
  116. """
  117. Gets the path to the service conf folder where the JCEKS file will be created.
  118. :param service_name: Name of the service, for example, HIVE
  119. :return: lower case path to the service conf folder
  120. """
  121. # The stack definition scripts of the service can move the
  122. # JCEKS file around to where it wants, which is usually
  123. # /etc/<service_name>/conf
  124. conf_dir = os.path.join(self.credential_conf_dir, service_name.lower())
  125. return conf_dir
  126. def getConfigTypeCredentials(self, commandJson):
  127. """
  128. Gets the affected config types for the service in this command
  129. with the password aliases and values.
  130. Input:
  131. {
  132. "config-type1" : {
  133. "password_key_name1":"password_value_name1",
  134. "password_key_name2":"password_value_name2",
  135. :
  136. },
  137. "config-type2" : {
  138. "password_key_name1":"password_value_name1",
  139. "password_key_name2":"password_value_name2",
  140. :
  141. },
  142. :
  143. }
  144. Output:
  145. {
  146. "config-type1" : {
  147. "alias1":"password1",
  148. "alias2":"password2",
  149. :
  150. },
  151. "config-type2" : {
  152. "alias1":"password1",
  153. "alias2":"password2",
  154. :
  155. },
  156. :
  157. }
  158. If password_key_name is the same as password_value_name, then password_key_name is the password alias itself.
  159. The value it points to is the password value.
  160. If password_key_name is not the same as the password_value_name, then password_key_name points to the alias.
  161. The value is pointed to by password_value_name.
  162. For example:
  163. Input:
  164. {
  165. "oozie-site" : {"oozie.service.JPAService.jdbc.password" : "oozie.service.JPAService.jdbc.password"},
  166. "admin-properties" {"db_user":"db_password", "ranger.jpa.jdbc.credential.alias:ranger-admin-site" : "db_password"}
  167. }
  168. Output:
  169. {
  170. "oozie-site" : {"oozie.service.JPAService.jdbc.password" : "MyOozieJdbcPassword"},
  171. "admin-properties" {"rangerdba" : "MyRangerDbaPassword", "rangeradmin":"MyRangerDbaPassword"},
  172. }
  173. :param commandJson:
  174. :return:
  175. """
  176. configtype_credentials = {}
  177. if 'configuration_credentials' in commandJson:
  178. for config_type, password_properties in commandJson['configuration_credentials'].items():
  179. if config_type in commandJson['configurations']:
  180. value_names = []
  181. config = commandJson['configurations'][config_type]
  182. credentials = {}
  183. for key_name, value_name in password_properties.items():
  184. if key_name == value_name:
  185. if value_name in config:
  186. # password name is the alias
  187. credentials[key_name] = config[value_name]
  188. value_names.append(value_name) # Gather the value_name for deletion
  189. else:
  190. keyname_keyconfig = key_name.split(':')
  191. key_name = keyname_keyconfig[0]
  192. # if the key is in another configuration (cross reference),
  193. # get the value of the key from that configuration
  194. if (len(keyname_keyconfig) > 1):
  195. if keyname_keyconfig[1] not in commandJson['configurations']:
  196. continue
  197. key_config = commandJson['configurations'][keyname_keyconfig[1]]
  198. else:
  199. key_config = config
  200. if key_name in key_config and value_name in config:
  201. # password name points to the alias
  202. credentials[key_config[key_name]] = config[value_name]
  203. value_names.append(value_name) # Gather the value_name for deletion
  204. if len(credentials) > 0:
  205. configtype_credentials[config_type] = credentials
  206. logger.info("Identifying config {0} for CS: ".format(config_type))
  207. for value_name in value_names:
  208. # Remove the clear text password
  209. config.pop(value_name, None)
  210. return configtype_credentials
  211. def generateJceks(self, commandJson):
  212. """
  213. Generates the JCEKS file with passwords for the service specified in commandJson
  214. :param commandJson: command JSON
  215. :return: An exit value from the external process that generated the JCEKS file. None if
  216. there are no passwords in the JSON.
  217. """
  218. cmd_result = None
  219. roleCommand = None
  220. if 'roleCommand' in commandJson:
  221. roleCommand = commandJson['roleCommand']
  222. task_id = None
  223. if 'taskId' in commandJson:
  224. task_id = commandJson['taskId']
  225. logger.info('Generating the JCEKS file: roleCommand={0} and taskId = {1}'.format(roleCommand, task_id))
  226. # Set up the variables for the external command to generate a JCEKS file
  227. java_home = commandJson['hostLevelParams']['java_home']
  228. java_bin = '{java_home}/bin/java'.format(java_home=java_home)
  229. cs_lib_path = self.credential_shell_lib_path
  230. serviceName = commandJson['serviceName']
  231. # Gather the password values and remove them from the configuration
  232. configtype_credentials = self.getConfigTypeCredentials(commandJson)
  233. # CS is enabled but no config property is available for this command
  234. if len(configtype_credentials) == 0:
  235. logger.info("Credential store is enabled but no property are found that can be encrypted.")
  236. commandJson['credentialStoreEnabled'] = "false"
  237. for config_type, credentials in configtype_credentials.items():
  238. config = commandJson['configurations'][config_type]
  239. file_path = os.path.join(self.getProviderDirectory(serviceName), "{0}.jceks".format(config_type))
  240. if os.path.exists(file_path):
  241. os.remove(file_path)
  242. provider_path = 'jceks://file{file_path}'.format(file_path=file_path)
  243. logger.info('provider_path={0}'.format(provider_path))
  244. for alias, pwd in credentials.items():
  245. logger.debug("config={0}".format(config))
  246. protected_pwd = PasswordString(pwd)
  247. # Generate the JCEKS file
  248. cmd = (java_bin, '-cp', cs_lib_path, self.credential_shell_cmd, 'create',
  249. alias, '-value', protected_pwd, '-provider', provider_path)
  250. logger.info(cmd)
  251. cmd_result = subprocess.call(cmd)
  252. logger.info('cmd_result = {0}'.format(cmd_result))
  253. os.chmod(file_path, 0644) # group and others should have read access so that the service user can read
  254. # Add JCEKS provider path instead
  255. config[self.CREDENTIAL_PROVIDER_PROPERTY_NAME] = provider_path
  256. config[self.CREDENTIAL_STORE_CLASS_PATH_NAME] = cs_lib_path
  257. return cmd_result
  258. def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name=None,
  259. override_output_files=True, retry=False):
  260. """
  261. forced_command_name may be specified manually. In this case, value, defined at
  262. command json, is ignored.
  263. """
  264. try:
  265. script_type = command['commandParams']['script_type']
  266. script = command['commandParams']['script']
  267. timeout = int(command['commandParams']['command_timeout'])
  268. if 'hostLevelParams' in command and 'jdk_location' in command['hostLevelParams']:
  269. server_url_prefix = command['hostLevelParams']['jdk_location']
  270. else:
  271. server_url_prefix = command['commandParams']['jdk_location']
  272. task_id = "status"
  273. try:
  274. task_id = command['taskId']
  275. command_name = command['roleCommand']
  276. except KeyError:
  277. pass # Status commands have no taskId
  278. if forced_command_name is not None: # If not supplied as an argument
  279. command_name = forced_command_name
  280. if command_name == self.CUSTOM_ACTION_COMMAND:
  281. base_dir = self.file_cache.get_custom_actions_base_dir(server_url_prefix)
  282. script_tuple = (os.path.join(base_dir, 'scripts', script), base_dir)
  283. hook_dir = None
  284. else:
  285. if command_name == self.CUSTOM_COMMAND_COMMAND:
  286. command_name = command['hostLevelParams']['custom_command']
  287. # forces a hash challenge on the directories to keep them updated, even
  288. # if the return type is not used
  289. self.file_cache.get_host_scripts_base_dir(server_url_prefix)
  290. hook_dir = self.file_cache.get_hook_base_dir(command, server_url_prefix)
  291. base_dir = self.file_cache.get_service_base_dir(command, server_url_prefix)
  292. self.file_cache.get_custom_resources_subdir(command, server_url_prefix)
  293. script_path = self.resolve_script_path(base_dir, script)
  294. script_tuple = (script_path, base_dir)
  295. tmpstrucoutfile = os.path.join(self.tmp_dir,
  296. "structured-out-{0}.json".format(task_id))
  297. # We don't support anything else yet
  298. if script_type.upper() != self.SCRIPT_TYPE_PYTHON:
  299. message = "Unknown script type {0}".format(script_type)
  300. raise AgentException(message)
  301. # Execute command using proper interpreter
  302. handle = None
  303. if command.has_key('__handle'):
  304. handle = command['__handle']
  305. handle.on_background_command_started = self.map_task_to_process
  306. del command['__handle']
  307. # If command contains credentialStoreEnabled, then
  308. # generate the JCEKS file for the configurations.
  309. credentialStoreEnabled = False
  310. if 'credentialStoreEnabled' in command:
  311. credentialStoreEnabled = (command['credentialStoreEnabled'] == "true")
  312. if credentialStoreEnabled == True:
  313. if 'commandBeingRetried' not in command or command['commandBeingRetried'] != "true":
  314. self.generateJceks(command)
  315. else:
  316. logger.info("Skipping generation of jceks files as this is a retry of the command")
  317. json_path = self.dump_command_to_json(command, retry)
  318. pre_hook_tuple = self.resolve_hook_script_path(hook_dir,
  319. self.PRE_HOOK_PREFIX, command_name, script_type)
  320. post_hook_tuple = self.resolve_hook_script_path(hook_dir,
  321. self.POST_HOOK_PREFIX, command_name, script_type)
  322. py_file_list = [pre_hook_tuple, script_tuple, post_hook_tuple]
  323. # filter None values
  324. filtered_py_file_list = [i for i in py_file_list if i]
  325. logger_level = logging.getLevelName(logger.level)
  326. # Executing hooks and script
  327. ret = None
  328. from ActionQueue import ActionQueue
  329. if command.has_key('commandType') and command['commandType'] == ActionQueue.BACKGROUND_EXECUTION_COMMAND and len(filtered_py_file_list) > 1:
  330. raise AgentException("Background commands are supported without hooks only")
  331. python_executor = self.get_py_executor(forced_command_name)
  332. backup_log_files = not command_name in self.DONT_BACKUP_LOGS_FOR_COMMANDS
  333. log_out_files = self.config.get("logging","log_out_files", default="0") != "0"
  334. for py_file, current_base_dir in filtered_py_file_list:
  335. log_info_on_failure = not command_name in self.DONT_DEBUG_FAILURES_FOR_COMMANDS
  336. script_params = [command_name, json_path, current_base_dir, tmpstrucoutfile, logger_level, self.exec_tmp_dir,
  337. self.force_https_protocol]
  338. if log_out_files:
  339. script_params.append("-o")
  340. ret = python_executor.run_file(py_file, script_params,
  341. tmpoutfile, tmperrfile, timeout,
  342. tmpstrucoutfile, self.map_task_to_process,
  343. task_id, override_output_files, backup_log_files = backup_log_files,
  344. handle = handle, log_info_on_failure=log_info_on_failure)
  345. # Next run_file() invocations should always append to current output
  346. override_output_files = False
  347. if ret['exitcode'] != 0:
  348. break
  349. if not ret: # Something went wrong
  350. raise AgentException("No script has been executed")
  351. # if canceled and not background command
  352. if handle is None:
  353. cancel_reason = self.command_canceled_reason(task_id)
  354. if cancel_reason is not None:
  355. ret['stdout'] += cancel_reason
  356. ret['stderr'] += cancel_reason
  357. with open(tmpoutfile, "a") as f:
  358. f.write(cancel_reason)
  359. with open(tmperrfile, "a") as f:
  360. f.write(cancel_reason)
  361. except Exception, e: # We do not want to let agent fail completely
  362. exc_type, exc_obj, exc_tb = sys.exc_info()
  363. message = "Caught an exception while executing "\
  364. "custom service command: {0}: {1}; {2}".format(exc_type, exc_obj, str(e))
  365. logger.exception(message)
  366. ret = {
  367. 'stdout' : message,
  368. 'stderr' : message,
  369. 'structuredOut' : '{}',
  370. 'exitcode': 1,
  371. }
  372. return ret
  373. def command_canceled_reason(self, task_id):
  374. with self.commands_in_progress_lock:
  375. if self.commands_in_progress.has_key(task_id):#Background command do not push in this collection (TODO)
  376. logger.debug('Pop with taskId %s' % task_id)
  377. pid = self.commands_in_progress.pop(task_id)
  378. if not isinstance(pid, int):
  379. reason = pid
  380. if reason:
  381. return "\nCommand aborted. Reason: '{0}'".format(reason)
  382. else:
  383. return "\nCommand aborted."
  384. return None
  385. def requestComponentStatus(self, command):
  386. """
  387. Component status is determined by exit code, returned by runCommand().
  388. Exit code 0 means that component is running and any other exit code means that
  389. component is not running
  390. """
  391. override_output_files=True # by default, we override status command output
  392. if logger.level == logging.DEBUG:
  393. override_output_files = False
  394. res = self.runCommand(command, self.status_commands_stdout,
  395. self.status_commands_stderr, self.COMMAND_NAME_STATUS,
  396. override_output_files=override_output_files)
  397. return res
  398. def resolve_script_path(self, base_dir, script):
  399. """
  400. Encapsulates logic of script location determination.
  401. """
  402. path = os.path.join(base_dir, script)
  403. if not os.path.exists(path):
  404. message = "Script {0} does not exist".format(path)
  405. raise AgentException(message)
  406. return path
  407. def resolve_hook_script_path(self, stack_hooks_dir, prefix, command_name, script_type):
  408. """
  409. Returns a tuple(path to hook script, hook base dir) according to string prefix
  410. and command name. If script does not exist, returns None
  411. """
  412. if not stack_hooks_dir:
  413. return None
  414. hook_dir = "{0}-{1}".format(prefix, command_name)
  415. hook_base_dir = os.path.join(stack_hooks_dir, hook_dir)
  416. hook_script_path = os.path.join(hook_base_dir, "scripts", "hook.py")
  417. if not os.path.isfile(hook_script_path):
  418. logger.debug("Hook script {0} not found, skipping".format(hook_script_path))
  419. return None
  420. return hook_script_path, hook_base_dir
  421. def dump_command_to_json(self, command, retry=False):
  422. """
  423. Converts command to json file and returns file path
  424. """
  425. # Perform few modifications to stay compatible with the way in which
  426. public_fqdn = self.public_fqdn
  427. command['public_hostname'] = public_fqdn
  428. # Add cache dir to make it visible for commands
  429. command["hostLevelParams"]["agentCacheDir"] = self.config.get('agent', 'cache_dir')
  430. command["agentConfigParams"] = {
  431. "agent": {
  432. "parallel_execution": self.config.get_parallel_exec_option(),
  433. "use_system_proxy_settings": self.config.use_system_proxy_setting()
  434. }
  435. }
  436. # Now, dump the json file
  437. command_type = command['commandType']
  438. from ActionQueue import ActionQueue # To avoid cyclic dependency
  439. if command_type == ActionQueue.STATUS_COMMAND:
  440. # These files are frequently created, that's why we don't
  441. # store them all, but only the latest one
  442. file_path = os.path.join(self.tmp_dir, "status_command.json")
  443. else:
  444. task_id = command['taskId']
  445. if 'clusterHostInfo' in command and command['clusterHostInfo'] and not retry:
  446. command['clusterHostInfo'] = self.decompressClusterHostInfo(command['clusterHostInfo'])
  447. file_path = os.path.join(self.tmp_dir, "command-{0}.json".format(task_id))
  448. if command_type == ActionQueue.AUTO_EXECUTION_COMMAND:
  449. file_path = os.path.join(self.tmp_dir, "auto_command-{0}.json".format(task_id))
  450. # Json may contain passwords, that's why we need proper permissions
  451. if os.path.isfile(file_path):
  452. os.unlink(file_path)
  453. with os.fdopen(os.open(file_path, os.O_WRONLY | os.O_CREAT,
  454. 0600), 'w') as f:
  455. content = json.dumps(command, sort_keys = False, indent = 4)
  456. f.write(content)
  457. return file_path
  458. def decompressClusterHostInfo(self, clusterHostInfo):
  459. info = clusterHostInfo.copy()
  460. #Pop info not related to host roles
  461. hostsList = info.pop(self.HOSTS_LIST_KEY)
  462. pingPorts = info.pop(self.PING_PORTS_KEY)
  463. racks = info.pop(self.RACKS_KEY)
  464. ipv4_addresses = info.pop(self.IPV4_ADDRESSES_KEY)
  465. ambariServerHost = info.pop(self.AMBARI_SERVER_HOST)
  466. ambariServerPort = info.pop(self.AMBARI_SERVER_PORT)
  467. ambariServerUseSsl = info.pop(self.AMBARI_SERVER_USE_SSL)
  468. decompressedMap = {}
  469. for k,v in info.items():
  470. # Convert from 1-3,5,6-8 to [1,2,3,5,6,7,8]
  471. indexes = self.convertRangeToList(v)
  472. # Convert from [1,2,3,5,6,7,8] to [host1,host2,host3...]
  473. decompressedMap[k] = [hostsList[i] for i in indexes]
  474. #Convert from ['1:0-2,4', '42:3,5-7'] to [1,1,1,42,1,42,42,42]
  475. pingPorts = self.convertMappedRangeToList(pingPorts)
  476. racks = self.convertMappedRangeToList(racks)
  477. ipv4_addresses = self.convertMappedRangeToList(ipv4_addresses)
  478. #Convert all elements to str
  479. pingPorts = map(str, pingPorts)
  480. #Add ping ports to result
  481. decompressedMap[self.PING_PORTS_KEY] = pingPorts
  482. #Add hosts list to result
  483. decompressedMap[self.HOSTS_LIST_KEY] = hostsList
  484. #Add racks list to result
  485. decompressedMap[self.RACKS_KEY] = racks
  486. #Add ips list to result
  487. decompressedMap[self.IPV4_ADDRESSES_KEY] = ipv4_addresses
  488. #Add ambari-server properties to result
  489. decompressedMap[self.AMBARI_SERVER_HOST] = ambariServerHost
  490. decompressedMap[self.AMBARI_SERVER_PORT] = ambariServerPort
  491. decompressedMap[self.AMBARI_SERVER_USE_SSL] = ambariServerUseSsl
  492. return decompressedMap
  493. # Converts from 1-3,5,6-8 to [1,2,3,5,6,7,8]
  494. def convertRangeToList(self, list):
  495. resultList = []
  496. for i in list:
  497. ranges = i.split(',')
  498. for r in ranges:
  499. rangeBounds = r.split('-')
  500. if len(rangeBounds) == 2:
  501. if not rangeBounds[0] or not rangeBounds[1]:
  502. raise AgentException("Broken data in given range, expected - ""m-n"" or ""m"", got : " + str(r))
  503. resultList.extend(range(int(rangeBounds[0]), int(rangeBounds[1]) + 1))
  504. elif len(rangeBounds) == 1:
  505. resultList.append((int(rangeBounds[0])))
  506. else:
  507. raise AgentException("Broken data in given range, expected - ""m-n"" or ""m"", got : " + str(r))
  508. return resultList
  509. #Converts from ['1:0-2,4', '42:3,5-7'] to [1,1,1,42,1,42,42,42]
  510. def convertMappedRangeToList(self, list):
  511. resultDict = {}
  512. for i in list:
  513. valueToRanges = i.split(":")
  514. if len(valueToRanges) <> 2:
  515. raise AgentException("Broken data in given value to range, expected format - ""value:m-n"", got - " + str(i))
  516. value = valueToRanges[0]
  517. rangesToken = valueToRanges[1]
  518. for r in rangesToken.split(','):
  519. rangeIndexes = r.split('-')
  520. if len(rangeIndexes) == 2:
  521. if not rangeIndexes[0] or not rangeIndexes[1]:
  522. raise AgentException("Broken data in given value to range, expected format - ""value:m-n"", got - " + str(r))
  523. start = int(rangeIndexes[0])
  524. end = int(rangeIndexes[1])
  525. for k in range(start, end + 1):
  526. resultDict[k] = value if not value.isdigit() else int(value)
  527. elif len(rangeIndexes) == 1:
  528. index = int(rangeIndexes[0])
  529. resultDict[index] = value if not value.isdigit() else int(value)
  530. resultList = dict(sorted(resultDict.items())).values()
  531. return resultList