|
@@ -293,27 +293,85 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
|
|
putHdfsSitePropertyAttribute('dfs.namenode.inode.attributes.provider.class', 'delete', 'true')
|
|
putHdfsSitePropertyAttribute('dfs.namenode.inode.attributes.provider.class', 'delete', 'true')
|
|
|
|
|
|
def recommendKAFKAConfigurations(self, configurations, clusterData, services, hosts):
|
|
def recommendKAFKAConfigurations(self, configurations, clusterData, services, hosts):
|
|
|
|
+
|
|
|
|
+ servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
|
|
kafka_broker = getServicesSiteProperties(services, "kafka-broker")
|
|
kafka_broker = getServicesSiteProperties(services, "kafka-broker")
|
|
|
|
|
|
- # kerberos security for kafka is decided from `security.inter.broker.protocol` property value
|
|
|
|
- security_enabled = (kafka_broker is not None and 'security.inter.broker.protocol' in kafka_broker
|
|
|
|
- and 'SASL' in kafka_broker['security.inter.broker.protocol'])
|
|
|
|
|
|
+ security_enabled = isSecurityEnabled(services)
|
|
|
|
+
|
|
putKafkaBrokerProperty = self.putProperty(configurations, "kafka-broker", services)
|
|
putKafkaBrokerProperty = self.putProperty(configurations, "kafka-broker", services)
|
|
putKafkaLog4jProperty = self.putProperty(configurations, "kafka-log4j", services)
|
|
putKafkaLog4jProperty = self.putProperty(configurations, "kafka-log4j", services)
|
|
putKafkaBrokerAttributes = self.putPropertyAttribute(configurations, "kafka-broker")
|
|
putKafkaBrokerAttributes = self.putPropertyAttribute(configurations, "kafka-broker")
|
|
|
|
|
|
|
|
+ if security_enabled:
|
|
|
|
+ kafka_env = getServicesSiteProperties(services, "kafka-env")
|
|
|
|
+ kafka_user = kafka_env.get('kafka_user') if kafka_env is not None else None
|
|
|
|
+
|
|
|
|
+ if kafka_user is not None:
|
|
|
|
+ kafka_super_users = kafka_broker.get('super.users') if kafka_broker is not None else None
|
|
|
|
+
|
|
|
|
+ # kafka_super_super_users is expected to be formatted as: User:user1;User:user2
|
|
|
|
+ if kafka_super_users is not None and kafka_super_users != '':
|
|
|
|
+ # Parse kafka_super_users to get a set of unique user names and rebuild the property value
|
|
|
|
+ user_names = set()
|
|
|
|
+ user_names.add(kafka_user)
|
|
|
|
+ for match in re.findall('User:([^;]*)', kafka_super_users):
|
|
|
|
+ user_names.add(match)
|
|
|
|
+ kafka_super_users = 'User:' + ";User:".join(user_names)
|
|
|
|
+ else:
|
|
|
|
+ kafka_super_users = 'User:' + kafka_user
|
|
|
|
+
|
|
|
|
+ putKafkaBrokerProperty("super.users", kafka_super_users)
|
|
|
|
+
|
|
|
|
+ putKafkaBrokerProperty("principal.to.local.class", "kafka.security.auth.KerberosPrincipalToLocal")
|
|
|
|
+ putKafkaBrokerProperty("security.inter.broker.protocol", "PLAINTEXTSASL")
|
|
|
|
+ putKafkaBrokerProperty("zookeeper.set.acl", "true")
|
|
|
|
+
|
|
|
|
+ else: # not security_enabled
|
|
|
|
+ # remove unneeded properties
|
|
|
|
+ putKafkaBrokerAttributes('super.users', 'delete', 'true')
|
|
|
|
+ putKafkaBrokerAttributes('principal.to.local.class', 'delete', 'true')
|
|
|
|
+ putKafkaBrokerAttributes('security.inter.broker.protocol', 'delete', 'true')
|
|
|
|
+
|
|
|
|
+ # Update ranger-kafka-plugin-properties/ranger-kafka-plugin-enabled to match ranger-env/ranger-kafka-plugin-enabled
|
|
|
|
+ if "ranger-env" in services["configurations"] \
|
|
|
|
+ and "ranger-kafka-plugin-properties" in services["configurations"] \
|
|
|
|
+ and "ranger-kafka-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]:
|
|
|
|
+ putKafkaRangerPluginProperty = self.putProperty(configurations, "ranger-kafka-plugin-properties", services)
|
|
|
|
+ ranger_kafka_plugin_enabled = services["configurations"]["ranger-env"]["properties"]["ranger-kafka-plugin-enabled"]
|
|
|
|
+ putKafkaRangerPluginProperty("ranger-kafka-plugin-enabled", ranger_kafka_plugin_enabled)
|
|
|
|
+
|
|
|
|
+ # Determine if the Ranger/Kafka Plugin is enabled
|
|
|
|
+ ranger_plugin_enabled = "RANGER" in servicesList
|
|
|
|
+ # Only if the RANGER service is installed....
|
|
|
|
+ if ranger_plugin_enabled:
|
|
|
|
+ # If ranger-kafka-plugin-properties/ranger-kafka-plugin-enabled,
|
|
|
|
+ # determine if the Ranger/Kafka plug-in enabled enabled or not
|
|
|
|
+ if 'ranger-kafka-plugin-properties' in configurations and \
|
|
|
|
+ 'ranger-kafka-plugin-enabled' in configurations['ranger-kafka-plugin-properties']['properties']:
|
|
|
|
+ ranger_plugin_enabled = configurations['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled'].lower() == 'yes'
|
|
|
|
+ # If ranger-kafka-plugin-properties/ranger-kafka-plugin-enabled was not changed,
|
|
|
|
+ # determine if the Ranger/Kafka plug-in enabled enabled or not
|
|
|
|
+ elif 'ranger-kafka-plugin-properties' in services['configurations'] and \
|
|
|
|
+ 'ranger-kafka-plugin-enabled' in services['configurations']['ranger-kafka-plugin-properties']['properties']:
|
|
|
|
+ ranger_plugin_enabled = services['configurations']['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled'].lower() == 'yes'
|
|
|
|
+
|
|
|
|
+ # Determine the value for kafka-broker/authorizer.class.name
|
|
|
|
+ if ranger_plugin_enabled:
|
|
|
|
+ # If the Ranger plugin for Kafka is enabled, set authorizer.class.name to
|
|
|
|
+ # "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer" whether Kerberos is
|
|
|
|
+ # enabled or not.
|
|
|
|
+ putKafkaBrokerProperty("authorizer.class.name", 'org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer')
|
|
|
|
+ elif security_enabled:
|
|
|
|
+ putKafkaBrokerProperty("authorizer.class.name", 'kafka.security.auth.SimpleAclAuthorizer')
|
|
|
|
+ else:
|
|
|
|
+ putKafkaBrokerAttributes('authorizer.class.name', 'delete', 'true')
|
|
|
|
+
|
|
#If AMS is part of Services, use the KafkaTimelineMetricsReporter for metric reporting. Default is ''.
|
|
#If AMS is part of Services, use the KafkaTimelineMetricsReporter for metric reporting. Default is ''.
|
|
- servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
|
|
|
|
if "AMBARI_METRICS" in servicesList:
|
|
if "AMBARI_METRICS" in servicesList:
|
|
putKafkaBrokerProperty('kafka.metrics.reporters', 'org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter')
|
|
putKafkaBrokerProperty('kafka.metrics.reporters', 'org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter')
|
|
|
|
|
|
- if "ranger-env" in services["configurations"] and "ranger-kafka-plugin-properties" in services["configurations"] and \
|
|
|
|
- "ranger-kafka-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]:
|
|
|
|
- putKafkaRangerPluginProperty = self.putProperty(configurations, "ranger-kafka-plugin-properties", services)
|
|
|
|
- rangerEnvKafkaPluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-kafka-plugin-enabled"]
|
|
|
|
- putKafkaRangerPluginProperty("ranger-kafka-plugin-enabled", rangerEnvKafkaPluginProperty)
|
|
|
|
-
|
|
|
|
- if 'ranger-kafka-plugin-properties' in services['configurations'] and ('ranger-kafka-plugin-enabled' in services['configurations']['ranger-kafka-plugin-properties']['properties']):
|
|
|
|
|
|
+ if ranger_plugin_enabled:
|
|
kafkaLog4jRangerLines = [{
|
|
kafkaLog4jRangerLines = [{
|
|
"name": "log4j.appender.rangerAppender",
|
|
"name": "log4j.appender.rangerAppender",
|
|
"value": "org.apache.log4j.DailyRollingFileAppender"
|
|
"value": "org.apache.log4j.DailyRollingFileAppender"
|
|
@@ -339,37 +397,13 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
|
|
"value": "INFO, rangerAppender"
|
|
"value": "INFO, rangerAppender"
|
|
}]
|
|
}]
|
|
|
|
|
|
- rangerPluginEnabled=''
|
|
|
|
- if 'ranger-kafka-plugin-properties' in configurations and 'ranger-kafka-plugin-enabled' in configurations['ranger-kafka-plugin-properties']['properties']:
|
|
|
|
- rangerPluginEnabled = configurations['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled']
|
|
|
|
- elif 'ranger-kafka-plugin-properties' in services['configurations'] and 'ranger-kafka-plugin-enabled' in services['configurations']['ranger-kafka-plugin-properties']['properties']:
|
|
|
|
- rangerPluginEnabled = services['configurations']['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled']
|
|
|
|
-
|
|
|
|
- if rangerPluginEnabled and rangerPluginEnabled.lower() == "Yes".lower():
|
|
|
|
- # recommend authorizer.class.name
|
|
|
|
- putKafkaBrokerProperty("authorizer.class.name", 'org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer')
|
|
|
|
- # change kafka-log4j when ranger plugin is installed
|
|
|
|
-
|
|
|
|
- if 'kafka-log4j' in services['configurations'] and 'content' in services['configurations']['kafka-log4j']['properties']:
|
|
|
|
- kafkaLog4jContent = services['configurations']['kafka-log4j']['properties']['content']
|
|
|
|
- for item in range(len(kafkaLog4jRangerLines)):
|
|
|
|
- if kafkaLog4jRangerLines[item]["name"] not in kafkaLog4jContent:
|
|
|
|
- kafkaLog4jContent+= '\n' + kafkaLog4jRangerLines[item]["name"] + '=' + kafkaLog4jRangerLines[item]["value"]
|
|
|
|
- putKafkaLog4jProperty("content",kafkaLog4jContent)
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- else:
|
|
|
|
- # Kerberized Cluster with Ranger plugin disabled
|
|
|
|
- if security_enabled and 'kafka-broker' in services['configurations'] and 'authorizer.class.name' in services['configurations']['kafka-broker']['properties'] and \
|
|
|
|
- services['configurations']['kafka-broker']['properties']['authorizer.class.name'] == 'org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer':
|
|
|
|
- putKafkaBrokerProperty("authorizer.class.name", 'kafka.security.auth.SimpleAclAuthorizer')
|
|
|
|
- # Non-kerberos Cluster with Ranger plugin disabled
|
|
|
|
- else:
|
|
|
|
- putKafkaBrokerAttributes('authorizer.class.name', 'delete', 'true')
|
|
|
|
-
|
|
|
|
- # Non-Kerberos Cluster without Ranger
|
|
|
|
- elif not security_enabled:
|
|
|
|
- putKafkaBrokerAttributes('authorizer.class.name', 'delete', 'true')
|
|
|
|
|
|
+ # change kafka-log4j when ranger plugin is installed
|
|
|
|
+ if 'kafka-log4j' in services['configurations'] and 'content' in services['configurations']['kafka-log4j']['properties']:
|
|
|
|
+ kafkaLog4jContent = services['configurations']['kafka-log4j']['properties']['content']
|
|
|
|
+ for item in range(len(kafkaLog4jRangerLines)):
|
|
|
|
+ if kafkaLog4jRangerLines[item]["name"] not in kafkaLog4jContent:
|
|
|
|
+ kafkaLog4jContent+= '\n' + kafkaLog4jRangerLines[item]["name"] + '=' + kafkaLog4jRangerLines[item]["value"]
|
|
|
|
+ putKafkaLog4jProperty("content",kafkaLog4jContent)
|
|
|
|
|
|
def recommendRangerKMSConfigurations(self, configurations, clusterData, services, hosts):
|
|
def recommendRangerKMSConfigurations(self, configurations, clusterData, services, hosts):
|
|
servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
|
|
servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
|