  1. """
  2. Licensed to the Apache Software Foundation (ASF) under one
  3. or more contributor license agreements. See the NOTICE file
  4. distributed with this work for additional information
  5. regarding copyright ownership. The ASF licenses this file
  6. to you under the Apache License, Version 2.0 (the
  7. "License"); you may not use this file except in compliance
  8. with the License. You may obtain a copy of the License at
  9. http://www.apache.org/licenses/LICENSE-2.0
  10. Unless required by applicable law or agreed to in writing, software
  11. distributed under the License is distributed on an "AS IS" BASIS,
  12. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. See the License for the specific language governing permissions and
  14. limitations under the License.
  15. """
import os
import re
import urllib2
import json

from resource_management import *
from resource_management.libraries.functions.format import format
from resource_management.core.shell import call, checked_call
from resource_management.core.exceptions import ComponentIsNotRunning

from zkfc_slave import ZkfcSlave

def safe_zkfc_op(action, env):
  """
  Idempotent operation on the zkfc process to either start or stop it.
  :param action: start or stop
  :param env: environment
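
  Example (illustrative):
    safe_zkfc_op("start", env)  # starts ZKFC only if it is not already running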
  30. """
  31. zkfc = None
  32. if action == "start":
  33. try:
  34. zkfc = ZkfcSlave()
  35. zkfc.status(env)
  36. except ComponentIsNotRunning:
  37. if zkfc:
  38. zkfc.start(env)
  39. if action == "stop":
  40. try:
  41. zkfc = ZkfcSlave()
  42. zkfc.status(env)
  43. except ComponentIsNotRunning:
  44. pass
  45. else:
  46. if zkfc:
  47. zkfc.stop(env)
def failover_namenode():
  """
  Failover the primary namenode by killing zkfc if it exists on this host (assuming this host is the primary).
  """
  import params
  check_service_cmd = format("hdfs haadmin -getServiceState {namenode_id}")
  code, out = call(check_service_cmd, logoutput=True, user=params.hdfs_user)

  state = "unknown"
  if code == 0 and out:
    state = "active" if "active" in out else ("standby" if "standby" in out else state)

  Logger.info("Namenode service state: %s" % state)

  if state == "active":
    Logger.info("Rolling Upgrade - Initiating namenode failover by killing zkfc on active namenode")

    # Forcefully kill ZKFC on this host to initiate a failover.
    # If ZKFC is already dead, then potentially this node can still be the active one.
    was_zkfc_killed = kill_zkfc(params.hdfs_user)

    # Wait until it transitions to standby
    check_standby_cmd = format("hdfs haadmin -getServiceState {namenode_id} | grep standby")

    # The process may already be down: try one time, then proceed
    code, out = call(check_standby_cmd, user=params.hdfs_user, logoutput=True)
    Logger.info(format("Rolling Upgrade - check for standby returned {code}"))

    if code == 255 and out:
      Logger.info("Rolling Upgrade - namenode is already down")
    else:
      if was_zkfc_killed:
        # Only mandate that this be the standby namenode if ZKFC was indeed killed to initiate a failover.
        Execute(check_standby_cmd,
                user=params.hdfs_user,
                tries=50,
                try_sleep=6,
                logoutput=True)
  else:
    Logger.info("Rolling Upgrade - Host %s is the standby namenode." % str(params.hostname))

def kill_zkfc(zkfc_user):
  """
  There are two potential methods for failing over the namenode, especially during a Rolling Upgrade.
  Option 1. Kill zkfc on the primary namenode, provided that the standby is up and has zkfc running on it.
  Option 2. Silent failover (not supported as of HDP 2.2.0.0)
  :param zkfc_user: User that started the ZKFC process.
  :return: True if ZKFC was killed, otherwise False.
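
  Example (illustrative):
    was_zkfc_killed = kill_zkfc(params.hdfs_user)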
  88. """
  89. import params
  90. if params.dfs_ha_enabled:
  91. zkfc_pid_file = get_service_pid_file("zkfc", zkfc_user)
  92. if zkfc_pid_file:
  93. check_process = format("ls {zkfc_pid_file} > /dev/null 2>&1 && ps -p `cat {zkfc_pid_file}` > /dev/null 2>&1")
  94. code, out = call(check_process)
  95. if code == 0:
  96. Logger.debug("ZKFC is running and will be killed to initiate namenode failover.")
  97. kill_command = format("{check_process} && kill -9 `cat {zkfc_pid_file}` > /dev/null 2>&1")
  98. Execute(kill_command)
  99. Execute(format("rm -f {zkfc_pid_file}"))
  100. return True
  101. return False
def get_service_pid_file(name, user):
  """
  Get the pid file path that was used to start the service by the user.
  :param name: Service name
  :param user: User that started the service.
  :return: PID file path
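
  Example (illustrative; assumes hadoop_pid_dir_prefix is /var/run/hadoop):
    get_service_pid_file("zkfc", "hdfs")  # -> "/var/run/hadoop/hdfs/hadoop-hdfs-zkfc.pid"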
  108. """
  109. import params
  110. pid_dir = format("{hadoop_pid_dir_prefix}/{user}")
  111. pid_file = format("{pid_dir}/hadoop-{user}-{name}.pid")
  112. return pid_file
def service(action=None, name=None, user=None, options="", create_pid_dir=False,
            create_log_dir=False):
  """
  :param action: Either "start" or "stop"
  :param name: Component name, e.g., "namenode", "datanode", "secondarynamenode", "zkfc"
  :param user: User to run the command as
  :param options: Additional options to pass to the command, as a string
  :param create_pid_dir: Create PID directory
  :param create_log_dir: Create log file directory
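
  Example (illustrative):
    service(action="start", name="datanode", user=params.hdfs_user,
            create_pid_dir=True, create_log_dir=True)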
  122. """
  123. import params
  124. options = options if options else ""
  125. pid_dir = format("{hadoop_pid_dir_prefix}/{user}")
  126. pid_file = format("{pid_dir}/hadoop-{user}-{name}.pid")
  127. hadoop_env_exports = {
  128. 'HADOOP_LIBEXEC_DIR': params.hadoop_libexec_dir
  129. }
  130. log_dir = format("{hdfs_log_dir_prefix}/{user}")
  131. # NFS GATEWAY is always started by root using jsvc due to rpcbind bugs
  132. # on Linux such as CentOS6.2. https://bugzilla.redhat.com/show_bug.cgi?id=731542
  133. if name == "nfs3" :
  134. pid_file = format("{pid_dir}/hadoop_privileged_nfs3.pid")
  135. custom_export = {
  136. 'HADOOP_PRIVILEGED_NFS_USER': params.hdfs_user,
  137. 'HADOOP_PRIVILEGED_NFS_PID_DIR': pid_dir,
  138. 'HADOOP_PRIVILEGED_NFS_LOG_DIR': log_dir
  139. }
  140. hadoop_env_exports.update(custom_export)
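
  # Shell guard used with not_if below: succeeds only when the pid file exists
  # and the process recorded in it is alive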
  check_process = format(
    "ls {pid_file} >/dev/null 2>&1 &&"
    " ps -p `cat {pid_file}` >/dev/null 2>&1")

  if create_pid_dir:
    Directory(pid_dir,
              owner=user,
              recursive=True)

  if create_log_dir:
    if name == "nfs3":
      Directory(log_dir,
                mode=0775,
                owner=params.root_user,
                group=params.user_group)
    else:
      Directory(log_dir,
                owner=user,
                recursive=True)

  if params.security_enabled and name == "datanode":
    ## The directory where pid files are stored in the secure data environment.
    hadoop_secure_dn_pid_dir = format("{hadoop_pid_dir_prefix}/{hdfs_user}")
    hadoop_secure_dn_pid_file = format("{hadoop_secure_dn_pid_dir}/hadoop_secure_dn.pid")

    # At Champlain stack and further, we may start datanode as a non-root even in secure cluster
    if not (params.hdp_stack_version != "" and compare_versions(params.hdp_stack_version, '2.2') >= 0) or params.secure_dn_ports_are_in_use:
      user = "root"
      pid_file = format(
        "{hadoop_pid_dir_prefix}/{hdfs_user}/hadoop-{hdfs_user}-{name}.pid")

    if action == 'stop' and (params.hdp_stack_version != "" and compare_versions(params.hdp_stack_version, '2.2') >= 0) and \
       os.path.isfile(hadoop_secure_dn_pid_file):
      # We need special handling for this case to handle the situation
      # when we configure non-root secure DN and then restart it
      # to handle new configs. Otherwise we will not be able to stop
      # a running instance.
      user = "root"

      try:
        check_process_status(hadoop_secure_dn_pid_file)

        custom_export = {
          'HADOOP_SECURE_DN_USER': params.hdfs_user
        }
        hadoop_env_exports.update(custom_export)
      except ComponentIsNotRunning:
        pass

  hadoop_daemon = format("{hadoop_bin}/hadoop-daemon.sh")

  if user == "root":
    cmd = [hadoop_daemon, "--config", params.hadoop_conf_dir, action, name]
    if options:
      cmd += [options, ]
    daemon_cmd = as_sudo(cmd)
  else:
    cmd = format("{ulimit_cmd} {hadoop_daemon} --config {hadoop_conf_dir} {action} {name}")
    if options:
      cmd += " " + options
    daemon_cmd = as_user(cmd, user)

  service_is_up = check_process if action == "start" else None

  # Remove the pid file left behind by a dead process
  File(pid_file,
       action="delete",
       not_if=check_process
  )
  Execute(daemon_cmd,
          not_if=service_is_up,
          environment=hadoop_env_exports
  )

  if action == "stop":
    File(pid_file,
         action="delete",
    )

def get_value_from_jmx(qry, property):
  """
  Fetch a single property from the first bean returned by a JMX query URL.
  :param qry: Full JMX query URL
  :param property: Name of the property to read from the first returned bean
  :return: The property value, or None if the request fails or returns no data
  """
  try:
    response = urllib2.urlopen(qry)
    data = response.read()
    if data:
      data_dict = json.loads(data)
      return data_dict["beans"][0][property]
  except:
    return None

def get_jmx_data(nn_address, modeler_type, metric, encrypted=False):
  """
  :param nn_address: Namenode Address, e.g., host:port, ** MAY ** be preceded with "http://" or "https://" already.
  If not preceded, will use the encrypted param to determine.
  :param modeler_type: Modeler type to query using startswith function
  :param metric: Metric to return
  :param encrypted: If the address has no scheme, prefix with "https://" when True, otherwise "http://"
  :return: Return an object representation of the metric, or None if it does not exist
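
  Example (illustrative; host and bean names are assumptions):
    get_jmx_data("nn-host.example.com:50070",
                 "org.apache.hadoop.hdfs.server.namenode.FSNamesystem",
                 "JournalTransactionInfo")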
  223. """
  224. if not nn_address or not modeler_type or not metric:
  225. return None
  226. nn_address = nn_address.strip()
  227. if not nn_address.startswith("http"):
  228. nn_address = ("https://" if encrypted else "http://") + nn_address
  229. if not nn_address.endswith("/"):
  230. nn_address = nn_address + "/"
  231. nn_address = nn_address + "jmx"
  232. Logger.info("Retrieve modeler: %s, metric: %s from JMX endpoint %s" % (modeler_type, metric, nn_address))
  233. data = urllib2.urlopen(nn_address).read()
  234. data_dict = json.loads(data)
  235. my_data = None
  236. if data_dict:
  237. for el in data_dict['beans']:
  238. if el is not None and el['modelerType'] is not None and el['modelerType'].startswith(modeler_type):
  239. if metric in el:
  240. my_data = el[metric]
  241. if my_data:
  242. my_data = json.loads(str(my_data))
  243. break
  244. return my_data
def get_port(address):
  """
  Extracts the port from an address like 0.0.0.0:1019
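
  Examples (illustrative):
    get_port("0.0.0.0:1019")                  # -> 1019
    get_port("https://nn.example.com:50470")  # -> 50470
    get_port("no port here")                  # -> None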
  248. """
  249. if address is None:
  250. return None
  251. m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address)
  252. if m is not None and len(m.groups()) >= 2:
  253. return int(m.group(2))
  254. else:
  255. return None
def is_secure_port(port):
  """
  Returns True if the port is privileged (requires root to bind) on *nix systems
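
  Examples (illustrative):
    is_secure_port(1019)   # -> True
    is_secure_port(50010)  # -> False
    is_secure_port(None)   # -> False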
  259. """
  260. if port is not None:
  261. return port < 1024
  262. else:
  263. return False