utils.py

  1. """
  2. Licensed to the Apache Software Foundation (ASF) under one
  3. or more contributor license agreements. See the NOTICE file
  4. distributed with this work for additional information
  5. regarding copyright ownership. The ASF licenses this file
  6. to you under the Apache License, Version 2.0 (the
  7. "License"); you may not use this file except in compliance
  8. with the License. You may obtain a copy of the License at
  9. http://www.apache.org/licenses/LICENSE-2.0
  10. Unless required by applicable law or agreed to in writing, software
  11. distributed under the License is distributed on an "AS IS" BASIS,
  12. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. See the License for the specific language governing permissions and
  14. limitations under the License.
  15. """
import os
import re
import urllib2

import ambari_simplejson as json  # simplejson is much faster than the Python 2.6 json module and exposes the same function set.

from resource_management.core.resources.system import Directory, File, Execute
from resource_management.libraries.functions.format import format
from resource_management.libraries.functions import check_process_status
from resource_management.libraries.functions.version import compare_versions
from resource_management.core import shell
from resource_management.core.shell import as_user, as_sudo
from resource_management.core.exceptions import ComponentIsNotRunning
from resource_management.core.logger import Logger
from resource_management.libraries.functions.curl_krb_request import curl_krb_request
from resource_management.core.exceptions import Fail
from resource_management.libraries.functions.namenode_ha_utils import get_namenode_states
from resource_management.libraries.script.script import Script

from zkfc_slave import ZkfcSlave

def safe_zkfc_op(action, env):
  """
  Idempotent operation on the zkfc process to either start or stop it.
  :param action: start or stop
  :param env: environment
  """
  Logger.info("Performing action {0} on zkfc.".format(action))
  zkfc = None
  if action == "start":
    try:
      zkfc = ZkfcSlave()
      zkfc.status(env)
    except ComponentIsNotRunning:
      if zkfc:
        zkfc.start(env)

  if action == "stop":
    try:
      zkfc = ZkfcSlave()
      zkfc.status(env)
    except ComponentIsNotRunning:
      pass
    else:
      if zkfc:
        zkfc.stop(env)
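
# Illustrative only: a hypothetical caller (not part of this module) would rely on
# safe_zkfc_op's idempotence from a lifecycle hook, so repeated invocations are safe:
#
#   safe_zkfc_op("stop", env)   # no-op if ZKFC is not running
#   safe_zkfc_op("start", env)  # no-op if ZKFC is already running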

def initiate_safe_zkfc_failover():
  """
  If this is the active namenode, initiate a safe failover and wait for it to become the standby.
  If an error occurs, force a failover by killing zkfc on this host; in that case, the Restart will
  also have to start ZKFC manually.
  """
  import params

  # Must kinit before running the HDFS command
  if params.security_enabled:
    Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
            user = params.hdfs_user)

  active_namenode_id = None
  standby_namenode_id = None
  active_namenodes, standby_namenodes, unknown_namenodes = get_namenode_states(params.hdfs_site, params.security_enabled, params.hdfs_user)
  if active_namenodes:
    active_namenode_id = active_namenodes[0][0]
  if standby_namenodes:
    standby_namenode_id = standby_namenodes[0][0]

  if active_namenode_id:
    Logger.info(format("Active NameNode id: {active_namenode_id}"))
  if standby_namenode_id:
    Logger.info(format("Standby NameNode id: {standby_namenode_id}"))
  if unknown_namenodes:
    for unknown_namenode in unknown_namenodes:
      Logger.info("NameNode HA state for {0} is unknown".format(unknown_namenode[0]))

  if params.namenode_id == active_namenode_id and params.other_namenode_id == standby_namenode_id:
    # Failover if this NameNode is active and the other NameNode is up and in standby (i.e. ready to become active on failover)
    Logger.info(format("NameNode {namenode_id} is active and NameNode {other_namenode_id} is in standby"))

    failover_command = format("hdfs haadmin -failover {namenode_id} {other_namenode_id}")
    check_standby_cmd = format("hdfs haadmin -getServiceState {namenode_id} | grep standby")

    msg = "Rolling Upgrade - Initiating a ZKFC failover on active NameNode host {0}.".format(params.hostname)
    Logger.info(msg)
    code, out = shell.call(failover_command, user=params.hdfs_user, logoutput=True)
    Logger.info(format("Rolling Upgrade - failover command returned {code}"))
    wait_for_standby = False

    if code == 0:
      wait_for_standby = True
    else:
      # Try to kill ZKFC manually
      was_zkfc_killed = kill_zkfc(params.hdfs_user)
      code, out = shell.call(check_standby_cmd, user=params.hdfs_user, logoutput=True)
      Logger.info(format("Rolling Upgrade - check for standby returned {code}"))
      if code == 255 and out:
        Logger.info("Rolling Upgrade - NameNode is already down.")
      else:
        if was_zkfc_killed:
          # Only mandate that this be the standby namenode if ZKFC was indeed killed to initiate a failover.
          wait_for_standby = True

    if wait_for_standby:
      Logger.info("Waiting for this NameNode to become the standby one.")
      Execute(check_standby_cmd,
              user=params.hdfs_user,
              tries=50,
              try_sleep=6,
              logoutput=True)
  else:
    msg = "Rolling Upgrade - Skipping ZKFC failover on NameNode host {0}.".format(params.hostname)
    Logger.info(msg)
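
# For reference, with hypothetical NameNode ids nn1 (this host, active) and nn2 (standby),
# the commands built above by format() expand roughly to:
#
#   hdfs haadmin -failover nn1 nn2
#   hdfs haadmin -getServiceState nn1 | grep standby
#
# The second command is retried (tries=50, try_sleep=6) until this NameNode reports "standby".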

def kill_zkfc(zkfc_user):
  """
  There are two potential methods for failing over the namenode, especially during a Rolling Upgrade.
  Option 1. Kill zkfc on the primary namenode, provided that the secondary is up and has zkfc running on it.
  Option 2. Silent failover (not supported as of HDP 2.2.0.0)
  :param zkfc_user: User that started the ZKFC process.
  :return: True if ZKFC was killed, otherwise False.
  """
  import params
  if params.dfs_ha_enabled:
    if params.zkfc_pid_file:
      check_process = as_user(format("ls {zkfc_pid_file} > /dev/null 2>&1 && ps -p `cat {zkfc_pid_file}` > /dev/null 2>&1"), user=zkfc_user)
      code, out = shell.call(check_process)
      if code == 0:
        Logger.debug("ZKFC is running and will be killed.")
        kill_command = format("kill -15 `cat {zkfc_pid_file}`")
        Execute(kill_command,
                user=zkfc_user
        )
        File(params.zkfc_pid_file,
             action = "delete",
        )
        return True
  return False
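
# Sketch of the shell sequence kill_zkfc() issues when ZKFC is running, assuming a
# hypothetical pid file at /var/run/hadoop/hdfs/hadoop-hdfs-zkfc.pid:
#
#   ls /var/run/hadoop/hdfs/hadoop-hdfs-zkfc.pid > /dev/null 2>&1 && \
#     ps -p `cat /var/run/hadoop/hdfs/hadoop-hdfs-zkfc.pid` > /dev/null 2>&1   # liveness check
#   kill -15 `cat /var/run/hadoop/hdfs/hadoop-hdfs-zkfc.pid`                   # graceful SIGTERM
#
# after which the stale pid file is removed via the File(..., action="delete") resource.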

def service(action=None, name=None, user=None, options="", create_pid_dir=False,
            create_log_dir=False):
  """
  :param action: Either "start" or "stop"
  :param name: Component name, e.g., "namenode", "datanode", "secondarynamenode", "zkfc"
  :param user: User to run the command as
  :param options: Additional options to pass to the command, as a string
  :param create_pid_dir: Create PID directory
  :param create_log_dir: Create log file directory
  """
  import params

  options = options if options else ""
  pid_dir = format("{hadoop_pid_dir_prefix}/{user}")
  pid_file = format("{pid_dir}/hadoop-{user}-{name}.pid")
  hadoop_env_exports = {
    'HADOOP_LIBEXEC_DIR': params.hadoop_libexec_dir
  }
  log_dir = format("{hdfs_log_dir_prefix}/{user}")

  # NFS GATEWAY is always started by root using jsvc due to rpcbind bugs
  # on Linux such as CentOS6.2. https://bugzilla.redhat.com/show_bug.cgi?id=731542
  if name == "nfs3":
    pid_file = format("{pid_dir}/hadoop_privileged_nfs3.pid")
    custom_export = {
      'HADOOP_PRIVILEGED_NFS_USER': params.hdfs_user,
      'HADOOP_PRIVILEGED_NFS_PID_DIR': pid_dir,
      'HADOOP_PRIVILEGED_NFS_LOG_DIR': log_dir
    }
    hadoop_env_exports.update(custom_export)

  process_id_exists_command = as_sudo(["test", "-f", pid_file]) + " && " + as_sudo(["pgrep", "-F", pid_file])

  # on STOP directories shouldn't be created
  # since during stop still old dirs are used (which were created during previous start)
  if action != "stop":
    if name == "nfs3":
      Directory(params.hadoop_pid_dir_prefix,
                mode=0755,
                owner=params.root_user,
                group=params.root_group
      )
    else:
      Directory(params.hadoop_pid_dir_prefix,
                mode=0755,
                owner=params.hdfs_user,
                group=params.user_group
      )
    if create_pid_dir:
      Directory(pid_dir,
                owner=user,
                recursive=True)
    if create_log_dir:
      if name == "nfs3":
        Directory(log_dir,
                  mode=0775,
                  owner=params.root_user,
                  group=params.user_group)
      else:
        Directory(log_dir,
                  owner=user,
                  recursive=True)

  if params.security_enabled and name == "datanode":
    ## The directory where pid files are stored in the secure data environment.
    hadoop_secure_dn_pid_dir = format("{hadoop_pid_dir_prefix}/{hdfs_user}")
    hadoop_secure_dn_pid_file = format("{hadoop_secure_dn_pid_dir}/hadoop_secure_dn.pid")

    # At Champlain stack and further, we may start datanode as a non-root even in secure cluster
    if not (params.hdp_stack_version != "" and compare_versions(params.hdp_stack_version, '2.2') >= 0) or params.secure_dn_ports_are_in_use:
      user = "root"
      pid_file = format(
        "{hadoop_pid_dir_prefix}/{hdfs_user}/hadoop-{hdfs_user}-{name}.pid")

    if action == 'stop' and (params.hdp_stack_version != "" and compare_versions(params.hdp_stack_version, '2.2') >= 0) and \
      os.path.isfile(hadoop_secure_dn_pid_file):
        # We need special handling for this case to handle the situation
        # when we configure non-root secure DN and then restart it
        # to handle new configs. Otherwise we will not be able to stop
        # a running instance
        user = "root"

        try:
          check_process_status(hadoop_secure_dn_pid_file)

          custom_export = {
            'HADOOP_SECURE_DN_USER': params.hdfs_user
          }
          hadoop_env_exports.update(custom_export)

        except ComponentIsNotRunning:
          pass

  hadoop_daemon = format("{hadoop_bin}/hadoop-daemon.sh")

  if user == "root":
    cmd = [hadoop_daemon, "--config", params.hadoop_conf_dir, action, name]
    if options:
      cmd += [options, ]
    daemon_cmd = as_sudo(cmd)
  else:
    cmd = format("{ulimit_cmd} {hadoop_daemon} --config {hadoop_conf_dir} {action} {name}")
    if options:
      cmd += " " + options
    daemon_cmd = as_user(cmd, user)

  if action == "start":
    # remove pid file from dead process
    File(pid_file, action="delete", not_if=process_id_exists_command)
    Execute(daemon_cmd, not_if=process_id_exists_command, environment=hadoop_env_exports)
  elif action == "stop":
    Execute(daemon_cmd, only_if=process_id_exists_command, environment=hadoop_env_exports)
    File(pid_file, action="delete")
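
# Illustrative only: a hypothetical component script (not shown here) would typically
# drive service() like this when starting the NameNode:
#
#   service(action="start", name="namenode", user=params.hdfs_user,
#           create_pid_dir=True, create_log_dir=True)
#
# For a non-root user this builds the daemon command
#   {ulimit_cmd} {hadoop_bin}/hadoop-daemon.sh --config {hadoop_conf_dir} start namenode
# and runs it as that user, guarded by the pid-file check in process_id_exists_command.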

def get_jmx_data(nn_address, modeler_type, metric, encrypted=False, security_enabled=False):
  """
  :param nn_address: Namenode Address, e.g., host:port, ** MAY ** already be preceded with "http://" or "https://".
  If not, the encrypted param is used to determine the scheme.
  :param modeler_type: Modeler type to query, matched using startswith
  :param metric: Metric to return
  :return: An object representation of the metric, or None if it does not exist
  """
  if not nn_address or not modeler_type or not metric:
    return None

  nn_address = nn_address.strip()
  if not nn_address.startswith("http"):
    nn_address = ("https://" if encrypted else "http://") + nn_address
  if not nn_address.endswith("/"):
    nn_address = nn_address + "/"

  nn_address = nn_address + "jmx"
  Logger.info("Retrieve modeler: %s, metric: %s from JMX endpoint %s" % (modeler_type, metric, nn_address))

  if security_enabled:
    import params
    data, error_msg, time_millis = curl_krb_request(params.tmp_dir, params.smoke_user_keytab, params.smokeuser_principal, nn_address,
                                                    "jn_upgrade", params.kinit_path_local, False, None, params.smoke_user)
  else:
    data = urllib2.urlopen(nn_address).read()

  my_data = None
  if data:
    data_dict = json.loads(data)
    if data_dict:
      for el in data_dict['beans']:
        if el is not None and el['modelerType'] is not None and el['modelerType'].startswith(modeler_type):
          if metric in el:
            my_data = el[metric]
            if my_data:
              my_data = json.loads(str(my_data))
              break
  return my_data
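
# Illustrative only (hypothetical host and bean names): fetching a metric from a
# JournalNode's JMX endpoint might look like
#
#   txid = get_jmx_data("c6401.ambari.apache.org:8480", "Hadoop:service=JournalNode", "LastWrittenTxId")
#
# which queries http://c6401.ambari.apache.org:8480/jmx and returns the metric from the
# first bean whose modelerType starts with the given prefix, or None if it is absent.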

def get_port(address):
  """
  Extracts port from the address like 0.0.0.0:1019
  """
  if address is None:
    return None
  m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address)
  if m is not None and len(m.groups()) >= 2:
    return int(m.group(2))
  else:
    return None
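
# Examples of the extraction performed by get_port() (hypothetical addresses):
#   get_port("0.0.0.0:1019")                   -> 1019
#   get_port("http://namenode.example:50070")  -> 50070
#   get_port(None)                             -> None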

def is_secure_port(port):
  """
  Returns True if the port is root-owned (privileged) on *nix systems
  """
  if port is not None:
    return port < 1024
  else:
    return False
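
# Examples: is_secure_port(1019) is True (ports below 1024 are root-owned/privileged
# on *nix), while is_secure_port(50010) is False and is_secure_port(None) is False.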

def is_previous_fs_image():
  """
  Return True if there's a "previous" folder in any of the HDFS namenode directories.
  """
  import params
  if params.dfs_name_dir:
    nn_name_dirs = params.dfs_name_dir.split(',')
    for nn_dir in nn_name_dirs:
      prev_dir = os.path.join(nn_dir, "previous")
      if os.path.isdir(prev_dir):
        return True
  return False
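
# Illustrative only: with a hypothetical dfs_name_dir of
# "/hadoop/hdfs/namenode,/mnt/hadoop/hdfs/namenode" this checks for
# /hadoop/hdfs/namenode/previous and /mnt/hadoop/hdfs/namenode/previous and returns True
# if either exists, i.e. a prior upgrade left a rollback image behind.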

def get_hdfs_binary(distro_component_name):
  """
  Get the hdfs binary to use depending on the stack and version.
  :param distro_component_name: e.g., hadoop-hdfs-namenode, hadoop-hdfs-datanode
  :return: The hdfs binary to use
  """
  import params
  hdfs_binary = "hdfs"
  if params.stack_name == "HDP":
    # This was used in HDP 2.1 and earlier
    hdfs_binary = "hdfs"
    if Script.is_hdp_stack_greater_or_equal("2.2"):
      hdfs_binary = "/usr/hdp/current/{0}/bin/hdfs".format(distro_component_name)

  return hdfs_binary
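
# Example results, as constructed above: on HDP 2.2+ with
# distro_component_name="hadoop-hdfs-namenode" this returns
# "/usr/hdp/current/hadoop-hdfs-namenode/bin/hdfs"; on HDP 2.1 and earlier, or on
# non-HDP stacks, it falls back to the plain "hdfs" binary on the PATH.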

def get_dfsadmin_base_command(hdfs_binary, use_specific_namenode = False):
  """
  Get the dfsadmin base command, constructed from the hdfs_binary path and passing the namenode address
  as an explicit -fs argument.
  :param hdfs_binary: path to the hdfs binary to use
  :param use_specific_namenode: if set and NameNode HA is enabled, the dfsadmin command will use the
  current namenode's address
  :return: the constructed dfsadmin base command
  """
  import params
  dfsadmin_base_command = ""
  if params.dfs_ha_enabled and use_specific_namenode:
    dfsadmin_base_command = format("{hdfs_binary} dfsadmin -fs hdfs://{params.namenode_rpc}")
  else:
    dfsadmin_base_command = format("{hdfs_binary} dfsadmin -fs {params.namenode_address}")
  return dfsadmin_base_command
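
# Example (hypothetical namenode address): with HA disabled and
# params.namenode_address = "hdfs://c6401.ambari.apache.org:8020" this yields
#
#   hdfs dfsadmin -fs hdfs://c6401.ambari.apache.org:8020
#
# so callers can append subcommands such as "-safemode get" to the returned string.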