#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os

from resource_management.libraries.functions import format
from resource_management.libraries.script.script import Script
from resource_management.libraries.functions.version import format_stack_version
from resource_management.libraries.functions import StackFeature
from resource_management.libraries.functions.stack_features import check_stack_feature
from resource_management.libraries.functions.stack_features import get_stack_feature_version
from resource_management.libraries.functions.default import default
from resource_management.libraries.functions.get_stack_version import get_stack_version
from resource_management.libraries.functions.is_empty import is_empty
from resource_management.libraries.resources.hdfs_resource import HdfsResource
from resource_management.libraries.functions import stack_select
from resource_management.libraries.functions import conf_select
from resource_management.libraries.functions import get_kinit_path
from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources
from resource_management.libraries.functions.setup_ranger_plugin_xml import get_audit_configs

from utils import get_bare_principal
import status_params
# server configurations
config = Script.get_config()
tmp_dir = Script.get_tmp_dir()
stack_root = Script.get_stack_root()

stack_name = default("/hostLevelParams/stack_name", None)
retryAble = default("/commandParams/command_retry_enabled", False)
# version being upgraded/downgraded to
version = default("/commandParams/version", None)
# version that is currently installed
current_version = default("/hostLevelParams/current_version", None)

stack_version_unformatted = config['hostLevelParams']['stack_version']
stack_version_formatted = format_stack_version(stack_version_unformatted)

upgrade_direction = default("/commandParams/upgrade_direction", None)

# get the correct version to use for checking stack features
version_for_stack_feature_checks = get_stack_feature_version(config)

stack_supports_ranger_kerberos = check_stack_feature(StackFeature.RANGER_KERBEROS_SUPPORT, version_for_stack_feature_checks)
stack_supports_ranger_audit_db = check_stack_feature(StackFeature.RANGER_AUDIT_DB_SUPPORT, version_for_stack_feature_checks)
stack_supports_core_site_for_ranger_plugin = check_stack_feature(StackFeature.CORE_SITE_FOR_RANGER_PLUGINS_SUPPORT, version_for_stack_feature_checks)

# When downgrading, 'version' and 'current_version' both point to the downgrade-target
# version; downgrade_from_version is the source version the downgrade is happening from.
downgrade_from_version = default("/commandParams/downgrade_from_version", None)

hostname = config['hostname']
# default kafka parameters
kafka_home = '/usr/lib/kafka'
kafka_bin = kafka_home + '/bin/kafka'
conf_dir = "/etc/kafka/conf"
limits_conf_dir = "/etc/security/limits.d"

# used while upgrading the stack in a kerberized cluster and running kafka-acls.sh
zookeeper_connect = default("/configurations/kafka-broker/zookeeper.connect", None)

kafka_user_nofile_limit = config['configurations']['kafka-env']['kafka_user_nofile_limit']
kafka_user_nproc_limit = config['configurations']['kafka-env']['kafka_user_nproc_limit']
# parameters for stacks that support rolling upgrade (2.2+): use the stack-versioned layout
if stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted):
  kafka_home = os.path.join(stack_root, "current", "kafka-broker")
  kafka_bin = os.path.join(kafka_home, "bin", "kafka")
  conf_dir = os.path.join(kafka_home, "config")
kafka_user = config['configurations']['kafka-env']['kafka_user']
kafka_log_dir = config['configurations']['kafka-env']['kafka_log_dir']
kafka_pid_dir = status_params.kafka_pid_dir
kafka_pid_file = kafka_pid_dir + "/kafka.pid"

# These paths are hardcoded by Kafka's own bash scripts, which we have no control over.
kafka_managed_pid_dir = "/var/run/kafka"
kafka_managed_log_dir = "/var/log/kafka"

user_group = config['configurations']['cluster-env']['user_group']
java64_home = config['hostLevelParams']['java_home']

kafka_env_sh_template = config['configurations']['kafka-env']['content']
kafka_jaas_conf_template = default("/configurations/kafka_jaas_conf/content", None)
kafka_client_jaas_conf_template = default("/configurations/kafka_client_jaas_conf/content", None)
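
# keep host lists sorted so rendered configuration is deterministic across runs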
kafka_hosts = config['clusterHostInfo']['kafka_broker_hosts']
kafka_hosts.sort()

zookeeper_hosts = config['clusterHostInfo']['zookeeper_hosts']
zookeeper_hosts.sort()

secure_acls = default("/configurations/kafka-broker/zookeeper.set.acl", False)
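# Kafka's zookeeper-security-migration.sh applies or removes ZooKeeper ACLs when
# zookeeper.set.acl changes (e.g. during kerberization or an upgrade).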
kafka_security_migrator = os.path.join(kafka_home, "bin", "zookeeper-security-migration.sh")

# Kafka log4j
kafka_log_maxfilesize = default('/configurations/kafka-log4j/kafka_log_maxfilesize', 256)
kafka_log_maxbackupindex = default('/configurations/kafka-log4j/kafka_log_maxbackupindex', 20)
controller_log_maxfilesize = default('/configurations/kafka-log4j/controller_log_maxfilesize', 256)
controller_log_maxbackupindex = default('/configurations/kafka-log4j/controller_log_maxbackupindex', 20)

if 'kafka-log4j' in config['configurations'] and 'content' in config['configurations']['kafka-log4j']:
  log4j_props = config['configurations']['kafka-log4j']['content']
else:
  log4j_props = None
if 'ganglia_server_host' in config['clusterHostInfo'] and \
    len(config['clusterHostInfo']['ganglia_server_host']) > 0:
  ganglia_installed = True
  ganglia_server = config['clusterHostInfo']['ganglia_server_host'][0]
  ganglia_report_interval = 60
else:
  ganglia_installed = False

metric_collector_port = ""
metric_collector_protocol = ""
metric_truststore_path = default("/configurations/ams-ssl-client/ssl.client.truststore.location", "")
metric_truststore_type = default("/configurations/ams-ssl-client/ssl.client.truststore.type", "")
metric_truststore_password = default("/configurations/ams-ssl-client/ssl.client.truststore.password", "")

ams_collector_hosts = ",".join(default("/clusterHostInfo/metrics_collector_hosts", []))
has_metric_collector = not len(ams_collector_hosts) == 0
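
# Resolve the metrics collector port: prefer an explicit VIP port from cluster-env,
# otherwise parse it out of the AMS webapp address (default 0.0.0.0:6188).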
if has_metric_collector:
  if 'cluster-env' in config['configurations'] and \
      'metrics_collector_vip_port' in config['configurations']['cluster-env']:
    metric_collector_port = config['configurations']['cluster-env']['metrics_collector_vip_port']
  else:
    metric_collector_web_address = default("/configurations/ams-site/timeline.metrics.service.webapp.address", "0.0.0.0:6188")
    if metric_collector_web_address.find(':') != -1:
      metric_collector_port = metric_collector_web_address.split(':')[1]
    else:
      metric_collector_port = '6188'
  if default("/configurations/ams-site/timeline.metrics.service.http.policy", "HTTP_ONLY") == "HTTPS_ONLY":
    metric_collector_protocol = 'https'
  else:
    metric_collector_protocol = 'http'

# Security-related params
security_enabled = config['configurations']['cluster-env']['security_enabled']
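# Kafka is treated as kerberized when the inter-broker protocol is SASL-based.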
kafka_kerberos_enabled = ('security.inter.broker.protocol' in config['configurations']['kafka-broker'] and
                          config['configurations']['kafka-broker']['security.inter.broker.protocol'] in
                          ("PLAINTEXTSASL", "SASL_PLAINTEXT"))
if security_enabled and stack_version_formatted != "" and 'kafka_principal_name' in config['configurations']['kafka-env'] \
    and check_stack_feature(StackFeature.KAFKA_KERBEROS, stack_version_formatted):
  _hostname_lowercase = config['hostname'].lower()
  _kafka_principal_name = config['configurations']['kafka-env']['kafka_principal_name']
  kafka_jaas_principal = _kafka_principal_name.replace('_HOST', _hostname_lowercase)
  kafka_keytab_path = config['configurations']['kafka-env']['kafka_keytab']
  kafka_bare_jaas_principal = get_bare_principal(_kafka_principal_name)
  kafka_kerberos_params = "-Djava.security.auth.login.config=" + conf_dir + "/kafka_jaas.conf"
else:
  kafka_kerberos_params = ''
  kafka_jaas_principal = None
  kafka_keytab_path = None

# for the curl command used by the ranger plugin to fetch the db connector
jdk_location = config['hostLevelParams']['jdk_location']

# ranger kafka plugin section start

# ranger host
ranger_admin_hosts = default("/clusterHostInfo/ranger_admin_hosts", [])
has_ranger_admin = not len(ranger_admin_hosts) == 0

# ranger xml_configuration support flag; determined via stack feature instead of
# depending on xml_configurations_supported in ranger-env
xml_configurations_supported = check_stack_feature(StackFeature.RANGER_XML_CONFIGURATION, version_for_stack_feature_checks)

# ambari-server hostname
ambari_server_hostname = config['clusterHostInfo']['ambari_server_host'][0]

ranger_admin_log_dir = default("/configurations/ranger-env/ranger_admin_log_dir", "/var/log/ranger/admin")

# ranger kafka plugin enabled property
enable_ranger_kafka = default("/configurations/ranger-kafka-plugin-properties/ranger-kafka-plugin-enabled", "No")
enable_ranger_kafka = enable_ranger_kafka.lower() == 'yes'

# ranger kafka-plugin support flag; determined via stack feature instead of
# depending on is_supported_kafka_ranger in kafka-env.xml
is_supported_kafka_ranger = check_stack_feature(StackFeature.KAFKA_RANGER_PLUGIN_SUPPORT, version_for_stack_feature_checks)

# ranger kafka properties
if enable_ranger_kafka and is_supported_kafka_ranger:
  # get the ranger policy url
  policymgr_mgr_url = config['configurations']['ranger-kafka-security']['ranger.plugin.kafka.policy.rest.url']
  if not is_empty(policymgr_mgr_url) and policymgr_mgr_url.endswith('/'):
    policymgr_mgr_url = policymgr_mgr_url.rstrip('/')

  # ranger audit db user
  xa_audit_db_user = default('/configurations/admin-properties/audit_db_user', 'rangerlogger')

  xa_audit_db_password = ''
  if not is_empty(config['configurations']['admin-properties']['audit_db_password']) and stack_supports_ranger_audit_db and has_ranger_admin:
    xa_audit_db_password = config['configurations']['admin-properties']['audit_db_password']

  # ranger kafka service/repository name
  repo_name = str(config['clusterName']) + '_kafka'
  repo_name_value = config['configurations']['ranger-kafka-security']['ranger.plugin.kafka.service.name']
  if not is_empty(repo_name_value) and repo_name_value != "{{repo_name}}":
    repo_name = repo_name_value

  ranger_env = config['configurations']['ranger-env']

  # build a ranger-env substitute holding the external ranger credential properties
  if not has_ranger_admin and enable_ranger_kafka:
    external_admin_username = default('/configurations/ranger-kafka-plugin-properties/external_admin_username', 'admin')
    external_admin_password = default('/configurations/ranger-kafka-plugin-properties/external_admin_password', 'admin')
    external_ranger_admin_username = default('/configurations/ranger-kafka-plugin-properties/external_ranger_admin_username', 'amb_ranger_admin')
    external_ranger_admin_password = default('/configurations/ranger-kafka-plugin-properties/external_ranger_admin_password', 'amb_ranger_admin')
    ranger_env = {}
    ranger_env['admin_username'] = external_admin_username
    ranger_env['admin_password'] = external_admin_password
    ranger_env['ranger_admin_username'] = external_ranger_admin_username
    ranger_env['ranger_admin_password'] = external_ranger_admin_password

  ranger_plugin_properties = config['configurations']['ranger-kafka-plugin-properties']
  ranger_kafka_audit = config['configurations']['ranger-kafka-audit']
  ranger_kafka_audit_attrs = config['configuration_attributes']['ranger-kafka-audit']
  ranger_kafka_security = config['configurations']['ranger-kafka-security']
  ranger_kafka_security_attrs = config['configuration_attributes']['ranger-kafka-security']
  ranger_kafka_policymgr_ssl = config['configurations']['ranger-kafka-policymgr-ssl']
  ranger_kafka_policymgr_ssl_attrs = config['configuration_attributes']['ranger-kafka-policymgr-ssl']

  policy_user = config['configurations']['ranger-kafka-plugin-properties']['policy_user']

  ranger_plugin_config = {
    'username': config['configurations']['ranger-kafka-plugin-properties']['REPOSITORY_CONFIG_USERNAME'],
    'password': config['configurations']['ranger-kafka-plugin-properties']['REPOSITORY_CONFIG_PASSWORD'],
    'zookeeper.connect': config['configurations']['ranger-kafka-plugin-properties']['zookeeper.connect'],
    'commonNameForCertificate': config['configurations']['ranger-kafka-plugin-properties']['common.name.for.certificate']
  }
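
  # repository definition used to register this Kafka instance with the Ranger admin service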
  kafka_ranger_plugin_repo = {
    'isEnabled': 'true',
    'configs': ranger_plugin_config,
    'description': 'kafka repo',
    'name': repo_name,
    'repositoryType': 'kafka',
    'type': 'kafka',
    'assetType': '1'
  }

  if stack_supports_ranger_kerberos and security_enabled:
    ranger_plugin_config['policy.download.auth.users'] = kafka_user
    ranger_plugin_config['tag.download.auth.users'] = kafka_user
    ranger_plugin_config['ambari.service.check.user'] = policy_user

  downloaded_custom_connector = None
  previous_jdbc_jar_name = None
  driver_curl_source = None
  driver_curl_target = None
  previous_jdbc_jar = None

  if has_ranger_admin and stack_supports_ranger_audit_db:
    xa_audit_db_flavor = config['configurations']['admin-properties']['DB_FLAVOR']
    jdbc_jar_name, previous_jdbc_jar_name, audit_jdbc_url, jdbc_driver = get_audit_configs(config)

    downloaded_custom_connector = format("{tmp_dir}/{jdbc_jar_name}") if stack_supports_ranger_audit_db else None
    driver_curl_source = format("{jdk_location}/{jdbc_jar_name}") if stack_supports_ranger_audit_db else None
    driver_curl_target = format("{kafka_home}/libs/{jdbc_jar_name}") if stack_supports_ranger_audit_db else None
    previous_jdbc_jar = format("{kafka_home}/libs/{previous_jdbc_jar_name}") if stack_supports_ranger_audit_db else None

  xa_audit_db_is_enabled = False
  if xml_configurations_supported and stack_supports_ranger_audit_db:
    xa_audit_db_is_enabled = config['configurations']['ranger-kafka-audit']['xasecure.audit.destination.db']

  xa_audit_hdfs_is_enabled = default('/configurations/ranger-kafka-audit/xasecure.audit.destination.hdfs', False)
  ssl_keystore_password = config['configurations']['ranger-kafka-policymgr-ssl']['xasecure.policymgr.clientssl.keystore.password'] if xml_configurations_supported else None
  ssl_truststore_password = config['configurations']['ranger-kafka-policymgr-ssl']['xasecure.policymgr.clientssl.truststore.password'] if xml_configurations_supported else None
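
  # JCEKS credential store that holds the plugin's SSL keystore/truststore passwords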
  credential_file = format('/etc/ranger/{repo_name}/cred.jceks')

  stack_version = get_stack_version('kafka-broker')
  setup_ranger_env_sh_source = format('{stack_root}/{stack_version}/ranger-kafka-plugin/install/conf.templates/enable/kafka-ranger-env.sh')
  setup_ranger_env_sh_target = format("{conf_dir}/kafka-ranger-env.sh")

  # for SQLA, explicitly disable audit to DB for Ranger
  if has_ranger_admin and stack_supports_ranger_audit_db and xa_audit_db_flavor.lower() == 'sqla':
    xa_audit_db_is_enabled = False

# ranger kafka plugin section end

namenode_hosts = default("/clusterHostInfo/namenode_host", [])
has_namenode = not len(namenode_hosts) == 0

hdfs_user = config['configurations']['hadoop-env']['hdfs_user'] if has_namenode else None
hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab'] if has_namenode else None
hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_name'] if has_namenode else None
hdfs_site = config['configurations']['hdfs-site'] if has_namenode else None
default_fs = config['configurations']['core-site']['fs.defaultFS'] if has_namenode else None
hadoop_bin_dir = stack_select.get_hadoop_dir("bin") if has_namenode else None
hadoop_conf_dir = conf_select.get_hadoop_conf_dir() if has_namenode else None

kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
import functools

# Create a partial with the common arguments pre-bound, so that service scripts can
# create/delete HDFS directories and files via params.HdfsResource while supplying
# only the per-resource arguments.
HdfsResource = functools.partial(
  HdfsResource,
  user=hdfs_user,
  hdfs_resource_ignore_file="/var/lib/ambari-agent/data/.hdfs_resource_ignore",
  security_enabled=security_enabled,
  keytab=hdfs_user_keytab,
  kinit_path_local=kinit_path_local,
  hadoop_bin_dir=hadoop_bin_dir,
  hadoop_conf_dir=hadoop_conf_dir,
  principal_name=hdfs_principal_name,
  hdfs_site=hdfs_site,
  default_fs=default_fs,
  immutable_paths=get_not_managed_resources()
)
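
# Typical call pattern from a service script (an illustrative sketch, not part of this module):
#   params.HdfsResource("/some/hdfs/dir", type="directory", action="create_on_execute",
#                       owner=params.kafka_user)
#   params.HdfsResource(None, action="execute")  # flush the queued HDFS operations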