utils.py

  1. """
  2. Licensed to the Apache Software Foundation (ASF) under one
  3. or more contributor license agreements. See the NOTICE file
  4. distributed with this work for additional information
  5. regarding copyright ownership. The ASF licenses this file
  6. to you under the Apache License, Version 2.0 (the
  7. "License"); you may not use this file except in compliance
  8. with the License. You may obtain a copy of the License at
  9. http://www.apache.org/licenses/LICENSE-2.0
  10. Unless required by applicable law or agreed to in writing, software
  11. distributed under the License is distributed on an "AS IS" BASIS,
  12. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. See the License for the specific language governing permissions and
  14. limitations under the License.
  15. """
import os
import re
import urllib2

import ambari_simplejson as json  # simplejson is much faster compared to the Python 2.6 json module and has the same function set.

from resource_management.core.resources.system import Directory, File, Execute
from resource_management.libraries.functions.format import format
from resource_management.libraries.functions import check_process_status
from resource_management.libraries.functions.version import compare_versions
from resource_management.core import shell
from resource_management.core.shell import as_user, as_sudo
from resource_management.core.exceptions import ComponentIsNotRunning
from resource_management.core.logger import Logger
from resource_management.libraries.functions.curl_krb_request import curl_krb_request

from zkfc_slave import ZkfcSlave

def safe_zkfc_op(action, env):
  """
  Idempotent operation on the zkfc process to either start or stop it.
  :param action: start or stop
  :param env: environment
  """
  Logger.info("Performing action {0} on zkfc.".format(action))
  zkfc = None
  if action == "start":
    try:
      zkfc = ZkfcSlave()
      zkfc.status(env)
    except ComponentIsNotRunning:
      if zkfc:
        zkfc.start(env)

  if action == "stop":
    try:
      zkfc = ZkfcSlave()
      zkfc.status(env)
    except ComponentIsNotRunning:
      pass
    else:
      if zkfc:
        zkfc.stop(env)
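
# A minimal usage sketch (call site hypothetical): safe_zkfc_op probes the process
# with status() before acting, so repeated calls are safe. In a real Ambari command
# the env value comes from the surrounding Script subclass.
#
#   safe_zkfc_op("start", env)  # no-op if ZKFC is already running
#   safe_zkfc_op("stop", env)   # no-op if ZKFC is already stopped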

def failover_namenode():
  """
  Failover the primary namenode by killing zkfc if it exists on this host (assuming this host is the primary).
  """
  import params
  check_service_cmd = format("hdfs haadmin -getServiceState {namenode_id}")
  code, out = shell.call(check_service_cmd, logoutput=True, user=params.hdfs_user)

  state = "unknown"
  if code == 0 and out:
    state = "active" if "active" in out else ("standby" if "standby" in out else state)
    Logger.info("Namenode service state: %s" % state)

  if state == "active":
    Logger.info("Rolling Upgrade - Initiating namenode failover by killing zkfc on active namenode")

    # Forcefully kill ZKFC on this host to initiate a failover.
    # If ZKFC is already dead, then potentially this node can still be the active one.
    was_zkfc_killed = kill_zkfc(params.hdfs_user)

    # Wait until it transitions to standby
    check_standby_cmd = format("hdfs haadmin -getServiceState {namenode_id} | grep standby")

    # process may already be down. try one time, then proceed
    code, out = shell.call(check_standby_cmd, user=params.hdfs_user, logoutput=True)
    Logger.info(format("Rolling Upgrade - check for standby returned {code}"))

    if code == 255 and out:
      Logger.info("Rolling Upgrade - namenode is already down.")
    else:
      if was_zkfc_killed:
        # Only mandate that this be the standby namenode if ZKFC was indeed killed to initiate a failover.
        Logger.info("Waiting for this NameNode to become the standby one.")
        Execute(check_standby_cmd,
                user=params.hdfs_user,
                tries=50,
                try_sleep=6,
                logoutput=True)
  else:
    Logger.info("Rolling Upgrade - Host %s is already the standby namenode." % str(params.hostname))
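
# Sketch of the state probe this relies on (namenode_id is resolved from params;
# the single-word output shape is assumed from "hdfs haadmin -getServiceState"):
#
#   $ hdfs haadmin -getServiceState nn1
#   active
#
# Exit code 0 with "active" in stdout is what triggers the ZKFC kill above.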

def kill_zkfc(zkfc_user):
  """
  There are two potential methods for failing over the namenode, especially during a Rolling Upgrade.
  Option 1. Kill zkfc on the primary namenode, provided that the secondary is up and has zkfc running on it.
  Option 2. Silent failover (not supported as of HDP 2.2.0.0)
  :param zkfc_user: User that started the ZKFC process.
  :return: True if ZKFC was killed, otherwise False.
  """
  import params
  if params.dfs_ha_enabled:
    zkfc_pid_file = get_service_pid_file("zkfc", zkfc_user)
    if zkfc_pid_file:
      check_process = as_user(format("ls {zkfc_pid_file} > /dev/null 2>&1 && ps -p `cat {zkfc_pid_file}` > /dev/null 2>&1"), user=zkfc_user)
      code, out = shell.call(check_process)
      if code == 0:
        Logger.debug("ZKFC is running and will be killed to initiate namenode failover.")
        kill_command = format("kill -9 `cat {zkfc_pid_file}`")
        Execute(kill_command,
                user=zkfc_user
        )
        File(zkfc_pid_file,
             action="delete",
        )
        return True
  return False
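
# Minimal sketch of the liveness check kill_zkfc shells out to (the pid file path
# is a hypothetical example of what get_service_pid_file returns):
#
#   ls /var/run/hadoop/hdfs/hadoop-hdfs-zkfc.pid > /dev/null 2>&1 \
#     && ps -p `cat /var/run/hadoop/hdfs/hadoop-hdfs-zkfc.pid` > /dev/null 2>&1
#
# Exit code 0 means the pid file exists and the recorded process is alive, which is
# the only case where the kill -9 and pid-file cleanup run.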

def get_service_pid_file(name, user):
  """
  Get the pid file path that was used to start the service by the user.
  :param name: Service name
  :param user: User that started the service.
  :return: PID file path
  """
  import params
  pid_dir = format("{hadoop_pid_dir_prefix}/{user}")
  pid_file = format("{pid_dir}/hadoop-{user}-{name}.pid")
  return pid_file
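
# Example of the path this yields, assuming hadoop_pid_dir_prefix is "/var/run/hadoop"
# (the prefix comes from cluster configuration and is not fixed by this module):
#
#   get_service_pid_file("zkfc", "hdfs") -> "/var/run/hadoop/hdfs/hadoop-hdfs-zkfc.pid"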

def service(action=None, name=None, user=None, options="", create_pid_dir=False,
            create_log_dir=False):
  """
  :param action: Either "start" or "stop"
  :param name: Component name, e.g., "namenode", "datanode", "secondarynamenode", "zkfc"
  :param user: User to run the command as
  :param options: Additional options to pass to command as a string
  :param create_pid_dir: Create PID directory
  :param create_log_dir: Create log file directory
  """
  import params

  options = options if options else ""
  pid_dir = format("{hadoop_pid_dir_prefix}/{user}")
  pid_file = format("{pid_dir}/hadoop-{user}-{name}.pid")
  hadoop_env_exports = {
    'HADOOP_LIBEXEC_DIR': params.hadoop_libexec_dir
  }
  log_dir = format("{hdfs_log_dir_prefix}/{user}")

  # NFS GATEWAY is always started by root using jsvc due to rpcbind bugs
  # on Linux such as CentOS6.2. https://bugzilla.redhat.com/show_bug.cgi?id=731542
  if name == "nfs3":
    pid_file = format("{pid_dir}/hadoop_privileged_nfs3.pid")
    custom_export = {
      'HADOOP_PRIVILEGED_NFS_USER': params.hdfs_user,
      'HADOOP_PRIVILEGED_NFS_PID_DIR': pid_dir,
      'HADOOP_PRIVILEGED_NFS_LOG_DIR': log_dir
    }
    hadoop_env_exports.update(custom_export)

  check_process = as_user(format(
    "ls {pid_file} >/dev/null 2>&1 &&"
    " ps -p `cat {pid_file}` >/dev/null 2>&1"), user=params.hdfs_user)

  # On STOP, directories shouldn't be created,
  # since the old dirs (created during the previous start) are still in use.
  if action != "stop":
    if name == "nfs3":
      Directory(params.hadoop_pid_dir_prefix,
                mode=0755,
                owner=params.root_user,
                group=params.root_group
      )
    else:
      Directory(params.hadoop_pid_dir_prefix,
                mode=0755,
                owner=params.hdfs_user,
                group=params.user_group
      )
    if create_pid_dir:
      Directory(pid_dir,
                owner=user,
                recursive=True)
    if create_log_dir:
      if name == "nfs3":
        Directory(log_dir,
                  mode=0775,
                  owner=params.root_user,
                  group=params.user_group)
      else:
        Directory(log_dir,
                  owner=user,
                  recursive=True)

  if params.security_enabled and name == "datanode":
    ## The directory where pid files are stored in the secure data environment.
    hadoop_secure_dn_pid_dir = format("{hadoop_pid_dir_prefix}/{hdfs_user}")
    hadoop_secure_dn_pid_file = format("{hadoop_secure_dn_pid_dir}/hadoop_secure_dn.pid")

    # At the Champlain stack and later, we may start the datanode as non-root even in a secure cluster
    if not (params.hdp_stack_version != "" and compare_versions(params.hdp_stack_version, '2.2') >= 0) or params.secure_dn_ports_are_in_use:
      user = "root"
      pid_file = format(
        "{hadoop_pid_dir_prefix}/{hdfs_user}/hadoop-{hdfs_user}-{name}.pid")

    if action == 'stop' and (params.hdp_stack_version != "" and compare_versions(params.hdp_stack_version, '2.2') >= 0) and \
      os.path.isfile(hadoop_secure_dn_pid_file):
        # We need special handling for this case to handle the situation
        # when we configure a non-root secure DN and then restart it
        # to pick up new configs. Otherwise we will not be able to stop
        # a running instance.
        user = "root"

        try:
          check_process_status(hadoop_secure_dn_pid_file)

          custom_export = {
            'HADOOP_SECURE_DN_USER': params.hdfs_user
          }
          hadoop_env_exports.update(custom_export)
        except ComponentIsNotRunning:
          pass

  hadoop_daemon = format("{hadoop_bin}/hadoop-daemon.sh")

  if user == "root":
    cmd = [hadoop_daemon, "--config", params.hadoop_conf_dir, action, name]
    if options:
      cmd += [options, ]
    daemon_cmd = as_sudo(cmd)
  else:
    cmd = format("{ulimit_cmd} {hadoop_daemon} --config {hadoop_conf_dir} {action} {name}")
    if options:
      cmd += " " + options
    daemon_cmd = as_user(cmd, user)

  service_is_up = check_process if action == "start" else None
  # remove the pid file of a dead process
  File(pid_file,
       action="delete",
       not_if=check_process
  )
  Execute(daemon_cmd,
          not_if=service_is_up,
          environment=hadoop_env_exports
  )

  if action == "stop":
    File(pid_file,
         action="delete",
    )
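
# Hedged usage sketch (the call site is hypothetical; real callers are the component
# scripts of this service package, e.g. a namenode script):
#
#   service(action="start", name="namenode", user=params.hdfs_user,
#           create_pid_dir=True, create_log_dir=True)
#
# The not_if=check_process guard makes the start idempotent: if the pid file points
# at a live process, the daemon command is skipped.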

def get_jmx_data(nn_address, modeler_type, metric, encrypted=False, security_enabled=False):
  """
  :param nn_address: Namenode Address, e.g., host:port, ** MAY ** be preceded with "http://" or "https://" already.
  If not preceded, will use the encrypted param to determine.
  :param modeler_type: Modeler type to query using startswith function
  :param metric: Metric to return
  :return: Return an object representation of the metric, or None if it does not exist
  """
  if not nn_address or not modeler_type or not metric:
    return None

  nn_address = nn_address.strip()
  if not nn_address.startswith("http"):
    nn_address = ("https://" if encrypted else "http://") + nn_address
  if not nn_address.endswith("/"):
    nn_address = nn_address + "/"

  nn_address = nn_address + "jmx"
  Logger.info("Retrieve modeler: %s, metric: %s from JMX endpoint %s" % (modeler_type, metric, nn_address))

  if security_enabled:
    import params
    data, error_msg, time_millis = curl_krb_request(params.tmp_dir, params.smoke_user_keytab, params.smokeuser_principal, nn_address,
                                                    "jn_upgrade", params.kinit_path_local, False, None, params.smoke_user)
  else:
    data = urllib2.urlopen(nn_address).read()

  my_data = None
  if data:
    data_dict = json.loads(data)
    if data_dict:
      for el in data_dict['beans']:
        if el is not None and el['modelerType'] is not None and el['modelerType'].startswith(modeler_type):
          if metric in el:
            my_data = el[metric]
            if my_data:
              my_data = json.loads(str(my_data))
              break
  return my_data
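
# Hedged usage sketch (host, port, bean, and metric names are illustrative
# assumptions, not fixed by this function):
#
#   get_jmx_data("c6401.ambari.apache.org:50070",
#                "org.apache.hadoop.hdfs.server.namenode.FSNamesystem",
#                "JournalTransactionInfo")
#
# would fetch http://c6401.ambari.apache.org:50070/jmx and return the parsed value
# of "JournalTransactionInfo" from the first bean whose modelerType matches the prefix.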

def get_port(address):
  """
  Extracts the port from an address like 0.0.0.0:1019
  """
  if address is None:
    return None
  m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address)
  if m is not None and len(m.groups()) >= 2:
    return int(m.group(2))
  else:
    return None
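
# Behavior sketch, following the regex above:
#
#   get_port("0.0.0.0:1019")                -> 1019
#   get_port("https://host.example:50470")  -> 50470
#   get_port(None)                          -> None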

def is_secure_port(port):
  """
  Returns True if the port is root-owned (privileged) on *nix systems
  """
  if port is not None:
    return port < 1024
  else:
    return False
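
# Behavior sketch: ports below 1024 require root to bind on *nix, so
#
#   is_secure_port(1019)  -> True
#   is_secure_port(50010) -> False
#   is_secure_port(None)  -> False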