|
@@ -111,11 +111,36 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
|
|
|
"YARN": {"yarn-site": self.validateYarnConfigurations},
|
|
|
"RANGER": {"ranger-tagsync-site": self.validateRangerTagsyncConfigurations},
|
|
|
"SPARK2": {"spark2-defaults": self.validateSpark2Defaults,
|
|
|
- "spark2-thrift-sparkconf": self.validateSpark2ThriftSparkConf}
|
|
|
+ "spark2-thrift-sparkconf": self.validateSpark2ThriftSparkConf},
|
|
|
+ "STORM": {"storm-site": self.validateStormConfigurations},
|
|
|
}
|
|
|
self.mergeValidators(parentValidators, childValidators)
|
|
|
return parentValidators
|
|
|
|
|
|
+ def validateStormConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
|
|
|
+ super(HDP25StackAdvisor, self).validateStormConfigurations(properties, recommendedDefaults, configurations, services, hosts)
|
|
|
+ validationItems = []
|
|
|
+
|
|
|
+ servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
|
|
|
+ # Storm AMS integration
|
|
|
+ if 'AMBARI_METRICS' in servicesList:
|
|
|
+ if "storm.cluster.metrics.consumer.register" in properties and \
|
|
|
+ 'null' in properties.get("storm.cluster.metrics.consumer.register"):
|
|
|
+
|
|
|
+ validationItems.append({"config-name": 'storm.cluster.metrics.consumer.register',
|
|
|
+ "item": self.getWarnItem(
|
|
|
+ "Should be set to recommended value to report metrics to Ambari Metrics service.")})
|
|
|
+
|
|
|
+ if "topology.metrics.consumer.register" in properties and \
|
|
|
+ 'null' in properties.get("topology.metrics.consumer.register"):
|
|
|
+
|
|
|
+ validationItems.append({"config-name": 'topology.metrics.consumer.register',
|
|
|
+ "item": self.getWarnItem(
|
|
|
+ "Should be set to recommended value to report metrics to Ambari Metrics service.")})
|
|
|
+
|
|
|
+
|
|
|
+ return self.toConfigurationValidationProblems(validationItems, "storm-site")
|
|
|
+
|
|
|
def validateAtlasConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
|
|
|
application_properties = getSiteProperties(configurations, "application-properties")
|
|
|
validationItems = []
|
|
@@ -449,6 +474,21 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
|
|
|
else:
|
|
|
putStormSiteAttributes('nimbus.authorizer', 'delete', 'true')
|
|
|
|
|
|
+ servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
|
|
|
+ # Storm AMS integration
|
|
|
+ if 'AMBARI_METRICS' in servicesList:
|
|
|
+ putStormSiteProperty('storm.cluster.metrics.consumer.register', '[{"class": "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter"}]')
|
|
|
+ putStormSiteProperty('topology.metrics.consumer.register',
|
|
|
+ '[{"class": "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink", '
|
|
|
+ '"parallelism.hint": 1, '
|
|
|
+ '"whitelist": ["kafkaOffset\\\..+/", "__complete-latency", "__process-latency", '
|
|
|
+ '"__receive\\\.population$", "__sendqueue\\\.population$", "__execute-count", "__emit-count", '
|
|
|
+ '"__ack-count", "__fail-count", "memory/heap\\\.usedBytes$", "memory/nonHeap\\\.usedBytes$", '
|
|
|
+ '"GC/.+\\\.count$", "GC/.+\\\.timeMs$"]}]')
|
|
|
+ else:
|
|
|
+ putStormSiteProperty('storm.cluster.metrics.consumer.register', 'null')
|
|
|
+ putStormSiteProperty('topology.metrics.consumer.register', 'null')
|
|
|
+
|
|
|
def constructAtlasRestAddress(self, services, hosts):
|
|
|
"""
|
|
|
:param services: Collection of services in the cluster with configs
|