  1. """
  2. Licensed to the Apache Software Foundation (ASF) under one
  3. or more contributor license agreements. See the NOTICE file
  4. distributed with this work for additional information
  5. regarding copyright ownership. The ASF licenses this file
  6. to you under the Apache License, Version 2.0 (the
  7. "License"); you may not use this file except in compliance
  8. with the License. You may obtain a copy of the License at
  9. http://www.apache.org/licenses/LICENSE-2.0
  10. Unless required by applicable law or agreed to in writing, software
  11. distributed under the License is distributed on an "AS IS" BASIS,
  12. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. See the License for the specific language governing permissions and
  14. limitations under the License.
  15. """
import sys
import os
import time
import json
import tempfile
from datetime import datetime

import ambari_simplejson as json  # simplejson is much faster compared to the Python 2.6 json module and has the same function set.

from resource_management import Script
from resource_management.core.resources.system import Execute, File
from resource_management.core import shell
from resource_management.libraries.functions import conf_select
from resource_management.libraries.functions import hdp_select
from resource_management.libraries.functions import Direction
from resource_management.libraries.functions.version import compare_versions, format_hdp_stack_version
from resource_management.libraries.functions.format import format
from resource_management.libraries.functions.security_commons import build_expectations, \
  cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
  FILE_TYPE_XML
from resource_management.core.exceptions import Fail
from resource_management.core.shell import as_user
from resource_management.core.logger import Logger

from ambari_commons.os_family_impl import OsFamilyImpl
from ambari_commons import OSConst

import namenode_upgrade
from hdfs_namenode import namenode
from hdfs import hdfs
import hdfs_rebalance
from utils import initiate_safe_zkfc_failover, get_hdfs_binary, get_dfsadmin_base_command

# hashlib is supplied as of Python 2.5 as the replacement interface for md5
# and other secure hashes. In 2.6, md5 is deprecated. Import hashlib if
# available, avoiding a deprecation warning under 2.6. Import md5 otherwise,
# preserving 2.4 compatibility.
try:
  import hashlib
  _md5 = hashlib.md5
except ImportError:
  import md5
  _md5 = md5.new

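# Base NameNode command script shared by all OS implementations. The standard
# lifecycle commands (install, configure, start, stop, status) and the
# decommission custom command delegate to the hdfs() and namenode() helpers
# imported above.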
class NameNode(Script):

  def get_stack_to_component(self):
    return {"HDP": "hadoop-hdfs-namenode"}

  def get_hdfs_binary(self):
    """
    Get the name or path to the hdfs binary depending on the stack and version.
    """
    import params
    stack_to_comp = self.get_stack_to_component()
    if params.stack_name in stack_to_comp:
      return get_hdfs_binary(stack_to_comp[params.stack_name])
    return "hdfs"

  def install(self, env):
    import params
    self.install_packages(env, params.exclude_packages)
    env.set_params(params)
    # TODO we need this for HA because of manual steps
    self.configure(env)

  def configure(self, env):
    import params
    env.set_params(params)
    hdfs("namenode")
    hdfs_binary = self.get_hdfs_binary()
    namenode(action="configure", hdfs_binary=hdfs_binary, env=env)

  def start(self, env, upgrade_type=None):
    import params
    env.set_params(params)
    self.configure(env)
    hdfs_binary = self.get_hdfs_binary()
    namenode(action="start", hdfs_binary=hdfs_binary, upgrade_type=upgrade_type, env=env)

  def stop(self, env, upgrade_type=None):
    import params
    env.set_params(params)
    hdfs_binary = self.get_hdfs_binary()
    if upgrade_type == "rolling" and params.dfs_ha_enabled:
      if params.dfs_ha_automatic_failover_enabled:
        initiate_safe_zkfc_failover()
      else:
        raise Fail("Rolling Upgrade - dfs.ha.automatic-failover.enabled must be enabled to perform a rolling restart")
    namenode(action="stop", hdfs_binary=hdfs_binary, upgrade_type=upgrade_type, env=env)

  def status(self, env):
    import status_params
    env.set_params(status_params)
    namenode(action="status", env=env)

  def decommission(self, env):
    import params
    env.set_params(params)
    hdfs_binary = self.get_hdfs_binary()
    namenode(action="decommission", hdfs_binary=hdfs_binary)

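# Linux (default) implementation: adds express/rolling upgrade orchestration,
# the Kerberos security_status check, and the rebalancehdfs custom command.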
@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
class NameNodeDefault(NameNode):

  def restore_snapshot(self, env):
    """
    Restore the snapshot during a Downgrade.
    """
    print "TODO AMBARI-12698"
    pass

  def prepare_express_upgrade(self, env):
    """
    During an Express Upgrade.
    If in HA, on the Active NameNode only, examine the directory dfs.namenode.name.dir and
    make sure that there is no "/previous" directory.

    Create a list of all the DataNodes in the cluster.
    hdfs dfsadmin -report > dfs-old-report-1.log

    hdfs dfsadmin -safemode enter
    hdfs dfsadmin -saveNamespace

    Copy the checkpoint files located in ${dfs.namenode.name.dir}/current into a backup directory.

    Finalize any prior HDFS upgrade:
    hdfs dfsadmin -finalizeUpgrade

    Prepare for a NameNode rolling upgrade in order to not lose any data.
    hdfs dfsadmin -rollingUpgrade prepare
    """
    import params
    Logger.info("Preparing the NameNodes for a NonRolling (aka Express) Upgrade.")

    if params.security_enabled:
      kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}")
      Execute(kinit_command, user=params.hdfs_user, logoutput=True)

    hdfs_binary = self.get_hdfs_binary()
    namenode_upgrade.prepare_upgrade_check_for_previous_dir()
    namenode_upgrade.prepare_upgrade_enter_safe_mode(hdfs_binary)
    namenode_upgrade.prepare_upgrade_save_namespace(hdfs_binary)
    namenode_upgrade.prepare_upgrade_backup_namenode_dir()
    namenode_upgrade.prepare_upgrade_finalize_previous_upgrades(hdfs_binary)

    # Call -rollingUpgrade prepare
    namenode_upgrade.prepare_rolling_upgrade(hdfs_binary)

  def prepare_rolling_upgrade(self, env):
    hdfs_binary = self.get_hdfs_binary()
    namenode_upgrade.prepare_rolling_upgrade(hdfs_binary)

  def wait_for_safemode_off(self, env):
    """
    During a NonRolling (aka Express) Upgrade, after starting the NameNode (which is still in safemode) and then
    starting all of the DataNodes, we need the NameNode to receive all of the block reports and leave safemode.
    If HA is present, then this command will run individually on each NameNode, which checks for its own address.
    """
    import params

    Logger.info("Waiting to leave safemode since the NameNode must transition from ON to OFF.")

    if params.security_enabled:
      kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}")
      Execute(kinit_command, user=params.hdfs_user, logoutput=True)

    try:
      hdfs_binary = self.get_hdfs_binary()
      # Note, this fails if namenode_address isn't prefixed with "params."
      dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary, use_specific_namenode=True)
      is_namenode_safe_mode_off = dfsadmin_base_command + " -safemode get | grep 'Safe mode is OFF'"

      # Wait up to 30 mins
      Execute(is_namenode_safe_mode_off,
              tries=180,
              try_sleep=10,
              user=params.hdfs_user,
              logoutput=True
      )

      # Wait a bit more since YARN still depends on block reports coming in.
      # Also saw intermittent errors with HBASE service check if it was done too soon.
      time.sleep(30)
    except Fail:
      Logger.error("NameNode is still in safemode, please be careful with commands that need safemode OFF.")

  def finalize_non_rolling_upgrade(self, env):
    hdfs_binary = self.get_hdfs_binary()
    namenode_upgrade.finalize_upgrade("nonrolling", hdfs_binary)

  def finalize_rolling_upgrade(self, env):
    hdfs_binary = self.get_hdfs_binary()
    namenode_upgrade.finalize_upgrade("rolling", hdfs_binary)

  def pre_upgrade_restart(self, env, upgrade_type=None):
    Logger.info("Executing Stack Upgrade pre-restart")
    import params
    env.set_params(params)

    if params.version and compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') >= 0:
      # When downgrading an Express Upgrade, the first thing we do is to revert the symlinks.
      # Therefore, we cannot call this code in that scenario.
      call_if = [("rolling", "upgrade"), ("rolling", "downgrade"), ("nonrolling", "upgrade")]
      for e in call_if:
        if (upgrade_type, params.upgrade_direction) == e:
          conf_select.select(params.stack_name, "hadoop", params.version)
          hdp_select.select("hadoop-hdfs-namenode", params.version)

  def post_upgrade_restart(self, env, upgrade_type=None):
    Logger.info("Executing Stack Upgrade post-restart")
    import params
    env.set_params(params)

    hdfs_binary = self.get_hdfs_binary()
    dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary)
    dfsadmin_cmd = dfsadmin_base_command + " -report -live"
    Execute(dfsadmin_cmd,
            user=params.hdfs_user,
            tries=60,
            try_sleep=10
    )

  def security_status(self, env):
    import status_params
    env.set_params(status_params)

    props_value_check = {"hadoop.security.authentication": "kerberos",
                         "hadoop.security.authorization": "true"}
    props_empty_check = ["hadoop.security.auth_to_local"]
    props_read_check = None
    core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check,
                                                props_read_check)

    props_value_check = None
    props_empty_check = ['dfs.namenode.kerberos.internal.spnego.principal',
                         'dfs.namenode.keytab.file',
                         'dfs.namenode.kerberos.principal']
    props_read_check = ['dfs.namenode.keytab.file']
    hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check,
                                                props_read_check)

    hdfs_expectations = {}
    hdfs_expectations.update(core_site_expectations)
    hdfs_expectations.update(hdfs_site_expectations)

    security_params = get_params_from_filesystem(status_params.hadoop_conf_dir,
                                                 {'core-site.xml': FILE_TYPE_XML,
                                                  'hdfs-site.xml': FILE_TYPE_XML})
    if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \
        security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos':
      result_issues = validate_security_config_properties(security_params, hdfs_expectations)
      if not result_issues:  # If all validations passed successfully
        try:
          # Double check the dict before calling execute
          if ('hdfs-site' not in security_params
              or 'dfs.namenode.keytab.file' not in security_params['hdfs-site']
              or 'dfs.namenode.kerberos.principal' not in security_params['hdfs-site']):
            self.put_structured_out({"securityState": "UNSECURED"})
            self.put_structured_out(
              {"securityIssuesFound": "Keytab file or principal are not set properly."})
            return

          cached_kinit_executor(status_params.kinit_path_local,
                                status_params.hdfs_user,
                                security_params['hdfs-site']['dfs.namenode.keytab.file'],
                                security_params['hdfs-site']['dfs.namenode.kerberos.principal'],
                                status_params.hostname,
                                status_params.tmp_dir)
          self.put_structured_out({"securityState": "SECURED_KERBEROS"})
        except Exception as e:
          self.put_structured_out({"securityState": "ERROR"})
          self.put_structured_out({"securityStateErrorInfo": str(e)})
      else:
        issues = []
        for cf in result_issues:
          issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf]))
        self.put_structured_out({"securityIssuesFound": ". ".join(issues)})
        self.put_structured_out({"securityState": "UNSECURED"})
    else:
      self.put_structured_out({"securityState": "UNSECURED"})

  def rebalancehdfs(self, env):
    import params
    env.set_params(params)

    name_node_parameters = json.loads(params.name_node_params)
    threshold = name_node_parameters['threshold']
    _print("Starting balancer with threshold = %s\n" % threshold)

    rebalance_env = {'PATH': params.hadoop_bin_dir}

    if params.security_enabled:
      # Create the kerberos credentials cache (ccache) file and set it in the environment to use
      # when executing HDFS rebalance command. Use the md5 hash of the combination of the principal and keytab file
      # to generate a (relatively) unique cache filename so that we can use it as needed.
      # TODO: params.tmp_dir=/var/lib/ambari-agent/tmp. However hdfs user doesn't have access to this path.
      # TODO: Hence using /tmp
      ccache_file_name = "hdfs_rebalance_cc_" + _md5(format("{hdfs_principal_name}|{hdfs_user_keytab}")).hexdigest()
      ccache_file_path = os.path.join(tempfile.gettempdir(), ccache_file_name)
      rebalance_env['KRB5CCNAME'] = ccache_file_path

      # If there are no tickets in the cache or they are expired, perform a kinit, else use what
      # is in the cache
      klist_cmd = format("{klist_path_local} -s {ccache_file_path}")
      kinit_cmd = format("{kinit_path_local} -c {ccache_file_path} -kt {hdfs_user_keytab} {hdfs_principal_name}")
      if shell.call(klist_cmd, user=params.hdfs_user)[0] != 0:
        Execute(kinit_cmd, user=params.hdfs_user)

    def calculateCompletePercent(first, current):
      return 1.0 - current.bytesLeftToMove / first.bytesLeftToMove

    def startRebalancingProcess(threshold, rebalance_env):
      rebalanceCommand = format('hdfs --config {hadoop_conf_dir} balancer -threshold {threshold}')
      return as_user(rebalanceCommand, params.hdfs_user, env=rebalance_env)

    command = startRebalancingProcess(threshold, rebalance_env)

    basedir = os.path.join(env.config.basedir, 'scripts')
    if threshold == 'DEBUG':  # FIXME TODO remove this on PROD
      basedir = os.path.join(env.config.basedir, 'scripts', 'balancer-emulator')
      command = ['ambari-python-wrap', 'hdfs-command.py']

    _print("Executing command %s\n" % command)

    parser = hdfs_rebalance.HdfsParser()

    def handle_new_line(line, is_stderr):
      if is_stderr:
        return

      _print('[balancer] %s' % (line))
      pl = parser.parseLine(line)
      if pl:
        res = pl.toJson()
        res['completePercent'] = calculateCompletePercent(parser.initialLine, pl)
        self.put_structured_out(res)
      elif parser.state == 'PROCESS_FINISED':
        _print('[balancer] %s' % ('Process is finished'))
        self.put_structured_out({'completePercent': 1})
        return

    Execute(command,
            on_new_line=handle_new_line,
            logoutput=False,
    )

    if params.security_enabled:
      # Delete the kerberos credentials cache (ccache) file
      File(ccache_file_path,
           action="delete",
      )

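# Windows implementation: installation pulls its package exclusions from
# install_params, and rebalancing runs "hdfs balancer" impersonated as the
# hdfs user instead of streaming output through Execute.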
@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
class NameNodeWindows(NameNode):

  def install(self, env):
    import install_params
    self.install_packages(env, install_params.exclude_packages)
    # TODO we need this for HA because of manual steps
    self.configure(env)

  def rebalancehdfs(self, env):
    from ambari_commons.os_windows import UserHelper, run_os_command_impersonated
    import params
    env.set_params(params)

    hdfs_username, hdfs_domain = UserHelper.parse_user_name(params.hdfs_user, ".")

    name_node_parameters = json.loads(params.name_node_params)
    threshold = name_node_parameters['threshold']
    _print("Starting balancer with threshold = %s\n" % threshold)

    def calculateCompletePercent(first, current):
      return 1.0 - current.bytesLeftToMove / first.bytesLeftToMove

    def startRebalancingProcess(threshold):
      rebalanceCommand = 'hdfs balancer -threshold %s' % threshold
      return ['cmd', '/C', rebalanceCommand]

    command = startRebalancingProcess(threshold)
    basedir = os.path.join(env.config.basedir, 'scripts')

    _print("Executing command %s\n" % command)

    parser = hdfs_rebalance.HdfsParser()
    returncode, stdout, err = run_os_command_impersonated(' '.join(command), hdfs_username, Script.get_password(params.hdfs_user), hdfs_domain)

    for line in stdout.split('\n'):
      _print('[balancer] %s %s' % (str(datetime.now()), line))
      pl = parser.parseLine(line)
      if pl:
        res = pl.toJson()
        res['completePercent'] = calculateCompletePercent(parser.initialLine, pl)
        self.put_structured_out(res)
      elif parser.state == 'PROCESS_FINISED':
        _print('[balancer] %s %s' % (str(datetime.now()), 'Process is finished'))
        self.put_structured_out({'completePercent': 1})
        break

    if returncode is not None and returncode != 0:
      raise Fail('HDFS rebalance process exited with an error. See the log output.')

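# Write a line to stdout and flush immediately so balancer progress is not buffered.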
def _print(line):
  sys.stdout.write(line)
  sys.stdout.flush()


if __name__ == "__main__":
  NameNode().execute()