script.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561
  1. #!/usr/bin/env python
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. import tempfile
  18. __all__ = ["Script"]
  19. import os
  20. import sys
  21. import json
  22. import logging
  23. import platform
  24. from ambari_commons.os_check import OSCheck
  25. from resource_management.libraries.resources import XmlConfig
  26. from resource_management.libraries.resources import PropertiesFile
  27. from resource_management.core.resources import File, Directory
  28. from resource_management.core.source import InlineTemplate
  29. from resource_management.core.environment import Environment
  30. from resource_management.core.logger import Logger
  31. from resource_management.core.exceptions import Fail, ClientComponentHasNoStatus, ComponentIsNotRunning
  32. from resource_management.core.resources.packaging import Package
  33. from resource_management.libraries.functions.version_select_util import get_component_version
  34. from resource_management.libraries.functions.version import compare_versions
  35. from resource_management.libraries.functions.version import format_hdp_stack_version
  36. from resource_management.libraries.script.config_dictionary import ConfigDictionary, UnknownConfiguration
  37. from resource_management.core.resources.system import Execute
  38. if OSCheck.is_windows_family():
  39. from resource_management.libraries.functions.install_hdp_msi import install_windows_msi
  40. from resource_management.libraries.functions.reload_windows_env import reload_windows_env
  41. from resource_management.libraries.functions.zip_archive import archive_dir
  42. from resource_management.libraries.resources import Msi
  43. else:
  44. from resource_management.libraries.functions.tar_archive import archive_dir
  45. USAGE = """Usage: {0} <COMMAND> <JSON_CONFIG> <BASEDIR> <STROUTPUT> <LOGGING_LEVEL> <TMP_DIR>
  46. <COMMAND> command type (INSTALL/CONFIGURE/START/STOP/SERVICE_CHECK...)
  47. <JSON_CONFIG> path to command json file. Ex: /var/lib/ambari-agent/data/command-2.json
  48. <BASEDIR> path to service metadata dir. Ex: /var/lib/ambari-agent/cache/common-services/HDFS/2.1.0.2.0/package
  49. <STROUTPUT> path to file with structured command output (file will be created). Ex:/tmp/my.txt
  50. <LOGGING_LEVEL> log level for stdout. Ex:DEBUG,INFO
  51. <TMP_DIR> temporary directory for executable scripts. Ex: /var/lib/ambari-agent/data/tmp
  52. """
  53. _PASSWORD_MAP = {"/configurations/cluster-env/hadoop.user.name":"/configurations/cluster-env/hadoop.user.password"}
  def get_path_from_configuration(name, configuration):
    """
    Walk a nested mapping along a '/'-separated path.

    Empty path segments (leading, trailing or doubled slashes) are ignored.

    :param name: path such as "/configurations/cluster-env/hadoop.user.name"
    :param configuration: mapping to descend into
    :return: the value found at the end of the path, or None as soon as a
             segment is not contained in the level reached so far
    """
    node = configuration
    for segment in name.split('/'):
      if not segment:
        continue
      if segment not in node:
        return None
      node = node[segment]
    return node
  62. class Script(object):
  """
  Executes a command for custom service. stdout and stderr are written to
  tmpoutfile and to tmperrfile respectively.

  Script instances share configuration as a class parameter and therefore
  different Script instances can not be used from different threads at
  the same time within a single python process

  Accepted command line arguments mapping:
  1 command type (START/STOP/...)
  2 path to command json file
  3 path to service metadata dir (Directory "package" inside service directory)
  4 path to file with structured command output (file will be created)
  5 log level for stdout (DEBUG, INFO, ...)
  6 temporary directory for executable scripts
  """
  # Structured output of the current command. load_structured_out() and
  # put_structured_out() read and rebind it through the class (Script.structuredOut),
  # so it is shared by every instance.
  structuredOut = {}
  # Defaults for the values execute() takes from sys.argv[2] .. sys.argv[5];
  # execute() assigns them on the instance.
  command_data_file = ""
  basedir = ""
  stroutfile = ""
  logging_level = ""
  # Class variable: execute() stores sys.argv[6] in Script.tmp_dir and
  # get_tmp_dir() returns it.
  tmp_dir = ""
  82. def get_stack_to_component(self):
  83. """
  84. To be overridden by subclasses.
  85. Returns a dictionary where the key is a stack name, and the value is the component name used in selecting the version.
  86. """
  87. return {}
  88. def load_structured_out(self):
  89. Script.structuredOut = {}
  90. if os.path.exists(self.stroutfile):
  91. with open(self.stroutfile, 'r') as fp:
  92. Script.structuredOut = json.load(fp)
  93. # version is only set in a specific way and should not be carried
  94. if "version" in Script.structuredOut:
  95. del Script.structuredOut["version"]
  96. # reset security issues and errors found on previous runs
  97. if "securityIssuesFound" in Script.structuredOut:
  98. del Script.structuredOut["securityIssuesFound"]
  99. if "securityStateErrorInfo" in Script.structuredOut:
  100. del Script.structuredOut["securityStateErrorInfo"]
  101. def put_structured_out(self, sout):
  102. curr_content = Script.structuredOut.copy()
  103. Script.structuredOut.update(sout)
  104. try:
  105. with open(self.stroutfile, 'w') as fp:
  106. json.dump(Script.structuredOut, fp)
  107. except IOError, err:
  108. Script.structuredOut.update({"errMsg" : "Unable to write to " + self.stroutfile})
  def save_component_version_to_structured_out(self):
    """
    Record the component version under the "version" key of the structured out.

    The stack name is read from /hostLevelParams/stack_name and translated to
    a component name through get_stack_to_component(). Nothing is written when
    the stack name is unset, when no mapping is defined, or when
    get_component_version() yields no version.
    """
    from resource_management.libraries.functions.default import default

    stack_name = default("/hostLevelParams/stack_name", None)
    mapping = self.get_stack_to_component()
    if not (mapping and stack_name):
      return

    component_name = None
    if stack_name in mapping:
      component_name = mapping[stack_name]

    version_found = get_component_version(stack_name, component_name)
    if version_found:
      self.put_structured_out({"version": version_found})
  def should_expose_component_version(self, command_name):
    """
    Decide from the config and the given command whether the stack version
    should be written to structured out. Only stack versions that format to a
    non-empty HDP version >= 2.2 qualify.

    :param command_name: command name, compared case-insensitively
    :return: for "status", True only if /commandParams/request_version is set;
             otherwise True for "start", "install" and "restart"; False in
             every other case
    """
    from resource_management.libraries.functions.default import default

    raw_version = str(default("/hostLevelParams/stack_version", ""))
    formatted_version = format_hdp_stack_version(raw_version)
    if formatted_version == "" or compare_versions(formatted_version, '2.2') < 0:
      return False

    command = command_name.lower()
    if command == "status":
      # a status command exposes the version only when one was requested
      return default("/commandParams/request_version", None) is not None

    # Populate version only on base commands
    return command in ("start", "install", "restart")
  def execute(self):
    """
    Sets up logging;
    Parses command parameters and executes method relevant to command type

    sys.argv must hold, in this order: command name, path to the command json
    file, basedir, path to the structured output file, logging level and tmp
    dir (see USAGE).

    The process exits with code 1 when too few arguments are given, when the
    command json file cannot be read, when the command raises
    ClientComponentHasNoStatus or ComponentIsNotRunning, or when it raises
    Fail. Whatever the outcome, the component version is saved to the
    structured out if should_expose_component_version() allows it.
    """
    logger, chout, cherr = Logger.initialize_logger()

    # parse arguments
    if len(sys.argv) < 7:
      logger.error("Script expects at least 6 arguments")
      print(USAGE.format(os.path.basename(sys.argv[0]))) # print to stdout
      sys.exit(1)

    command_name = str.lower(sys.argv[1])
    self.command_data_file = sys.argv[2]
    self.basedir = sys.argv[3]
    self.stroutfile = sys.argv[4]
    self.load_structured_out()
    self.logging_level = sys.argv[5]
    Script.tmp_dir = sys.argv[6]

    # NOTE(review): _levelNames is a private table of the logging module;
    # presumably a level name that is not in it fails here - confirm
    logging_level_str = logging._levelNames[self.logging_level]
    chout.setLevel(logging_level_str)
    logger.setLevel(logging_level_str)

    # on windows we need to reload some of env variables manually because there is no default paths for configs(like
    # /etc/something/conf on linux. When this env vars created by one of the Script execution, they can not be updated
    # in agent, so other Script executions will not be able to access to new env variables
    if OSCheck.is_windows_family():
      reload_windows_env()

    try:
      with open(self.command_data_file) as f:
        Script.config = ConfigDictionary(json.load(f))
        # load passwords here(used on windows to impersonate different users)
        Script.passwords = {}
        for user_path, password_path in _PASSWORD_MAP.items():
          user = get_path_from_configuration(user_path, Script.config)
          password = get_path_from_configuration(password_path, Script.config)
          if user and password:
            Script.passwords[user] = password
    except IOError:
      logger.exception("Can not read json file with command parameters: ")
      sys.exit(1)

    # Run class method depending on a command type
    try:
      method = self.choose_method_to_execute(command_name)
      with Environment(self.basedir, tmp_dir=Script.tmp_dir) as env:
        env.config.download_path = Script.tmp_dir
        method(env)
        if command_name == "install":
          self.set_version()
    except (ClientComponentHasNoStatus, ComponentIsNotRunning):
      # Support of component status checks.
      # Non-zero exit code is interpreted as an INSTALLED status of a component
      # (a tuple is required here: "A or B" evaluates to A alone, which left
      # ComponentIsNotRunning out of this handler)
      sys.exit(1)
    except Fail:
      logger.exception("Error while executing command '{0}':".format(command_name))
      sys.exit(1)
    finally:
      if self.should_expose_component_version(command_name):
        self.save_component_version_to_structured_out()
  197. def choose_method_to_execute(self, command_name):
  198. """
  199. Returns a callable object that should be executed for a given command.
  200. """
  201. self_methods = dir(self)
  202. if not command_name in self_methods:
  203. raise Fail("Script '{0}' has no method '{1}'".format(sys.argv[0], command_name))
  204. method = getattr(self, command_name)
  205. return method
  206. @staticmethod
  207. def get_config():
  208. """
  209. HACK. Uses static field to store configuration. This is a workaround for
  210. "circular dependency" issue when importing params.py file and passing to
  211. it a configuration instance.
  212. """
  213. return Script.config
  214. @staticmethod
  215. def get_password(user):
  216. return Script.passwords[user]
  217. @staticmethod
  218. def get_tmp_dir():
  219. """
  220. HACK. Uses static field to avoid "circular dependency" issue when
  221. importing params.py.
  222. """
  223. return Script.tmp_dir
  @staticmethod
  def get_component_from_role(role_directory_map, default_role):
    """
    Resolve the /usr/hdp/current/<component> name for the role of the current
    command.

    :param role_directory_map: dictionary from Ambari role, such as DATANODE
                               or HBASE_MASTER, to component name
    :param default_role: role used when the command carries no /role or when
                         its role is not a key of role_directory_map
    :return: the component name, such as hbase-master
    """
    from resource_management.libraries.functions.default import default

    role = default("/role", default_role)
    if role not in role_directory_map:
      role = default_role
    return role_directory_map[role]
  @staticmethod
  def get_stack_name():
    """
    Read the stack name from hostLevelParams/stack_name.

    :return: a stack name or None
    """
    from resource_management.libraries.functions.default import default

    stack_name = default("/hostLevelParams/stack_name", None)
    return stack_name
  @staticmethod
  def get_hdp_stack_version():
    """
    Gets the normalized version of the HDP stack in the form #.#.#.# if it is
    present on the configurations sent.

    None is returned when the stack is not HDP, when
    hostLevelParams/stack_version is absent, or when its value is None or
    converts to an empty string.

    :return: a normalized HDP stack version or None
    """
    stack_name = Script.get_stack_name()
    if stack_name is None or stack_name.upper() != "HDP":
      return None

    config = Script.get_config()
    if 'hostLevelParams' not in config or 'stack_version' not in config['hostLevelParams']:
      return None

    # Test the raw value: str() never returns None, so a check made after the
    # conversion could not fire and a None value went on as the text "None".
    stack_version = config['hostLevelParams']['stack_version']
    if stack_version is None:
      return None

    stack_version_unformatted = str(stack_version)
    if stack_version_unformatted == '':
      return None

    return format_hdp_stack_version(stack_version_unformatted)
  @staticmethod
  def is_hdp_stack_greater_or_equal(compare_to_version):
    """
    Tell whether hostLevelParams/stack_version, after being normalized, is
    greater than or equal to the specified stack version.

    :param compare_to_version: the version to compare to
    :return: True if the command's stack is greater than or equal to the
             specified version; False otherwise, including when the stack
             version is None or empty
    """
    current_version = Script.get_hdp_stack_version()
    if current_version is not None and current_version != "":
      return compare_versions(current_version, compare_to_version) >= 0
    return False
  @staticmethod
  def is_hdp_stack_less_than(compare_to_version):
    """
    Gets whether the hostLevelParams/stack_version, after being normalized,
    is less than the specified stack version

    :param compare_to_version: the version to compare to
    :return: True if the command's stack is less than the specified version;
             False otherwise, including when the stack version is None or
             empty
    """
    hdp_stack_version = Script.get_hdp_stack_version()
    # same guard as is_hdp_stack_greater_or_equal(): an empty version is not
    # handed to compare_versions()
    if hdp_stack_version is None or hdp_stack_version == "":
      return False

    return compare_versions(hdp_stack_version, compare_to_version) < 0
  286. def install(self, env):
  287. """
  288. Default implementation of install command is to install all packages
  289. from a list, received from the server.
  290. Feel free to override install() method with your implementation. It
  291. usually makes sense to call install_packages() manually in this case
  292. """
  293. self.install_packages(env)
  def install_packages(self, env, exclude_packages=[]):
    """
    Install the packages the server listed for this service.

    The list is taken from hostLevelParams/package_list of the command, a
    JSON-encoded string of entries with a 'name'. Nothing is installed on a
    host flagged with host_sys_prepped. On Windows only names ending in
    ".msi" are installed from the list, then hdp.msi is installed and the
    environment is reloaded.

    :param env: environment of the command (not used by this implementation)
    :param exclude_packages: names of packages that must be skipped
    """
    config = self.get_config()
    host_params = config['hostLevelParams']

    # do not install anything on sys-prepped host
    if 'host_sys_prepped' in host_params and host_params['host_sys_prepped'] == True:
      Logger.info("Node has all packages pre-installed. Skipping.")
      return

    try:
      package_list_str = host_params['package_list']
      if isinstance(package_list_str, basestring) and len(package_list_str) > 0:
        for package in json.loads(package_list_str):
          name = package['name']
          if name in exclude_packages:
            continue
          if not OSCheck.is_windows_family():
            Package(name)
          elif name[-4:] == ".msi":
            #TODO all msis must be located in resource folder of server, change it to repo later
            Msi(name, http_source=os.path.join(host_params['jdk_location']))
    except KeyError:
      pass # No reason to worry

    if OSCheck.is_windows_family():
      #TODO hacky install of windows msi, remove it or move to old(2.1) stack definition when component based install will be implemented
      hadoop_user = config["configurations"]["cluster-env"]["hadoop.user.name"]
      msi_source = os.path.join(host_params['jdk_location'], "hdp.msi")
      install_windows_msi(msi_source,
                          host_params["agentCacheDir"], "hdp.msi", hadoop_user, self.get_password(hadoop_user),
                          str(host_params['stack_version']))
      reload_windows_env()
  330. @staticmethod
  331. def fail_with_error(message):
  332. """
  333. Prints error message and exits with non-zero exit code
  334. """
  335. print("Error: " + message)
  336. sys.stderr.write("Error: " + message)
  337. sys.exit(1)
  338. def start(self, env, rolling_restart=False):
  339. """
  340. To be overridden by subclasses
  341. """
  342. self.fail_with_error('start method isn\'t implemented')
  343. def stop(self, env, rolling_restart=False):
  344. """
  345. To be overridden by subclasses
  346. """
  347. self.fail_with_error('stop method isn\'t implemented')
  348. def pre_rolling_restart(self, env):
  349. """
  350. To be overridden by subclasses
  351. """
  352. pass
  353. def restart(self, env):
  354. """
  355. Default implementation of restart command is to call stop and start methods
  356. Feel free to override restart() method with your implementation.
  357. For client components we call install
  358. """
  359. config = self.get_config()
  360. componentCategory = None
  361. try:
  362. componentCategory = config['roleParams']['component_category']
  363. except KeyError:
  364. pass
  365. restart_type = ""
  366. if config is not None:
  367. command_params = config["commandParams"] if "commandParams" in config else None
  368. if command_params is not None:
  369. restart_type = command_params["restart_type"] if "restart_type" in command_params else ""
  370. if restart_type:
  371. restart_type = restart_type.encode('ascii', 'ignore')
  372. rolling_restart = restart_type.lower().startswith("rolling")
  373. if componentCategory and componentCategory.strip().lower() == 'CLIENT'.lower():
  374. if rolling_restart:
  375. self.pre_rolling_restart(env)
  376. self.install(env)
  377. else:
  378. # To remain backward compatible with older stacks, only pass rolling_restart if True.
  379. if rolling_restart:
  380. self.stop(env, rolling_restart=rolling_restart)
  381. else:
  382. self.stop(env)
  383. if rolling_restart:
  384. self.pre_rolling_restart(env)
  385. # To remain backward compatible with older stacks, only pass rolling_restart if True.
  386. if rolling_restart:
  387. self.start(env, rolling_restart=rolling_restart)
  388. else:
  389. self.start(env)
  390. if rolling_restart:
  391. self.post_rolling_restart(env)
  392. if self.should_expose_component_version("restart"):
  393. self.save_component_version_to_structured_out()
  394. def post_rolling_restart(self, env):
  395. """
  396. To be overridden by subclasses
  397. """
  398. pass
  399. def configure(self, env, rolling_restart=False):
  400. """
  401. To be overridden by subclasses
  402. """
  403. self.fail_with_error('configure method isn\'t implemented')
  404. def security_status(self, env):
  405. """
  406. To be overridden by subclasses to provide the current security state of the component.
  407. Implementations are required to set the "securityState" property of the structured out data set
  408. to one of the following values:
  409. UNSECURED - If the component is not configured for any security protocol such as
  410. Kerberos
  411. SECURED_KERBEROS - If the component is configured for Kerberos
  412. UNKNOWN - If the security state cannot be determined
  413. ERROR - If the component is supposed to be secured, but there are issues with the
  414. configuration. For example, if the component is configured for Kerberos
  415. but the configured principal and keytab file fail to kinit
  416. """
  417. self.put_structured_out({"securityState": "UNKNOWN"})
  418. def generate_configs_get_template_file_content(self, filename, dicts):
  419. config = self.get_config()
  420. content = ''
  421. for dict in dicts.split(','):
  422. if dict.strip() in config['configurations']:
  423. try:
  424. content += config['configurations'][dict.strip()]['content']
  425. except Fail:
  426. # 'content' section not available in the component client configuration
  427. pass
  428. return content
  429. def generate_configs_get_xml_file_content(self, filename, dict):
  430. config = self.get_config()
  431. return {'configurations':config['configurations'][dict],
  432. 'configuration_attributes':config['configuration_attributes'][dict]}
  433. def generate_configs_get_xml_file_dict(self, filename, dict):
  434. config = self.get_config()
  435. return config['configurations'][dict]
  def generate_configs(self, env):
    """
    Generate config files and store them as an archive in tmp_dir.

    The files are described by xml_configs_list, env_configs_list and
    properties_configs_list from commandParams; the archive is named after
    commandParams/output_file. The files are written to a temporary directory
    below tmp_dir which is deleted again, whether or not generation succeeds.

    :param env: environment that receives the imported params module
    """
    import params
    env.set_params(params)

    command_params = self.get_config()['commandParams']
    xml_specs = command_params['xml_configs_list']
    env_specs = command_params['env_configs_list']
    properties_specs = command_params['properties_configs_list']

    tmp_root = self.get_tmp_dir()
    Directory(tmp_root, recursive=True)
    staging_dir = tempfile.mkdtemp(dir=tmp_root)
    archive_path = os.path.join(tmp_root, command_params['output_file'])

    try:
      for spec in xml_specs:
        for filename, config_type in spec.items():
          xml_kwargs = self.generate_configs_get_xml_file_content(filename, config_type)
          XmlConfig(filename, conf_dir=staging_dir, **xml_kwargs)

      for spec in env_specs:
        for filename, config_types in spec.items():
          template_text = self.generate_configs_get_template_file_content(filename, config_types)
          File(os.path.join(staging_dir, filename), content=InlineTemplate(template_text))

      for spec in properties_specs:
        for filename, config_type in spec.items():
          properties = self.generate_configs_get_xml_file_dict(filename, config_type)
          PropertiesFile(os.path.join(staging_dir, filename), properties=properties)

      archive_dir(archive_path, staging_dir)
    finally:
      Directory(staging_dir, action="delete")
  def set_version(self):
    """
    Point hdp-select at the version requested by the command.

    "/usr/bin/hdp-select set <component> <version>" is executed with sudo
    when get_stack_to_component() maps /hostLevelParams/stack_name to a
    component, /commandParams/version is set and the formatted
    /hostLevelParams/stack_version compares >= 2.2.0.0. Otherwise nothing
    is done.
    """
    from resource_management.libraries.functions.default import default

    stack_name = default("/hostLevelParams/stack_name", None)
    version = default("/commandParams/version", None)
    raw_stack_version = str(default("/hostLevelParams/stack_version", ""))
    hdp_stack_version = format_hdp_stack_version(raw_stack_version)

    mapping = self.get_stack_to_component()
    if not mapping:
      return

    component_name = mapping[stack_name] if stack_name in mapping else None
    if not (component_name and stack_name and version):
      return

    # the formatted version is passed through the formatter once more before comparing
    if compare_versions(format_hdp_stack_version(hdp_stack_version), '2.2.0.0') >= 0:
      Execute(('/usr/bin/hdp-select', 'set', component_name, version),
              sudo = True)