params.py

#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import os

from resource_management.libraries.functions import format
from resource_management.libraries.script.script import Script
from resource_management.libraries.functions.version import format_stack_version
from resource_management.libraries.functions import StackFeature
from resource_management.libraries.functions.stack_features import check_stack_feature
from resource_management.libraries.functions.stack_features import get_stack_feature_version
from resource_management.libraries.functions.default import default
from utils import get_bare_principal
from resource_management.libraries.functions.get_stack_version import get_stack_version
from resource_management.libraries.functions.is_empty import is_empty
import status_params
from resource_management.libraries.resources.hdfs_resource import HdfsResource
from resource_management.libraries.functions import stack_select
from resource_management.libraries.functions import conf_select
from resource_management.libraries.functions import get_kinit_path
from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources
from resource_management.libraries.functions.setup_ranger_plugin_xml import get_audit_configs

# server configurations
config = Script.get_config()
tmp_dir = Script.get_tmp_dir()
stack_root = Script.get_stack_root()

stack_name = default("/hostLevelParams/stack_name", None)
retryAble = default("/commandParams/command_retry_enabled", False)

# version being upgraded/downgraded to
version = default("/commandParams/version", None)
# version that is currently installed
current_version = default("/hostLevelParams/current_version", None)

stack_version_unformatted = config['hostLevelParams']['stack_version']
stack_version_formatted = format_stack_version(stack_version_unformatted)
upgrade_direction = default("/commandParams/upgrade_direction", None)

# get the correct version to use for checking stack features
version_for_stack_feature_checks = get_stack_feature_version(config)

stack_supports_ranger_kerberos = check_stack_feature(StackFeature.RANGER_KERBEROS_SUPPORT, version_for_stack_feature_checks)
stack_supports_ranger_audit_db = check_stack_feature(StackFeature.RANGER_AUDIT_DB_SUPPORT, version_for_stack_feature_checks)
stack_supports_core_site_for_ranger_plugin = check_stack_feature(StackFeature.CORE_SITE_FOR_RANGER_PLUGINS_SUPPORT, version_for_stack_feature_checks)
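
# Note: each flag above is a boolean returned by check_stack_feature(); they
# gate the Ranger- and Kerberos-specific parameter blocks later in this file.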

# When downgrading, 'version' and 'current_version' both point to the downgrade-target version;
# downgrade_from_version gives the source version the downgrade is coming from.
downgrade_from_version = default("/commandParams/downgrade_from_version", None)

hostname = config['hostname']

# default kafka parameters
kafka_home = '/usr/lib/kafka'
kafka_bin = kafka_home + '/bin/kafka'
conf_dir = "/etc/kafka/conf"
limits_conf_dir = "/etc/security/limits.d"

# used while upgrading the stack in a kerberized cluster and running kafka-acls.sh
zookeeper_connect = default("/configurations/kafka-broker/zookeeper.connect", None)

kafka_user_nofile_limit = config['configurations']['kafka-env']['kafka_user_nofile_limit']
kafka_user_nproc_limit = config['configurations']['kafka-env']['kafka_user_nproc_limit']

# parameters for 2.2+
if stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted):
  kafka_home = os.path.join(stack_root, "current", "kafka-broker")
  kafka_bin = os.path.join(kafka_home, "bin", "kafka")
  conf_dir = os.path.join(kafka_home, "config")
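
# Illustrative example (assumption, not from this file): with stack_root set to
# /usr/hdp, the block above resolves kafka_home to /usr/hdp/current/kafka-broker,
# a symlink that stack-select repoints during rolling upgrades.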

kafka_user = config['configurations']['kafka-env']['kafka_user']
kafka_log_dir = config['configurations']['kafka-env']['kafka_log_dir']
kafka_pid_dir = status_params.kafka_pid_dir
kafka_pid_file = kafka_pid_dir + "/kafka.pid"

# These paths are hardcoded in Kafka's own bash scripts, over which we have no control.
kafka_managed_pid_dir = "/var/run/kafka"
kafka_managed_log_dir = "/var/log/kafka"

user_group = config['configurations']['cluster-env']['user_group']
java64_home = config['hostLevelParams']['java_home']
kafka_env_sh_template = config['configurations']['kafka-env']['content']
kafka_jaas_conf_template = default("/configurations/kafka_jaas_conf/content", None)
kafka_client_jaas_conf_template = default("/configurations/kafka_client_jaas_conf/content", None)

kafka_hosts = config['clusterHostInfo']['kafka_broker_hosts']
kafka_hosts.sort()

zookeeper_hosts = config['clusterHostInfo']['zookeeper_hosts']
zookeeper_hosts.sort()

secure_acls = default("/configurations/kafka-broker/zookeeper.set.acl", False)
kafka_security_migrator = os.path.join(kafka_home, "bin", "zookeeper-security-migration.sh")
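
# Note: zookeeper-security-migration.sh is the tool shipped with Kafka that
# walks the ZooKeeper tree and sets or clears znode ACLs; it is used when
# zookeeper.set.acl (secure_acls above) changes, e.g. during kerberization.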

# Kafka log4j
kafka_log_maxfilesize = default('/configurations/kafka-log4j/kafka_log_maxfilesize', 256)
kafka_log_maxbackupindex = default('/configurations/kafka-log4j/kafka_log_maxbackupindex', 20)
controller_log_maxfilesize = default('/configurations/kafka-log4j/controller_log_maxfilesize', 256)
controller_log_maxbackupindex = default('/configurations/kafka-log4j/controller_log_maxbackupindex', 20)

if 'kafka-log4j' in config['configurations'] and 'content' in config['configurations']['kafka-log4j']:
  log4j_props = config['configurations']['kafka-log4j']['content']
else:
  log4j_props = None

if 'ganglia_server_host' in config['clusterHostInfo'] and \
    len(config['clusterHostInfo']['ganglia_server_host']) > 0:
  ganglia_installed = True
  ganglia_server = config['clusterHostInfo']['ganglia_server_host'][0]
  ganglia_report_interval = 60
else:
  ganglia_installed = False

metric_collector_port = ""
metric_collector_protocol = ""
metric_truststore_path = default("/configurations/ams-ssl-client/ssl.client.truststore.location", "")
metric_truststore_type = default("/configurations/ams-ssl-client/ssl.client.truststore.type", "")
metric_truststore_password = default("/configurations/ams-ssl-client/ssl.client.truststore.password", "")

ams_collector_hosts = ",".join(default("/clusterHostInfo/metrics_collector_hosts", []))
has_metric_collector = not len(ams_collector_hosts) == 0

if has_metric_collector:
  if 'cluster-env' in config['configurations'] and \
      'metrics_collector_vip_port' in config['configurations']['cluster-env']:
    metric_collector_port = config['configurations']['cluster-env']['metrics_collector_vip_port']
  else:
    metric_collector_web_address = default("/configurations/ams-site/timeline.metrics.service.webapp.address", "0.0.0.0:6188")
    if metric_collector_web_address.find(':') != -1:
      metric_collector_port = metric_collector_web_address.split(':')[1]
    else:
      metric_collector_port = '6188'
  if default("/configurations/ams-site/timeline.metrics.service.http.policy", "HTTP_ONLY") == "HTTPS_ONLY":
    metric_collector_protocol = 'https'
  else:
    metric_collector_protocol = 'http'
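
# Resolution summary: the collector port comes from metrics_collector_vip_port
# when a VIP is configured in cluster-env, otherwise from the host:port value in
# ams-site (default 6188); the protocol follows timeline.metrics.service.http.policy.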

# Security-related params
security_enabled = config['configurations']['cluster-env']['security_enabled']
kafka_kerberos_enabled = ('security.inter.broker.protocol' in config['configurations']['kafka-broker'] and
                          config['configurations']['kafka-broker']['security.inter.broker.protocol'] in ("PLAINTEXTSASL", "SASL_PLAINTEXT"))

if security_enabled and stack_version_formatted != "" and 'kafka_principal_name' in config['configurations']['kafka-env'] \
    and check_stack_feature(StackFeature.KAFKA_KERBEROS, stack_version_formatted):
  _hostname_lowercase = config['hostname'].lower()
  _kafka_principal_name = config['configurations']['kafka-env']['kafka_principal_name']
  kafka_jaas_principal = _kafka_principal_name.replace('_HOST', _hostname_lowercase)
  kafka_keytab_path = config['configurations']['kafka-env']['kafka_keytab']
  kafka_bare_jaas_principal = get_bare_principal(_kafka_principal_name)
  kafka_kerberos_params = "-Djava.security.auth.login.config=" + conf_dir + "/kafka_jaas.conf"
else:
  kafka_kerberos_params = ''
  kafka_jaas_principal = None
  kafka_keytab_path = None
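
# Illustrative example (assuming conf_dir resolved to
# /usr/hdp/current/kafka-broker/config): kafka_kerberos_params expands to
#   -Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/config/kafka_jaas.conf
# and is typically injected via the kafka-env template so the broker JVM picks
# up the JAAS login configuration.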

# for the curl command the ranger plugin uses to fetch the db connector
jdk_location = config['hostLevelParams']['jdk_location']

# ranger kafka plugin section start

# ranger host
ranger_admin_hosts = default("/clusterHostInfo/ranger_admin_hosts", [])
has_ranger_admin = not len(ranger_admin_hosts) == 0

# ranger xml_configuration flag, derived from a stack feature instead of xml_configurations_supported in ranger-env
xml_configurations_supported = check_stack_feature(StackFeature.RANGER_XML_CONFIGURATION, version_for_stack_feature_checks)

# ambari-server hostname
ambari_server_hostname = config['clusterHostInfo']['ambari_server_host'][0]

ranger_admin_log_dir = default("/configurations/ranger-env/ranger_admin_log_dir", "/var/log/ranger/admin")

# ranger kafka plugin enabled property
enable_ranger_kafka = default("/configurations/ranger-kafka-plugin-properties/ranger-kafka-plugin-enabled", "No")
enable_ranger_kafka = enable_ranger_kafka.lower() == 'yes'

# ranger kafka-plugin supported flag, derived from a stack feature instead of is_supported_kafka_ranger in kafka-env.xml
is_supported_kafka_ranger = check_stack_feature(StackFeature.KAFKA_RANGER_PLUGIN_SUPPORT, version_for_stack_feature_checks)
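
# Note: ranger-kafka-plugin-enabled arrives as the string "Yes"/"No", which is
# why it is normalized to a boolean above before gating the Ranger block below.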

# ranger kafka properties
if enable_ranger_kafka and is_supported_kafka_ranger:
  # get ranger policy url
  policymgr_mgr_url = config['configurations']['ranger-kafka-security']['ranger.plugin.kafka.policy.rest.url']
  if not is_empty(policymgr_mgr_url) and policymgr_mgr_url.endswith('/'):
    policymgr_mgr_url = policymgr_mgr_url.rstrip('/')

  # ranger audit db user
  xa_audit_db_user = default('/configurations/admin-properties/audit_db_user', 'rangerlogger')

  xa_audit_db_password = ''
  if not is_empty(config['configurations']['admin-properties']['audit_db_password']) and stack_supports_ranger_audit_db and has_ranger_admin:
    xa_audit_db_password = config['configurations']['admin-properties']['audit_db_password']

  # ranger kafka service/repository name
  repo_name = str(config['clusterName']) + '_kafka'
  repo_name_value = config['configurations']['ranger-kafka-security']['ranger.plugin.kafka.service.name']
  if not is_empty(repo_name_value) and repo_name_value != "{{repo_name}}":
    repo_name = repo_name_value

  ranger_env = config['configurations']['ranger-env']

  # build a ranger-env config holding external ranger credential properties
  if not has_ranger_admin and enable_ranger_kafka:
    external_admin_username = default('/configurations/ranger-kafka-plugin-properties/external_admin_username', 'admin')
    external_admin_password = default('/configurations/ranger-kafka-plugin-properties/external_admin_password', 'admin')
    external_ranger_admin_username = default('/configurations/ranger-kafka-plugin-properties/external_ranger_admin_username', 'amb_ranger_admin')
    external_ranger_admin_password = default('/configurations/ranger-kafka-plugin-properties/external_ranger_admin_password', 'amb_ranger_admin')
    ranger_env = {}
    ranger_env['admin_username'] = external_admin_username
    ranger_env['admin_password'] = external_admin_password
    ranger_env['ranger_admin_username'] = external_ranger_admin_username
    ranger_env['ranger_admin_password'] = external_ranger_admin_password

  ranger_plugin_properties = config['configurations']['ranger-kafka-plugin-properties']
  ranger_kafka_audit = config['configurations']['ranger-kafka-audit']
  ranger_kafka_audit_attrs = config['configuration_attributes']['ranger-kafka-audit']
  ranger_kafka_security = config['configurations']['ranger-kafka-security']
  ranger_kafka_security_attrs = config['configuration_attributes']['ranger-kafka-security']
  ranger_kafka_policymgr_ssl = config['configurations']['ranger-kafka-policymgr-ssl']
  ranger_kafka_policymgr_ssl_attrs = config['configuration_attributes']['ranger-kafka-policymgr-ssl']

  policy_user = config['configurations']['ranger-kafka-plugin-properties']['policy_user']

  ranger_plugin_config = {
    'username': config['configurations']['ranger-kafka-plugin-properties']['REPOSITORY_CONFIG_USERNAME'],
    'password': config['configurations']['ranger-kafka-plugin-properties']['REPOSITORY_CONFIG_PASSWORD'],
    'zookeeper.connect': config['configurations']['ranger-kafka-plugin-properties']['zookeeper.connect'],
    'commonNameForCertificate': config['configurations']['ranger-kafka-plugin-properties']['common.name.for.certificate']
  }

  kafka_ranger_plugin_repo = {
    'isEnabled': 'true',
    'configs': ranger_plugin_config,
    'description': 'kafka repo',
    'name': repo_name,
    'repositoryType': 'kafka',
    'type': 'kafka',
    'assetType': '1'
  }
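
  # The dict above is the Kafka service-repository definition that the plugin
  # setup step registers with Ranger admin (via its REST API), attaching this
  # cluster's Kafka instance to Ranger's policy store.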

  if stack_supports_ranger_kerberos and security_enabled:
    ranger_plugin_config['policy.download.auth.users'] = kafka_user
    ranger_plugin_config['tag.download.auth.users'] = kafka_user
    ranger_plugin_config['ambari.service.check.user'] = policy_user

  downloaded_custom_connector = None
  previous_jdbc_jar_name = None
  driver_curl_source = None
  driver_curl_target = None
  previous_jdbc_jar = None

  if has_ranger_admin and stack_supports_ranger_audit_db:
    xa_audit_db_flavor = config['configurations']['admin-properties']['DB_FLAVOR']
    jdbc_jar_name, previous_jdbc_jar_name, audit_jdbc_url, jdbc_driver = get_audit_configs(config)
    downloaded_custom_connector = format("{tmp_dir}/{jdbc_jar_name}")
    driver_curl_source = format("{jdk_location}/{jdbc_jar_name}")
    driver_curl_target = format("{kafka_home}/libs/{jdbc_jar_name}")
    previous_jdbc_jar = format("{kafka_home}/libs/{previous_jdbc_jar_name}")
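
    # The connector jar is downloaded from the ambari-server resources URL
    # (jdk_location) into tmp_dir and copied into Kafka's libs directory so the
    # Ranger audit-to-DB code can load the JDBC driver at runtime.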

  xa_audit_db_is_enabled = False
  if xml_configurations_supported and stack_supports_ranger_audit_db:
    xa_audit_db_is_enabled = config['configurations']['ranger-kafka-audit']['xasecure.audit.destination.db']
  xa_audit_hdfs_is_enabled = default('/configurations/ranger-kafka-audit/xasecure.audit.destination.hdfs', False)

  ssl_keystore_password = config['configurations']['ranger-kafka-policymgr-ssl']['xasecure.policymgr.clientssl.keystore.password'] if xml_configurations_supported else None
  ssl_truststore_password = config['configurations']['ranger-kafka-policymgr-ssl']['xasecure.policymgr.clientssl.truststore.password'] if xml_configurations_supported else None
  credential_file = format('/etc/ranger/{repo_name}/cred.jceks')

  stack_version = get_stack_version('kafka-broker')
  setup_ranger_env_sh_source = format('{stack_root}/{stack_version}/ranger-kafka-plugin/install/conf.templates/enable/kafka-ranger-env.sh')
  setup_ranger_env_sh_target = format("{conf_dir}/kafka-ranger-env.sh")

  # for SQLA, explicitly disable audit to DB for Ranger
  if has_ranger_admin and stack_supports_ranger_audit_db and xa_audit_db_flavor.lower() == 'sqla':
    xa_audit_db_is_enabled = False

# ranger kafka plugin section end
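
# Note: format() above is resource_management's interpolating formatter; it
# resolves placeholders such as {tmp_dir}, {repo_name}, and {stack_root} from
# the caller's scope at call time rather than from explicit keyword arguments.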

namenode_hosts = default("/clusterHostInfo/namenode_host", [])
has_namenode = not len(namenode_hosts) == 0

hdfs_user = config['configurations']['hadoop-env']['hdfs_user'] if has_namenode else None
hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab'] if has_namenode else None
hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_name'] if has_namenode else None
hdfs_site = config['configurations']['hdfs-site'] if has_namenode else None
default_fs = config['configurations']['core-site']['fs.defaultFS'] if has_namenode else None
hadoop_bin_dir = stack_select.get_hadoop_dir("bin") if has_namenode else None
hadoop_conf_dir = conf_select.get_hadoop_conf_dir() if has_namenode else None

kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))

import functools
# Create a partial function that pre-binds the arguments common to every
# HdfsResource call; to create or delete an HDFS directory or file, or to run a
# copyFromLocal, code only needs to invoke params.HdfsResource.
HdfsResource = functools.partial(
  HdfsResource,
  user=hdfs_user,
  hdfs_resource_ignore_file="/var/lib/ambari-agent/data/.hdfs_resource_ignore",
  security_enabled=security_enabled,
  keytab=hdfs_user_keytab,
  kinit_path_local=kinit_path_local,
  hadoop_bin_dir=hadoop_bin_dir,
  hadoop_conf_dir=hadoop_conf_dir,
  principal_name=hdfs_principal_name,
  hdfs_site=hdfs_site,
  default_fs=default_fs,
  immutable_paths=get_not_managed_resources()
)
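
# Illustrative usage (not part of this file, shown for context): a service
# script would typically queue an operation and then flush it, e.g.
#   params.HdfsResource(format("/user/{kafka_user}"),
#                       type="directory",
#                       action="create_on_execute",
#                       owner=params.kafka_user)
#   params.HdfsResource(None, action="execute")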