  1. #!/usr/bin/env python
  2. """
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. """
  17. import collections
  18. import os
  19. from resource_management.libraries.functions.version import format_stack_version
  20. from resource_management.libraries.resources.properties_file import PropertiesFile
  21. from resource_management.libraries.resources.template_config import TemplateConfig
  22. from resource_management.core.resources.system import Directory, Execute, File, Link
  23. from resource_management.core.source import StaticFile, Template, InlineTemplate
  24. from resource_management.libraries.functions import format
  25. from resource_management.libraries.functions.stack_features import check_stack_feature
  26. from resource_management.libraries.functions import StackFeature
  27. from resource_management.core.logger import Logger
  28. def kafka(upgrade_type=None):
  29. import params
  30. ensure_base_directories()
  31. kafka_server_config = mutable_config_dict(params.config['configurations']['kafka-broker'])
  32. # This still has an issue of hostnames being alphabetically out-of-order for broker.id in HDP-2.2.
  33. # Starting in HDP 2.3, Kafka handles the generation of broker.id so Ambari doesn't have to.
  34. effective_version = params.stack_version_formatted if upgrade_type is None else format_stack_version(params.version)
  35. Logger.info(format("Effective stack version: {effective_version}"))
  36. if effective_version is not None and effective_version != "" and \
  37. check_stack_feature(StackFeature.CREATE_KAFKA_BROKER_ID, effective_version):
  38. if len(params.kafka_hosts) > 0 and params.hostname in params.kafka_hosts:
  39. brokerid = str(sorted(params.kafka_hosts).index(params.hostname))
  40. kafka_server_config['broker.id'] = brokerid
  41. Logger.info(format("Calculating broker.id as {brokerid}"))
  42. # listeners and advertised.listeners are only added in 2.3.0.0 onwards.
  43. if effective_version is not None and effective_version != "" and \
  44. check_stack_feature(StackFeature.KAFKA_LISTENERS, effective_version):
  45. listeners = kafka_server_config['listeners'].replace("localhost", params.hostname)
  46. Logger.info(format("Kafka listeners: {listeners}"))
  47. if params.security_enabled and params.kafka_kerberos_enabled:
  48. Logger.info("Kafka kerberos security is enabled.")
  49. if "SASL" not in listeners:
  50. listeners = listeners.replace("PLAINTEXT", "PLAINTEXTSASL")
  51. kafka_server_config['listeners'] = listeners
  52. kafka_server_config['advertised.listeners'] = listeners
  53. Logger.info(format("Kafka advertised listeners: {listeners}"))
  54. else:
  55. kafka_server_config['listeners'] = listeners
  56. if 'advertised.listeners' in kafka_server_config:
  57. advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname)
  58. kafka_server_config['advertised.listeners'] = advertised_listeners
  59. Logger.info(format("Kafka advertised listeners: {advertised_listeners}"))
  60. else:
  61. kafka_server_config['host.name'] = params.hostname
  62. if params.has_metric_collector:
  63. kafka_server_config['kafka.timeline.metrics.host'] = params.metric_collector_host
  64. kafka_server_config['kafka.timeline.metrics.port'] = params.metric_collector_port
  65. kafka_server_config['kafka.timeline.metrics.protocol'] = params.metric_collector_protocol
  66. kafka_server_config['kafka.timeline.metrics.truststore.path'] = params.metric_truststore_path
  67. kafka_server_config['kafka.timeline.metrics.truststore.type'] = params.metric_truststore_type
  68. kafka_server_config['kafka.timeline.metrics.truststore.password'] = params.metric_truststore_password
  69. kafka_data_dir = kafka_server_config['log.dirs']
  70. kafka_data_dirs = filter(None, kafka_data_dir.split(","))
  71. Directory(kafka_data_dirs,
  72. mode=0755,
  73. cd_access='a',
  74. owner=params.kafka_user,
  75. group=params.user_group,
  76. create_parents = True,
  77. recursive_ownership = True,
  78. )
  79. PropertiesFile("server.properties",
  80. dir=params.conf_dir,
  81. properties=kafka_server_config,
  82. owner=params.kafka_user,
  83. group=params.user_group,
  84. )
  85. File(format("{conf_dir}/kafka-env.sh"),
  86. owner=params.kafka_user,
  87. content=InlineTemplate(params.kafka_env_sh_template)
  88. )
  89. if (params.log4j_props != None):
  90. File(format("{conf_dir}/log4j.properties"),
  91. mode=0644,
  92. group=params.user_group,
  93. owner=params.kafka_user,
  94. content=params.log4j_props
  95. )
  96. if params.security_enabled and params.kafka_kerberos_enabled:
  97. TemplateConfig(format("{conf_dir}/kafka_jaas.conf"),
  98. owner=params.kafka_user)
  99. TemplateConfig(format("{conf_dir}/kafka_client_jaas.conf"),
  100. owner=params.kafka_user)
  101. # On some OS this folder could be not exists, so we will create it before pushing there files
  102. Directory(params.limits_conf_dir,
  103. create_parents = True,
  104. owner='root',
  105. group='root'
  106. )
  107. File(os.path.join(params.limits_conf_dir, 'kafka.conf'),
  108. owner='root',
  109. group='root',
  110. mode=0644,
  111. content=Template("kafka.conf.j2")
  112. )
  113. File(os.path.join(params.conf_dir, 'tools-log4j.properties'),
  114. owner='root',
  115. group='root',
  116. mode=0644,
  117. content=Template("tools-log4j.properties.j2")
  118. )
  119. setup_symlink(params.kafka_managed_pid_dir, params.kafka_pid_dir)
  120. setup_symlink(params.kafka_managed_log_dir, params.kafka_log_dir)
  121. def mutable_config_dict(kafka_broker_config):
  122. kafka_server_config = {}
  123. for key, value in kafka_broker_config.iteritems():
  124. kafka_server_config[key] = value
  125. return kafka_server_config
# Used to workaround the hardcoded pid/log dir used on the kafka bash process launcher
def setup_symlink(kafka_managed_dir, kafka_ambari_managed_dir):
    """
    Make the dir hardcoded in Kafka's launcher a symlink to the Ambari-managed dir.

    If the two paths differ, any real directory at kafka_managed_dir is backed
    up (via backup_dir_contents), deleted, and replaced with a symlink pointing
    at kafka_ambari_managed_dir. If the paths coincide but a stale symlink is
    present, the symlink is removed and a real directory is created instead.
    Backed-up top-level files, if any, are then restored into the managed dir.

    :param kafka_managed_dir: path hardcoded by the Kafka launcher (pid or log dir)
    :param kafka_ambari_managed_dir: the corresponding Ambari-configured path
    """
    import params
    backup_folder_path = None
    backup_folder_suffix = "_tmp"
    if kafka_ambari_managed_dir != kafka_managed_dir:
        # Managed path differs from Ambari's: replace it with a symlink.
        if os.path.exists(kafka_managed_dir) and not os.path.islink(kafka_managed_dir):
            # Backup existing data before delete, since the config may be changed
            # repeatedly to/from the default location and the dir may hold
            # relevant contents (e.g. historic logs).
            backup_folder_path = backup_dir_contents(kafka_managed_dir, backup_folder_suffix)

            Directory(kafka_managed_dir,
                      action="delete",
                      create_parents = True)

        elif os.path.islink(kafka_managed_dir) and os.path.realpath(kafka_managed_dir) != kafka_ambari_managed_dir:
            # Symlink exists but points somewhere else: remove it so it can be recreated.
            Link(kafka_managed_dir,
                 action="delete")

        if not os.path.islink(kafka_managed_dir):
            Link(kafka_managed_dir,
                 to=kafka_ambari_managed_dir)

    elif os.path.islink(kafka_managed_dir): # If config is changed and coincides with the kafka managed dir, remove the symlink and physically create the folder
        Link(kafka_managed_dir,
             action="delete")

        Directory(kafka_managed_dir,
                  mode=0755,
                  cd_access='a',
                  owner=params.kafka_user,
                  group=params.user_group,
                  create_parents = True,
                  recursive_ownership = True,
        )

    if backup_folder_path:
        # Restore backed up files to current relevant dirs if needed - will be
        # triggered only when changing to/from the default path.
        for file in os.listdir(backup_folder_path):
            File(os.path.join(kafka_managed_dir, file),
                 owner=params.kafka_user,
                 content = StaticFile(os.path.join(backup_folder_path, file)))

        # Clean up the backup folder once its contents have been restored.
        Directory(backup_folder_path,
                  action="delete",
                  create_parents = True)
# Uses agent temp dir to store backup files
def backup_dir_contents(dir_path, backup_folder_suffix):
    """
    Copy the top-level files of dir_path into a backup folder under the agent
    temp dir and return the backup folder's path.

    Only regular top-level entries are copied (no recursion into subdirs).

    :param dir_path: absolute path of the directory to back up
    :param backup_folder_suffix: suffix appended to the backup folder name
    :return: path of the created backup folder
    """
    import params
    # Deliberate string concatenation (not os.path.join): normpath(dir_path)
    # keeps its leading separator, so the managed dir's full path is mirrored
    # under tmp_dir, e.g. /tmp/ambari + /var/log/kafka + _tmp.
    backup_destination_path = params.tmp_dir + os.path.normpath(dir_path) + backup_folder_suffix
    Directory(backup_destination_path,
              mode=0755,
              cd_access='a',
              owner=params.kafka_user,
              group=params.user_group,
              create_parents = True,
              recursive_ownership = True,
    )
    # Safely copy top-level contents to backup folder
    for file in os.listdir(dir_path):
        File(os.path.join(backup_destination_path, file),
             owner=params.kafka_user,
             content = StaticFile(os.path.join(dir_path, file)))

    return backup_destination_path
  183. def ensure_base_directories():
  184. import params
  185. Directory([params.kafka_log_dir, params.kafka_pid_dir, params.conf_dir],
  186. mode=0755,
  187. cd_access='a',
  188. owner=params.kafka_user,
  189. group=params.user_group,
  190. create_parents = True,
  191. recursive_ownership = True,
  192. )