|
@@ -204,8 +204,163 @@ class HDP206StackAdvisor(StackAdvisor):
|
|
|
pass
|
|
|
|
|
|
def recommendConfigurations(self, services, hosts):
|
|
|
- """Returns Services object with configurations object populated"""
|
|
|
- pass
|
|
|
+ stackName = services["Versions"]["stack_name"]
|
|
|
+ stackVersion = services["Versions"]["stack_version"]
|
|
|
+ hostsList = [host["Hosts"]["host_name"] for host in hosts["items"]]
|
|
|
+ servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
|
|
|
+ components = [component["StackServiceComponents"]["component_name"]
|
|
|
+ for service in services["services"]
|
|
|
+ for component in service["components"]]
|
|
|
+
|
|
|
+ clusterData = self.getClusterData(servicesList, hosts, components)
|
|
|
+
|
|
|
+ recommendations = {
|
|
|
+ "Versions": {"stack_name": stackName, "stack_version": stackVersion},
|
|
|
+ "hosts": hostsList,
|
|
|
+ "services": servicesList,
|
|
|
+ "recommendations": {
|
|
|
+ "blueprint": {
|
|
|
+ "configurations": {},
|
|
|
+ "host_groups": []
|
|
|
+ },
|
|
|
+ "blueprint_cluster_binding": {
|
|
|
+ "host_groups": []
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ configurations = recommendations["recommendations"]["blueprint"]["configurations"]
|
|
|
+
|
|
|
+ for service in servicesList:
|
|
|
+ calculation = self.recommendServiceConfigurations(service)
|
|
|
+ if calculation is not None:
|
|
|
+ calculation(configurations, clusterData)
|
|
|
+
|
|
|
+ return recommendations
|
|
|
+
|
|
|
+ def recommendServiceConfigurations(self, service):
|
|
|
+ return {
|
|
|
+ "YARN": self.recommendYARNConfigurations,
|
|
|
+ "MAPREDUCE2": self.recommendMapReduce2Configurations,
|
|
|
+ "HIVE": self.recommendHiveConfigurations,
|
|
|
+ "OOZIE": self.recommendOozieConfigurations
|
|
|
+ }.get(service, None)
|
|
|
+
|
|
|
+ def putProperty(self, config, configType):
|
|
|
+ config[configType] = {"properties": {}}
|
|
|
+ def appendProperty(key, value):
|
|
|
+ config[configType]["properties"][key] = str(value)
|
|
|
+ return appendProperty
|
|
|
+
|
|
|
+ def recommendYARNConfigurations(self, configurations, clusterData):
|
|
|
+ putYarnProperty = self.putProperty(configurations, "yarn-site")
|
|
|
+ putYarnProperty('yarn.nodemanager.resource.memory-mb', clusterData['containers'] * clusterData['ramPerContainer'])
|
|
|
+ putYarnProperty('yarn.scheduler.minimum-allocation-mb', clusterData['ramPerContainer'])
|
|
|
+ putYarnProperty('yarn.scheduler.maximum-allocation-mb', clusterData['containers'] * clusterData['ramPerContainer'])
|
|
|
+
|
|
|
+ def recommendHiveConfigurations(self, configurations, clusterData):
|
|
|
+ containerSize = clusterData['mapMemory'] if clusterData['mapMemory'] > 2048 else clusterData['reduceMemory']
|
|
|
+ containerSize = min(clusterData['containers'] * clusterData['ramPerContainer'], containerSize)
|
|
|
+ putHiveProperty = self.putProperty(configurations, "hive-site")
|
|
|
+ putHiveProperty('hive.auto.convert.join.noconditionaltask.size', int(containerSize / 3) * 1048576)
|
|
|
+ putHiveProperty('hive.tez.java.opts', "-server -Xmx" + str(int(0.8 * containerSize))
|
|
|
+ + "m -Djava.net.preferIPv4Stack=true -XX:NewRatio=8 -XX:+UseNUMA -XX:+UseParallelGC")
|
|
|
+ putHiveProperty('hive.tez.container.size', containerSize)
|
|
|
+
|
|
|
+ def recommendMapReduce2Configurations(self, configurations, clusterData):
|
|
|
+ putMapredProperty = self.putProperty(configurations, "mapred-site")
|
|
|
+ putMapredProperty('yarn.app.mapreduce.am.resource.mb', clusterData['amMemory'])
|
|
|
+ putMapredProperty('yarn.app.mapreduce.am.command-opts', "-Xmx" + str(int(0.8 * clusterData['amMemory'])) + "m")
|
|
|
+ putMapredProperty('mapreduce.map.memory.mb', clusterData['mapMemory'])
|
|
|
+ putMapredProperty('mapreduce.reduce.memory.mb', clusterData['reduceMemory'])
|
|
|
+ putMapredProperty('mapreduce.map.java.opts', "-Xmx" + str(int(0.8 * clusterData['mapMemory'])) + "m")
|
|
|
+ putMapredProperty('mapreduce.reduce.java.opts', "-Xmx" + str(int(0.8 * clusterData['reduceMemory'])) + "m")
|
|
|
+ putMapredProperty('mapreduce.task.io.sort.mb', int(min(0.4 * clusterData['mapMemory'], 1024)))
|
|
|
+
|
|
|
+ def recommendOozieConfigurations(self, configurations, clusterData):
|
|
|
+ if "FALCON_SERVER" in clusterData["components"]:
|
|
|
+ putMapredProperty = self.putProperty(configurations, "oozie-site")
|
|
|
+ putMapredProperty("oozie.services.ext",
|
|
|
+ "org.apache.oozie.service.JMSAccessorService," +
|
|
|
+ "org.apache.oozie.service.PartitionDependencyManagerService," +
|
|
|
+ "org.apache.oozie.service.HCatAccessorService")
|
|
|
+
|
|
|
+ def getClusterData(self, servicesList, hosts, components):
|
|
|
+
|
|
|
+ hBaseInstalled = False
|
|
|
+ if 'HBASE' in servicesList:
|
|
|
+ hBaseInstalled = True
|
|
|
+
|
|
|
+ cluster = {
|
|
|
+ "cpu": 0,
|
|
|
+ "disk": 0,
|
|
|
+ "ram": 0,
|
|
|
+ "hBaseInstalled": hBaseInstalled,
|
|
|
+ "components": components
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(hosts["items"]) > 0:
|
|
|
+ host = hosts["items"][0]["Hosts"]
|
|
|
+ cluster["cpu"] = host["cpu_count"]
|
|
|
+ cluster["disk"] = len(host["disk_info"])
|
|
|
+ cluster["ram"] = int(host["total_mem"] / (1024 * 1024))
|
|
|
+
|
|
|
+ ramRecommendations = [
|
|
|
+ {"os":1, "hbase":1},
|
|
|
+ {"os":2, "hbase":1},
|
|
|
+ {"os":2, "hbase":2},
|
|
|
+ {"os":4, "hbase":4},
|
|
|
+ {"os":6, "hbase":8},
|
|
|
+ {"os":8, "hbase":8},
|
|
|
+ {"os":8, "hbase":8},
|
|
|
+ {"os":12, "hbase":16},
|
|
|
+ {"os":24, "hbase":24},
|
|
|
+ {"os":32, "hbase":32},
|
|
|
+ {"os":64, "hbase":64}
|
|
|
+ ]
|
|
|
+ index = {
|
|
|
+ cluster["ram"] <= 4: 0,
|
|
|
+ 4 < cluster["ram"] <= 8: 1,
|
|
|
+ 8 < cluster["ram"] <= 16: 2,
|
|
|
+ 16 < cluster["ram"] <= 24: 3,
|
|
|
+ 24 < cluster["ram"] <= 48: 4,
|
|
|
+ 48 < cluster["ram"] <= 64: 5,
|
|
|
+ 64 < cluster["ram"] <= 72: 6,
|
|
|
+ 72 < cluster["ram"] <= 96: 7,
|
|
|
+ 96 < cluster["ram"] <= 128: 8,
|
|
|
+ 128 < cluster["ram"] <= 256: 9,
|
|
|
+ 256 < cluster["ram"]: 10
|
|
|
+ }[1]
|
|
|
+ cluster["reservedRam"] = ramRecommendations[index]["os"]
|
|
|
+ cluster["hbaseRam"] = ramRecommendations[index]["hbase"]
|
|
|
+
|
|
|
+ cluster["minContainerSize"] = {
|
|
|
+ cluster["ram"] <= 4: 256,
|
|
|
+ 4 < cluster["ram"] <= 8: 512,
|
|
|
+ 8 < cluster["ram"] <= 24: 1024,
|
|
|
+ 24 < cluster["ram"]: 2048
|
|
|
+ }[1]
|
|
|
+
|
|
|
+ '''containers = max(3, min (2*cores,min (1.8*DISKS,(Total available RAM) / MIN_CONTAINER_SIZE))))'''
|
|
|
+ cluster["containers"] = max(3,
|
|
|
+ min(2 * cluster["cpu"],
|
|
|
+ int(min(1.8 * cluster["disk"],
|
|
|
+ cluster["ram"] / cluster["minContainerSize"]))))
|
|
|
+
|
|
|
+ '''ramPerContainers = max(2GB, RAM - reservedRam - hBaseRam) / containers'''
|
|
|
+ cluster["ramPerContainer"] = max(2048,
|
|
|
+ cluster["ram"] - cluster["reservedRam"] - cluster["hbaseRam"])
|
|
|
+ cluster["ramPerContainer"] /= cluster["containers"]
|
|
|
+ '''If greater than 1GB, value will be in multiples of 512.'''
|
|
|
+ if cluster["ramPerContainer"] > 1024:
|
|
|
+ cluster["ramPerContainer"] = ceil(cluster["ramPerContainer"] / 512) * 512
|
|
|
+
|
|
|
+ cluster["mapMemory"] = int(cluster["ramPerContainer"])
|
|
|
+ cluster["reduceMemory"] = cluster["ramPerContainer"]
|
|
|
+ cluster["amMemory"] = max(cluster["mapMemory"], cluster["reduceMemory"])
|
|
|
+
|
|
|
+ return cluster
|
|
|
+
|
|
|
|
|
|
def validateConfigurations(self, services, hosts):
|
|
|
"""Returns array of Validation objects about issues with configuration values provided in services"""
|