install_packages.py 19 KB


  1. #!/usr/bin/env python
  2. """
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. Ambari Agent
  17. """
  18. import os
  19. import signal
  20. import re
  21. import os.path
  22. import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set.
  23. from resource_management import *
  24. from resource_management.libraries.functions.list_ambari_managed_repos import list_ambari_managed_repos
  25. from ambari_commons.os_check import OSCheck, OSConst
  26. from resource_management.libraries.functions.packages_analyzer import allInstalledPackages
  27. from resource_management.libraries.functions import conf_select
  28. from resource_management.libraries.functions.hdp_select import get_hdp_versions
  29. from resource_management.libraries.functions.version import compare_versions, format_hdp_stack_version
  30. from resource_management.libraries.functions.repo_version_history \
  31. import read_actual_version_from_history_file, write_actual_version_to_history_file, REPO_VERSION_HISTORY_FILE
  32. from resource_management.core.logger import Logger
  33. class InstallPackages(Script):
  34. """
  35. This script is a part of Rolling Upgrade workflow and is described at
  36. appropriate design doc.
  37. It installs repositories to the node and then installs packages.
  38. For now, repositories are installed into individual files.
  39. """
  40. UBUNTU_REPO_COMPONENTS_POSTFIX = ["main"]
  41. REPO_FILE_NAME_PREFIX = 'HDP-'
  42. STACK_TO_ROOT_FOLDER = {"HDP": "/usr/hdp"}
  def actionexecute(self, env):
    """
    Entry point of the action: distributes the repository definitions for the
    requested repository version and then installs the listed packages.

    Progress is reported through the structured output; its
    'package_installation_result' entry stays 'FAIL' until the package
    installation has returned successfully.

    :param env: execution environment passed in by the caller (not used here)
    :raises Fail: if the stack root folder or the repository version cannot be
                  determined, or if distributing repositories / installing
                  packages failed
    """
    error_count = 0

    # Parse parameters
    config = Script.get_config()
    cluster_env = config['configurations']['cluster-env']
    rhel_suse_template = cluster_env['repo_suse_rhel_template']
    ubuntu_template = cluster_env['repo_ubuntu_template']
    if OSCheck.is_redhat_family() or OSCheck.is_suse_family():
      template = rhel_suse_template
    else:
      template = ubuntu_template

    # Handle a SIGTERM and SIGINT gracefully
    for handled_signal in (signal.SIGTERM, signal.SIGINT):
      signal.signal(handled_signal, self.abort_handler)

    # Select dict that contains parameters
    try:
      params = config['roleParams']
      self.repository_version = params['repository_version']
      base_urls = json.loads(params['base_urls'])
      package_list = json.loads(params['package_list'])
      stack_id = params['stack_id']
    except KeyError:
      # Last try
      params = config['commandParams']
      self.repository_version = params['repository_version']
      base_urls = json.loads(params['base_urls'])
      package_list = json.loads(params['package_list'])
      stack_id = params['stack_id']

    # current stack information
    self.current_hdp_stack_version = None
    host_level_params = config['hostLevelParams']
    if 'stack_version' in host_level_params:
      unformatted_stack_version = str(host_level_params['stack_version'])
      self.current_hdp_stack_version = format_hdp_stack_version(unformatted_stack_version)

    # Resolve the stack root folder from a stack_id of the form "<NAME>-<VERSION>"
    self.stack_root_folder = None
    stack_parts = stack_id.split("-") if stack_id and "-" in stack_id else []
    if len(stack_parts) == 2:
      self.stack_root_folder = self.STACK_TO_ROOT_FOLDER.get(stack_parts[0].upper())

    if self.stack_root_folder is None:
      raise Fail("Cannot determine the stack's root directory by parsing the stack_id property, {0}".format(str(stack_id)))

    if self.repository_version is None:
      raise Fail("Cannot determine the repository version to install")

    self.repository_version = self.repository_version.strip()

    # Install/update repositories
    installed_repositories = []
    self.current_repositories = []
    self.current_repo_files = set()

    # Enable base system repositories
    # We don't need that for RHEL family, because we leave all repos enabled
    # except disabled HDP* ones
    if OSCheck.is_suse_family():
      self.current_repositories.append('base')
    elif OSCheck.is_ubuntu_family():
      self.current_repo_files.add('base')

    Logger.info("Will install packages for repository version {0}".format(self.repository_version))
    try:
      # Only the first repository starts a new file; the following ones are appended to it
      for position, url_info in enumerate(base_urls):
        repo_name, repo_file = self.install_repository(url_info, position > 0, template)
        self.current_repositories.append(repo_name)
        self.current_repo_files.add(repo_file)

      installed_repositories = list_ambari_managed_repos()
    except Exception as err:
      Logger.logger.exception("Cannot distribute repositories. Error: {0}".format(str(err)))
      error_count += 1

    # Build structured output with initial values
    self.structured_output = {
      'ambari_repositories': installed_repositories,
      'installed_repository_version': self.repository_version,
      'stack_id': stack_id,
      'package_installation_result': 'FAIL'
    }
    self.put_structured_out(self.structured_output)

    if error_count > 0:
      raise Fail("Failed to distribute repositories/install packages")

    # Initial list of versions, used to compute the new version installed
    self.old_versions = get_hdp_versions(self.stack_root_folder)

    is_package_install_successful = False
    try:
      if self.install_packages(package_list) == 0:
        self.structured_output['package_installation_result'] = 'SUCCESS'
        self.put_structured_out(self.structured_output)
        is_package_install_successful = True
      else:
        error_count += 1
    except Exception as err:
      error_count += 1
      Logger.logger.exception("Could not install packages. Error: {0}".format(str(err)))

    # Provide correct exit code
    if error_count > 0:
      raise Fail("Failed to distribute repositories/install packages")

    # if installing a version of HDP that needs some symlink love, then create them
    if is_package_install_successful and 'actual_version' in self.structured_output:
      self._create_config_links_if_necessary(stack_id, self.structured_output['actual_version'])

  def _create_config_links_if_necessary(self, stack_id, stack_version):
    """
    Sets up the configuration symlinks for every package known to
    conf_select.PACKAGE_DIRS, provided that the stack being installed is
    HDP 2.3 or newer.

    Nothing is done (apart from an info log line) when stack_id is missing,
    is not of the form <NAME>-<VERSION>, names a stack other than HDP, or
    refers to a version older than 2.3.0.0.

    If the current cluster stack is already 2.3 or newer, only the versioned
    config directories are linked; otherwise the existing conf directories are
    converted to symlinks with link_to="backup".

    stack_id: stack id, ie HDP-2.3
    stack_version: version to set, ie 2.3.0.0-1234
    """
    if stack_id is None:
      Logger.info("Cannot create config links when stack_id is not defined")
      return

    args = stack_id.upper().split('-')
    if len(args) != 2:
      Logger.info("Unrecognized stack id {0}, cannot create config links".format(stack_id))
      return

    if args[0] != "HDP":
      Logger.info("Unrecognized stack name {0}, cannot create config links".format(args[0]))
      # The message says links cannot be created, so stop here instead of
      # falling through to the link creation below.
      return

    if compare_versions(format_hdp_stack_version(args[1]), "2.3.0.0") < 0:
      Logger.info("Configuration symlinks are not needed for {0}, only HDP-2.3+".format(stack_version))
      return

    # The current stack does not change while iterating, so decide once
    # whether conf.backup folders have to be created.
    already_on_versioned_configs = bool(self.current_hdp_stack_version) and \
        compare_versions(self.current_hdp_stack_version, '2.3') >= 0

    for package_name, directories in conf_select.PACKAGE_DIRS.items():
      # if already on HDP 2.3, then we should skip making conf.backup folders
      if already_on_versioned_configs:
        Logger.info("The current cluster stack of {0} does not require backing up configurations; "
                    "only conf-select versioned config directories will be created.".format(stack_version))
        # only link configs for all known packages
        conf_select.link_component_conf_to_versioned_config(package_name, stack_version)
      else:
        # link configs and create conf.backup folders for all known packages
        conf_select.convert_conf_directories_to_symlinks(package_name, stack_version, directories,
                                                         skip_existing_links = False, link_to = "backup")

  def compute_actual_version(self):
    """
    Work out which version was really installed once the packages are in place
    and publish it as 'actual_version' in the structured output.

    The version is taken from the single new entry among the installed stack
    versions if there is exactly one; otherwise the best fitting installed
    version for the repository version is looked up.

    :raises Fail: if no actual version could be determined
    """
    self.actual_version = None
    self.repo_version_with_build_number = None

    # If the repo contains a build number, optimistically assume it to be the actual_version.
    # It will get changed to correct value if it is not
    if self.repository_version and re.search(r"[\d\.]+-\d+", self.repository_version):
      # Contains a build number
      self.repo_version_with_build_number = self.repository_version
      # This is the best value known so far.
      self.structured_output['actual_version'] = self.repo_version_with_build_number
      self.put_structured_out(self.structured_output)

    Logger.info("Attempting to determine actual version with build number.")
    Logger.info("Old versions: {0}".format(self.old_versions))

    versions_now = get_hdp_versions(self.stack_root_folder)
    Logger.info("New versions: {0}".format(versions_now))

    added_versions = set(versions_now).difference(self.old_versions)
    Logger.info("Deltas: {0}".format(added_versions))

    # Get version without build number
    repo_version_without_build = self.repository_version.split('-')[0]

    if len(added_versions) == 1:
      (only_new_version,) = added_versions
      self.actual_version = only_new_version.strip()
      self.structured_output['actual_version'] = self.actual_version
      self.put_structured_out(self.structured_output)
      write_actual_version_to_history_file(repo_version_without_build, self.actual_version)
      Logger.info(
        "Found actual version {0} by checking the delta between versions before and after installing packages".format(
          self.actual_version))
      return

    # If the first install attempt does a partial install and is unable to report this to the server,
    # then a subsequent attempt will report an empty delta. For this reason, we search for a best fit
    # version for the repo version
    Logger.info("Cannot determine actual version installed by checking the delta between versions "
                "before and after installing package")
    Logger.info("Will try to find for the actual version by searching for best possible match in the list of versions installed")
    self.actual_version = self.find_best_fit_version(versions_now, self.repository_version)
    if self.actual_version is None:
      msg = "Could not determine actual version installed. Try reinstalling packages again."
      raise Fail(msg)

    self.actual_version = self.actual_version.strip()
    self.structured_output['actual_version'] = self.actual_version
    self.put_structured_out(self.structured_output)
    Logger.info("Found actual version {0} by searching for best possible match".format(self.actual_version))

  def check_partial_install(self):
    """
    Called after an unsuccessful installation: if exactly one new stack version
    showed up compared to the versions recorded before the install, remember it
    in REPO_VERSION_HISTORY_FILE as the partially installed version.
    :return: None
    """
    Logger.info("Installation of packages failed. Checking if installation was partially complete")
    Logger.info("Old versions: {0}".format(self.old_versions))

    versions_now = get_hdp_versions(self.stack_root_folder)
    Logger.info("New versions: {0}".format(versions_now))

    added_versions = set(versions_now).difference(self.old_versions)
    Logger.info("Deltas: {0}".format(added_versions))

    # Get version without build number
    repo_version_without_build = self.repository_version.split('-')[0]

    if len(added_versions) != 1:
      return

    # Some packages were installed successfully. Log this version to REPO_VERSION_HISTORY_FILE
    (only_new_version,) = added_versions
    partial_install_version = only_new_version.strip()
    write_actual_version_to_history_file(repo_version_without_build, partial_install_version)
    Logger.info("Version {0} was partially installed. ".format(partial_install_version))

  def find_best_fit_version(self, versions, repo_version):
    """
    Pick, from the installed versions, the one that best fits the repo version.

    Resolution order:
      1. the repo version itself, if it carries a build number and is installed;
      2. the only installed version that starts with the repo version stripped
         of its build number;
      3. if several installed versions start with it, the one recorded in
         REPO_VERSION_HISTORY_FILE, provided it is among them.

    :param versions: List of versions installed
    :param repo_version: Repo version to search
    :return: Matching version, None if no match was found.
    """
    if versions is None or repo_version is None:
      return None

    has_build_number = re.search(r"[\d\.]+-\d+", repo_version)
    if has_build_number and repo_version in versions:
      # Repo version has a build number and is installed as is
      Logger.info("Best Fit Version: Resolved from repo version with valid build number: {0}".format(repo_version))
      return repo_version

    # Get version without build number
    version_prefix = repo_version.split('-')[0]

    # Find all versions that match the normalized repo version
    candidates = [installed for installed in versions if installed.startswith(version_prefix)]
    if not candidates:
      # No matching version
      return None

    if len(candidates) == 1:
      # Resolved without conflicts
      Logger.info("Best Fit Version: Resolved from normalized repo version without conflicts: {0}".format(candidates[0]))
      return candidates[0]

    # Resolve conflicts using REPO_VERSION_HISTORY_FILE
    recorded_version = read_actual_version_from_history_file(version_prefix)

    # Validate history version retrieved is valid
    if recorded_version in candidates:
      Logger.info("Best Fit Version: Resolved from normalized repo version using {0}: {1}".format(REPO_VERSION_HISTORY_FILE, recorded_version))
      return recorded_version

    # No matching version
    return None

  def install_packages(self, package_list):
    """
    Actually install the packages using the package manager.

    If the package manager fails, packages that were not present before this
    run and whose name contains the repository version are removed again.
    Afterwards the actual installed version is computed (on success) or a
    partial installation is recorded (on failure).

    :param package_list: List of package descriptors; each is read through its 'name' key
    :return: Returns 0 if no errors were found, and 1 otherwise.
    """
    exit_code = 0

    # Install packages
    snapshot_taken = False
    try:
      Package("hdp-select", action="upgrade")

      # Remember what was installed before touching anything else
      installed_before = []
      allInstalledPackages(installed_before)
      installed_before = [entry[0] for entry in installed_before]
      snapshot_taken = True

      for descriptor in self.filter_package_list(package_list):
        package_name = self.format_package_name(descriptor['name'])
        # "upgrade" enables upgrading non-versioned packages, despite the fact they exist.
        # Needed by 'mahout' which is non-version but have to be updated
        Package(package_name, action="upgrade")
    except Exception as err:
      exit_code = 1
      Logger.logger.exception("Package Manager failed to install packages. Error: {0}".format(str(err)))

      # Remove already installed packages in case of fail
      if snapshot_taken and installed_before:
        installed_after = []
        allInstalledPackages(installed_after)
        installed_after = [entry[0] for entry in installed_after]

        known_names = set(installed_before)
        fresh_packages = [name for name in installed_after if name not in known_names]

        # Package names carry the version with OS specific separators
        if OSCheck.is_ubuntu_family():
          version_marker = self.repository_version.replace('.', '-')
        else:
          version_marker = self.repository_version.replace('-', '_').replace('.', '_')

        for name in fresh_packages:
          if version_marker and version_marker in name:
            Package(name, action="remove")

    # Compute the actual version in order to save it in structured out
    try:
      if exit_code == 0:
        self.compute_actual_version()
      else:
        self.check_partial_install()
    except Fail as err:
      exit_code = 1
      Logger.logger.exception("Failure while computing actual version. Error: {0}".format(str(err)))

    return exit_code

  def install_repository(self, url_info, append_to_file, template):
    """
    Create a single repository definition through the Repository resource.

    :param url_info: dict describing the repository; 'name' is required,
                     'baseUrl' and 'mirrorsList' are optional
    :param append_to_file: passed through to Repository as append_to_file
    :param template: repository template passed through to Repository
    :return: tuple of (repository name, repository file name)
    """
    repo_name = "{0}-{1}".format(url_info['name'], self.repository_version)
    base_url = url_info['baseUrl'] if 'baseUrl' in url_info else None
    mirrors_list = url_info['mirrorsList'] if 'mirrorsList' in url_info else None

    ubuntu_components = [url_info['name']] + self.UBUNTU_REPO_COMPONENTS_POSTFIX
    file_name = self.REPO_FILE_NAME_PREFIX + self.repository_version

    Repository(repo_name,
      action = "create",
      base_url = base_url,
      mirror_list = mirrors_list,
      repo_file_name = file_name,
      repo_template = template,
      append_to_file = append_to_file,
      components = ubuntu_components,  # ubuntu specific
    )
    return repo_name, file_name

  def abort_handler(self, signum, frame):
    """
    Signal handler registered by actionexecute for SIGTERM and SIGINT.

    Logs the signal and, if the installation has progressed far enough, records
    a partially installed version through check_partial_install().

    :param signum: number of the signal that was caught
    :param frame: current stack frame supplied by the signal machinery (not used)
    """
    Logger.error("Caught signal {0}, will handle it gracefully. Compute the actual version if possible before exiting.".format(signum))

    # actionexecute registers this handler before it sets the attributes that
    # check_partial_install() reads. A signal arriving that early would make
    # the handler itself blow up with an AttributeError, so make sure the
    # required state exists first.
    required_state = ('repository_version', 'stack_root_folder', 'old_versions')
    missing_state = [name for name in required_state if getattr(self, name, None) is None]
    if missing_state:
      Logger.info("Skipping the partial install check, not initialized yet: {0}".format(", ".join(missing_state)))
      return

    self.check_partial_install()

  def filter_package_list(self, package_list):
    """
    Drop packages that are handled by custom logic in the package scripts.

    Note: metainfo.xml offers a skipUpgrade option to filter packages, so this
    method should only be used when that option cannot be used for some reason.
    Packages filtered here usually come from system repositories, and either
    are not available when the repository list is restricted, or should not be
    installed on the host at all.

    Currently only packages whose name starts with 'hadooplzo' are dropped, and
    only while lzo is not listed in core-site io.compression.codecs.

    :param package_list: original list
    :return: filtered package_list
    """
    accepted = []
    for candidate in package_list:
      codecs = default("/configurations/core-site/io.compression.codecs", None)
      lzo_enabled = bool(codecs) and "com.hadoop.compression.lzo" in codecs
      # skip upgrade for hadooplzo* versioned package, only if lzo is disabled
      if lzo_enabled or not candidate['name'].startswith('hadooplzo'):
        accepted.append(candidate)
    return accepted

  364. if __name__ == "__main__":
  365. InstallPackages().execute()