#!/usr/bin/env ambari-python-wrap
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import imp
import math
import os
import re
import socket
import traceback
  23. SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
  24. STACKS_DIR = os.path.join(SCRIPT_DIR, '../../../stacks/')
  25. PARENT_FILE = os.path.join(STACKS_DIR, 'service_advisor.py')
  26. try:
  27. with open(PARENT_FILE, 'rb') as fp:
  28. service_advisor = imp.load_module('service_advisor', fp, PARENT_FILE, ('.py', 'rb', imp.PY_SOURCE))
  29. except Exception as e:
  30. traceback.print_exc()
  31. print "Failed to load parent"
  32. class HAWQ200ServiceAdvisor(service_advisor.ServiceAdvisor):
  33. def getHostsForMasterComponent(self, stackAdvisor, services, hosts, component, hostsList, hostsComponentsMap):
  34. if component["StackServiceComponents"]["component_name"] == 'HAWQSTANDBY':
  35. # Do not recommend HAWQSTANDBY on single node cluster, or cluster with no active hosts
  36. if len(hostsList) <= 1:
  37. return []
  38. componentsListList = [service["components"] for service in services["services"]]
  39. componentsList = [item["StackServiceComponents"] for sublist in componentsListList for item in sublist]
  40. hawqMasterHosts = self.getHosts(componentsList, "HAWQMASTER")
  41. hawqStandbyHosts = self.getHosts(componentsList, "HAWQSTANDBY")
  42. # if HAWQMASTER has already been assigned and HAWQSTANDBY has not been assigned, try to ensure HAWQSTANDBY is not placed on the same host
  43. if len(hawqMasterHosts) > 0 and len(hawqStandbyHosts) == 0:
  44. ambariServerHost = socket.getfqdn()
  45. availableHosts = [host for host in hostsList if host not in (hawqMasterHosts[0], ambariServerHost)]
  46. # Return list containing first available host if there are available hosts
  47. if len(availableHosts) > 0:
  48. return availableHosts[:1]
  49. return [ambariServerHost]
  50. return stackAdvisor.getHostsForMasterComponent(services, hosts, component, hostsList, hostsComponentsMap)
  51. def isComponentNotPreferableOnAmbariServerHost(self, componentName):
  52. return componentName in ('HAWQMASTER', 'HAWQSTANDBY')
  53. def getComponentLayoutScheme(self, componentName):
  54. if componentName == 'HAWQMASTER':
  55. return {6: 2, 31: 1, "else": 5}
  56. if componentName == 'HAWQSTANDBY':
  57. return {6: 1, 31: 2, "else": 3}
  58. return None
  59. def colocateService(self, stackAdvisor, hostsComponentsMap, serviceComponents):
  60. # colocate HAWQSEGMENT with DATANODE, if no hosts have been allocated for HAWQSEGMENT
  61. hawqSegment = [component for component in serviceComponents if component["StackServiceComponents"]["component_name"] == "HAWQSEGMENT"][0]
  62. if not stackAdvisor.isComponentHostsPopulated(hawqSegment):
  63. for hostName in hostsComponentsMap.keys():
  64. hostComponents = hostsComponentsMap[hostName]
  65. if {"name": "DATANODE"} in hostComponents and {"name": "HAWQSEGMENT"} not in hostComponents:
  66. hostsComponentsMap[hostName].append( { "name": "HAWQSEGMENT" } )
  67. if {"name": "DATANODE"} not in hostComponents and {"name": "HAWQSEGMENT"} in hostComponents:
  68. hostComponents.remove({"name": "HAWQSEGMENT"})
  69. def getComponentLayoutValidations(self, stackAdvisor, services, hosts):
  70. componentsListList = [service["components"] for service in services["services"]]
  71. componentsList = [item["StackServiceComponents"] for sublist in componentsListList for item in sublist]
  72. hostsList = [host["Hosts"]["host_name"] for host in hosts["items"]]
  73. hostsCount = len(hostsList)
  74. hawqMasterHosts = self.getHosts(componentsList, "HAWQMASTER")
  75. hawqStandbyHosts = self.getHosts(componentsList, "HAWQSTANDBY")
  76. hawqSegmentHosts = self.getHosts(componentsList, "HAWQSEGMENT")
  77. datanodeHosts = self.getHosts(componentsList, "DATANODE")
  78. items = []
  79. # Generate WARNING if any HAWQSEGMENT is not colocated with a DATANODE
  80. mismatchHosts = sorted(set(hawqSegmentHosts).symmetric_difference(set(datanodeHosts)))
  81. if len(mismatchHosts) > 0:
  82. hostsString = ', '.join(mismatchHosts)
  83. message = "HAWQ Segment must be installed on all DataNodes. " \
  84. "The following {0} host(s) do not satisfy the colocation recommendation: {1}".format(len(mismatchHosts), hostsString)
  85. items.append( { "type": 'host-component', "level": 'WARN', "message": message, "component-name": 'HAWQSEGMENT' } )
  86. # single node case is not analyzed because HAWQ Standby Master will not be present in single node topology due to logic in createComponentLayoutRecommendations()
  87. if len(hawqMasterHosts) == 1 and len(hawqStandbyHosts) == 1 and hawqMasterHosts == hawqStandbyHosts:
  88. message = "HAWQ Master and HAWQ Standby Master cannot be deployed on the same host."
  89. items.append( { "type": 'host-component', "level": 'ERROR', "message": message, "component-name": 'HAWQSTANDBY', "host": hawqStandbyHosts[0] } )
  90. if len(hawqMasterHosts) == 1 and hostsCount > 1 and stackAdvisor.isLocalHost(hawqMasterHosts[0]):
  91. message = "The default Postgres port (5432) on the Ambari Server conflicts with the default HAWQ Masters port. " \
  92. "If you are using port 5432 for Postgres, you must either deploy the HAWQ Master on a different host " \
  93. "or configure a different port for the HAWQ Masters in the HAWQ Configuration page."
  94. items.append( { "type": 'host-component', "level": 'WARN', "message": message, "component-name": 'HAWQMASTER', "host": hawqMasterHosts[0] } )
  95. if len(hawqStandbyHosts) == 1 and hostsCount > 1 and stackAdvisor.isLocalHost(hawqStandbyHosts[0]):
  96. message = "The default Postgres port (5432) on the Ambari Server conflicts with the default HAWQ Masters port. " \
  97. "If you are using port 5432 for Postgres, you must either deploy the HAWQ Standby Master on a different host " \
  98. "or configure a different port for the HAWQ Masters in the HAWQ Configuration page."
  99. items.append( { "type": 'host-component', "level": 'WARN', "message": message, "component-name": 'HAWQSTANDBY', "host": hawqStandbyHosts[0] } )
  100. return items
  101. def isHawqMasterComponentOnAmbariServer(self, stackAdvisor, services):
  102. componentsListList = [service["components"] for service in services["services"]]
  103. componentsList = [item for sublist in componentsListList for item in sublist]
  104. hawqMasterComponentHosts = [hostname for component in componentsList if component["StackServiceComponents"]["component_name"] in ("HAWQMASTER", "HAWQSTANDBY") for hostname in component["StackServiceComponents"]["hostnames"]]
  105. return any([stackAdvisor.isLocalHost(host) for host in hawqMasterComponentHosts])
  106. def getServiceConfigurationRecommendations(self, stackAdvisor, configurations, clusterData, services, hosts):
  107. putHdfsSiteProperty = self.putProperty(configurations, "hdfs-site", services)
  108. # Set dfs.allow.truncate to true
  109. putHdfsSiteProperty('dfs.allow.truncate', 'true')
  110. if any(x in services["configurations"] for x in ["hawq-site", "hdfs-client", "hawq-sysctl-env"]):
  111. componentsListList = [service["components"] for service in services["services"]]
  112. componentsList = [item["StackServiceComponents"] for sublist in componentsListList for item in sublist]
  113. servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
  114. hawqMasterHosts = set(self.getHosts(componentsList, "HAWQMASTER")).union(set(self.getHosts(componentsList, "HAWQSTANDBY")))
  115. hawqSegmentHosts = set(self.getHosts(componentsList, "HAWQSEGMENT"))
  116. hawqHosts = hawqMasterHosts.union(hawqSegmentHosts)
  117. numSegments = len(hawqSegmentHosts)
  118. minHawqHostsMemory = min([host['Hosts']['total_mem'] for host in hosts['items'] if host['Hosts']['host_name'] in hawqHosts])
  119. if "hawq-site" in services["configurations"]:
  120. hawq_site = services["configurations"]["hawq-site"]["properties"]
  121. putHawqSiteProperty = self.putProperty(configurations, "hawq-site", services)
  122. # remove master port when master is colocated with Ambari server
  123. if self.isHawqMasterComponentOnAmbariServer(stackAdvisor, services) and "hawq_master_address_port" in hawq_site:
  124. putHawqSiteProperty('hawq_master_address_port', '')
  125. # update query limits if segments are deployed
  126. if numSegments and "default_hash_table_bucket_number" in hawq_site and "hawq_rm_nvseg_perquery_limit" in hawq_site:
  127. factor_min = 1
  128. factor_max = 6
  129. limit = int(hawq_site["hawq_rm_nvseg_perquery_limit"])
  130. factor = limit / numSegments
  131. # if too many segments or default limit is too low --> stick with the limit
  132. if factor < factor_min:
  133. buckets = limit
  134. # if the limit is large and results in factor > max --> limit factor to max
  135. elif factor > factor_max:
  136. buckets = factor_max * numSegments
  137. else:
  138. buckets = factor * numSegments
  139. putHawqSiteProperty('default_hash_table_bucket_number', buckets)
  140. # update YARN RM urls with the values from yarn-site if YARN is installed
  141. if "YARN" in servicesList and "yarn-site" in services["configurations"]:
  142. yarn_site = services["configurations"]["yarn-site"]["properties"]
  143. for hs_prop, ys_prop in self.getHAWQYARNPropertyMapping().items():
  144. if hs_prop in hawq_site and ys_prop in yarn_site:
  145. putHawqSiteProperty(hs_prop, yarn_site[ys_prop])
  146. # set vm.overcommit_memory to 2 if the minimum memory among all hawqHosts is greater than 32GB
  147. if "hawq-sysctl-env" in services["configurations"] and "vm.overcommit_memory" in services["configurations"]["hawq-sysctl-env"]["properties"]:
  148. MEM_THRESHOLD = 33554432 # 32GB, minHawqHostsMemory is represented in kB
  149. vm_overcommit_mem_value = "2" if minHawqHostsMemory >= MEM_THRESHOLD else "1"
  150. putHawqSysctlEnvProperty = self.putProperty(configurations, "hawq-sysctl-env", services)
  151. putHawqSysctlEnvProperty("vm.overcommit_memory", vm_overcommit_mem_value)
  152. # Set the value for hawq_rm_memory_limit_perseg based on vm.overcommit value and RAM available on HAWQ Hosts
  153. # HAWQ Hosts with the minimum amount of RAM is considered for calculations
  154. # Available RAM Formula = SWAP + RAM * vm.overcommit_ratio / 100
  155. # Assumption: vm.overcommit_ratio = 50 (default on Linux), SWAP not considered for recommendation
  156. host_ram_kb = minHawqHostsMemory / 2 if vm_overcommit_mem_value == "2" else minHawqHostsMemory
  157. host_ram_gb = float(host_ram_kb) / (1024 * 1024)
  158. recommended_mem_percentage = {
  159. host_ram_gb <= 64: .75,
  160. 64 < host_ram_gb <= 512: .85,
  161. host_ram_gb > 512: .95
  162. }[True]
  163. recommended_mem = math.ceil(host_ram_gb * recommended_mem_percentage)
  164. unit = "GB"
  165. # If RAM on a host is very low ~ 2 GB, ceil function may result in making it equal to total mem,
  166. # in that case we recommend the value in MB not GB
  167. if recommended_mem >= host_ram_gb:
  168. recommended_mem = math.ceil(float(host_ram_kb)/1024 * recommended_mem_percentage)
  169. unit = "MB"
  170. # hawq_rm_memory_limit_perseg does not support decimal value so trim decimal using int
  171. putHawqSiteProperty("hawq_rm_memory_limit_perseg", "{0}{1}".format(int(recommended_mem), unit))
  172. # set output.replace-datanode-on-failure in HAWQ hdfs-client depending on the cluster size
  173. if "hdfs-client" in services["configurations"]:
  174. MIN_NUM_SEGMENT_THRESHOLD = 3
  175. hdfs_client = services["configurations"]["hdfs-client"]["properties"]
  176. if "output.replace-datanode-on-failure" in hdfs_client:
  177. propertyValue = "true" if numSegments > MIN_NUM_SEGMENT_THRESHOLD else "false"
  178. putHdfsClientProperty = self.putProperty(configurations, "hdfs-client", services)
  179. putHdfsClientProperty("output.replace-datanode-on-failure", propertyValue)
  180. def getHAWQYARNPropertyMapping(self):
  181. return { "hawq_rm_yarn_address": "yarn.resourcemanager.address", "hawq_rm_yarn_scheduler_address": "yarn.resourcemanager.scheduler.address" }
  182. def getConfigurationsValidationItems(self, stackAdvisor, configurations, recommendedDefaults, services, hosts):
  183. siteName = "hawq-site"
  184. method = self.validateHAWQSiteConfigurations
  185. items = self.validateConfigurationsForSite(stackAdvisor, configurations, recommendedDefaults, services, hosts, siteName, method)
  186. siteName = "hdfs-client"
  187. method = self.validateHAWQHdfsClientConfigurations
  188. resultItems = self.validateConfigurationsForSite(stackAdvisor, configurations, recommendedDefaults, services, hosts, siteName, method)
  189. items.extend(resultItems)
  190. return items
  191. def isHawqMasterPortConflict(self, configurations):
  192. prop_name = 'hawq_master_address_port'
  193. default_ambari_port = 5432
  194. if prop_name in configurations["hawq-site"]["properties"]:
  195. portValue = int(configurations["hawq-site"]["properties"][prop_name])
  196. return portValue == default_ambari_port
  197. return False
  198. def validateIfRootDir(self, properties, validationItems, prop_name, display_name):
  199. root_dir = '/'
  200. if prop_name in properties and properties[prop_name].strip() == root_dir:
  201. validationItems.append({"config-name": prop_name,
  202. "item": self.getWarnItem(
  203. "It is not advisable to have " + display_name + " at " + root_dir +". Consider creating a sub directory for HAWQ")})
  204. def checkForMultipleDirs(self, properties, validationItems, prop_name, display_name):
  205. # check for delimiters space, comma, colon and semi-colon
  206. if prop_name in properties and len(re.sub(r'[,;:]', ' ', properties[prop_name]).split(' ')) > 1:
  207. validationItems.append({"config-name": prop_name,
  208. "item": self.getErrorItem(
  209. "Multiple directories for " + display_name + " are not allowed.")})
  210. def validateHAWQSiteConfigurations(self, stackAdvisor, properties, recommendedDefaults, configurations, services, hosts):
  211. hawq_site = properties
  212. validationItems = []
  213. # 1. Check if HAWQ master/standby port numbers don't conflict with Ambari ports. Both Ambari and HAWQ use postgres DB and 5432 port.
  214. if self.isHawqMasterComponentOnAmbariServer(stackAdvisor, services) and self.isHawqMasterPortConflict(configurations):
  215. prop_name = 'hawq_master_address_port'
  216. validationItems.append({"config-name": prop_name,
  217. "item": self.getWarnItem(
  218. "The default Postgres port (5432) on the Ambari Server conflicts with the default HAWQ Masters port. "
  219. "If you are using port 5432 for Postgres, you must either deploy the HAWQ Masters on a different host "
  220. "or configure a different port for the HAWQ Masters in the HAWQ Configuration page.")})
  221. # 2. Check if any data directories are pointing to root dir '/'
  222. directories = {
  223. 'hawq_master_directory': 'HAWQ Master directory',
  224. 'hawq_master_temp_directory': 'HAWQ Master temp directory',
  225. 'hawq_segment_directory': 'HAWQ Segment directory',
  226. 'hawq_segment_temp_directory': 'HAWQ Segment temp directory'
  227. }
  228. for property_name, display_name in directories.iteritems():
  229. self.validateIfRootDir(properties, validationItems, property_name, display_name)
  230. # 2.1 Check if any master or segment directories has multiple values
  231. directories = {
  232. 'hawq_master_directory': 'HAWQ Master directory',
  233. 'hawq_segment_directory': 'HAWQ Segment directory'
  234. }
  235. for property_name, display_name in directories.iteritems():
  236. self.checkForMultipleDirs(properties, validationItems, property_name, display_name)
  237. # 3. Check YARN RM address properties
  238. YARN = "YARN"
  239. servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
  240. if YARN in servicesList and "yarn-site" in configurations:
  241. yarn_site = self.getSiteProperties(configurations, "yarn-site")
  242. for hs_prop, ys_prop in self.getHAWQYARNPropertyMapping().items():
  243. if hs_prop in hawq_site and ys_prop in yarn_site and hawq_site[hs_prop] != yarn_site[ys_prop]:
  244. message = "Expected value: {0} (this property should have the same value as the property {1} in yarn-site)".format(yarn_site[ys_prop], ys_prop)
  245. validationItems.append({"config-name": hs_prop, "item": self.getWarnItem(message)})
  246. # 4. Check HAWQ Resource Manager type
  247. HAWQ_GLOBAL_RM_TYPE = "hawq_global_rm_type"
  248. if YARN not in servicesList and HAWQ_GLOBAL_RM_TYPE in hawq_site and hawq_site[HAWQ_GLOBAL_RM_TYPE].upper() == YARN:
  249. message = "{0} must be set to none if YARN service is not installed".format(HAWQ_GLOBAL_RM_TYPE)
  250. validationItems.append({"config-name": HAWQ_GLOBAL_RM_TYPE, "item": self.getErrorItem(message)})
  251. # 5. Check query limits
  252. if ("default_hash_table_bucket_number" in hawq_site and
  253. "hawq_rm_nvseg_perquery_limit" in hawq_site and
  254. int(hawq_site["default_hash_table_bucket_number"]) > int(hawq_site["hawq_rm_nvseg_perquery_limit"])):
  255. message = "Default buckets for Hash Distributed tables parameter value should not be greater than the value of Virtual Segments Limit per Query (Total) parameter, currently set to {0}.".format(hawq_site["hawq_rm_nvseg_perquery_limit"])
  256. validationItems.append({"config-name": "default_hash_table_bucket_number", "item": self.getErrorItem(message)})
  257. return stackAdvisor.toConfigurationValidationProblems(validationItems, "hawq-site")
  258. def validateHAWQHdfsClientConfigurations(self, stackAdvisor, properties, recommendedDefaults, configurations, services, hosts):
  259. hdfs_client = properties
  260. validationItems = []
  261. # check HAWQ hdfs-client output.replace-datanode-on-failure property
  262. PROP_NAME = "output.replace-datanode-on-failure"
  263. if PROP_NAME in hdfs_client:
  264. value = hdfs_client[PROP_NAME].upper()
  265. componentsListList = [service["components"] for service in services["services"]]
  266. componentsList = [item["StackServiceComponents"] for sublist in componentsListList for item in sublist]
  267. numSegments = len(self.getHosts(componentsList, "HAWQSEGMENT"))
  268. message = None
  269. MIN_NUM_SEGMENT_THRESHOLD = 3
  270. if numSegments > MIN_NUM_SEGMENT_THRESHOLD and value != 'TRUE':
  271. message = "{0} should be set to true (checked) for clusters with more than {1} HAWQ Segments"
  272. elif numSegments <= MIN_NUM_SEGMENT_THRESHOLD and value != 'FALSE':
  273. message = "{0} should be set to false (unchecked) for clusters with {1} or less HAWQ Segments"
  274. if message:
  275. validationItems.append({"config-name": PROP_NAME, "item": self.getWarnItem(message.format(PROP_NAME, str(MIN_NUM_SEGMENT_THRESHOLD)))})
  276. return stackAdvisor.toConfigurationValidationProblems(validationItems, "hdfs-client")