#!/usr/bin/env python
  2. """
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. Ambari Agent
  17. """
import re
import os
import time

from resource_management.core.environment import Environment
from resource_management.core.base import Fail
from resource_management.core.resources.system import Execute
from resource_management.core.resources.system import File
from resource_management.core.providers import Provider
from resource_management.core.logger import Logger
from resource_management.core import shell
from resource_management.libraries.script import Script
from resource_management.libraries.functions import format
from resource_management.libraries.functions.get_user_call_output import get_user_call_output
from resource_management.libraries.functions import is_empty
from resource_management.libraries.functions import namenode_ha_utils
from resource_management.libraries.functions.hdfs_utils import is_https_enabled_in_hdfs

import ambari_simplejson as json  # simplejson is much faster compared to the Python 2.6 json module and has the same function set
import subprocess
JSON_PATH = '/var/lib/ambari-agent/tmp/hdfs_resources_{timestamp}.json'
JAR_PATH = '/var/lib/ambari-agent/lib/fast-hdfs-resource.jar'

RESOURCE_TO_JSON_FIELDS = {
  'target': 'target',
  'type': 'type',
  'action': 'action',
  'source': 'source',
  'owner': 'owner',
  'group': 'group',
  'mode': 'mode',
  'recursive_chown': 'recursiveChown',
  'recursive_chmod': 'recursiveChmod',
  'change_permissions_for_parents': 'changePermissionforParents',
  'manage_if_exists': 'manageIfExists',
  'dfs_type': 'dfs_type'
}
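
# For illustration, a delayed "create" of a directory would be serialized into
# the json file as an entry roughly like the following (a hypothetical example,
# not something emitted at import time):
#   {"target": "/apps/example", "type": "directory", "action": "create",
#    "owner": "hdfs", "mode": "755", "manageIfExists": true}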

class HdfsResourceJar:
  """
  This implementation of HdfsResource is slower than HdfsResourceWebHDFS, but it works
  in all cases and on any DFS type.

  The idea is to put all the files/directories/copyFromLocals we have to create/delete
  into a json file, and then perform ONLY ONE expensive hadoop call, to our custom jar
  fast-hdfs-resource.jar, which reads this json.

  'create_on_execute' and 'delete_on_execute' do nothing but add files/directories to this json,
  while 'execute' does all the expensive creating/deleting work, executing the jar
  with the json as a parameter.
  """
  def action_delayed(self, action_name, main_resource):
    resource = {}
    env = Environment.get_instance()
    if 'hdfs_files' not in env.config:
      env.config['hdfs_files'] = []

    # Put the values into the dictionary representation of the resource
    for field_name, json_field_name in RESOURCE_TO_JSON_FIELDS.iteritems():
      if field_name == 'action':
        resource[json_field_name] = action_name
      elif field_name == 'mode' and main_resource.resource.mode:
        resource[json_field_name] = oct(main_resource.resource.mode)[1:]
      elif field_name == 'manage_if_exists':
        resource[json_field_name] = main_resource.manage_if_exists
      elif getattr(main_resource.resource, field_name):
        resource[json_field_name] = getattr(main_resource.resource, field_name)

    # Add the resource to the list to be processed on 'execute'
    env.config['hdfs_files'].append(resource)
  def action_execute(self, main_resource):
    env = Environment.get_instance()

    # Check required parameters
    main_resource.assert_parameter_is_set('user')

    if 'hdfs_files' not in env.config or not env.config['hdfs_files']:
      Logger.info("No resources to create. 'create_on_execute' or 'delete_on_execute' wasn't triggered before this 'execute' action.")
      return

    hadoop_bin_dir = main_resource.resource.hadoop_bin_dir
    hadoop_conf_dir = main_resource.resource.hadoop_conf_dir
    user = main_resource.resource.user
    security_enabled = main_resource.resource.security_enabled
    keytab_file = main_resource.resource.keytab
    kinit_path = main_resource.resource.kinit_path_local
    logoutput = main_resource.resource.logoutput
    principal_name = main_resource.resource.principal_name
    jar_path = JAR_PATH
    timestamp = time.time()  # interpolated into JSON_PATH by format() below
    json_path = format(JSON_PATH)

    if security_enabled:
      main_resource.kinit()

    # Write the json file to disk
    File(json_path,
         owner = user,
         content = json.dumps(env.config['hdfs_files'])
    )

    # Execute the jar to create/delete the resources in hadoop
    Execute(format("hadoop --config {hadoop_conf_dir} jar {jar_path} {json_path}"),
            user=user,
            path=[hadoop_bin_dir],
            logoutput=logoutput,
    )

    # Clean up the delayed list
    env.config['hdfs_files'] = []

class WebHDFSUtil:
  def __init__(self, hdfs_site, run_user, security_enabled, logoutput=None):
    https_nn_address = namenode_ha_utils.get_property_for_active_namenode(hdfs_site, 'dfs.namenode.https-address',
                                                                          security_enabled, run_user)
    http_nn_address = namenode_ha_utils.get_property_for_active_namenode(hdfs_site, 'dfs.namenode.http-address',
                                                                         security_enabled, run_user)
    self.is_https_enabled = is_https_enabled_in_hdfs(hdfs_site['dfs.http.policy'], hdfs_site['dfs.https.enable'])

    address = https_nn_address if self.is_https_enabled else http_nn_address
    protocol = "https" if self.is_https_enabled else "http"

    self.address = format("{protocol}://{address}")
    self.run_user = run_user
    self.security_enabled = security_enabled
    self.logoutput = logoutput

  @staticmethod
  def is_webhdfs_available(is_webhdfs_enabled, default_fs):
    # only hdfs seems to support webHDFS
    return (is_webhdfs_enabled and default_fs.startswith("hdfs"))

  valid_status_codes = ["200", "201"]
  def run_command(self, target, operation, method='POST', assertable_result=True, file_to_put=None, ignore_status_codes=[], **kwargs):
    """
    assertable_result - some POST requests return '{"boolean":false}' or '{"boolean":true}',
    depending on whether the query was successful, so we can assert on that for them.
    """
    target = HdfsResourceProvider.parse_path(target)

    url = format("{address}/webhdfs/v1{target}?op={operation}&user.name={run_user}", address=self.address, run_user=self.run_user)
    for k, v in kwargs.iteritems():
      url = format("{url}&{k}={v}")

    if file_to_put and not os.path.exists(file_to_put):
      raise Fail(format("File {file_to_put} is not found."))

    cmd = ["curl", "-sS", "-L", "-w", "%{http_code}", "-X", method]

    if file_to_put:
      cmd += ["--data-binary", "@" + file_to_put]
    if self.security_enabled:
      cmd += ["--negotiate", "-u", ":"]
    if self.is_https_enabled:
      cmd += ["-k"]

    cmd.append(url)
    _, out, err = get_user_call_output(cmd, user=self.run_user, logoutput=self.logoutput, quiet=False)
    status_code = out[-3:]
    out = out[:-3]  # strip the trailing 3 characters, which are the HTTP status code written by curl

    try:
      result_dict = json.loads(out)
    except ValueError:
      result_dict = out

    if status_code not in WebHDFSUtil.valid_status_codes + ignore_status_codes or assertable_result and result_dict and not result_dict['boolean']:
      formatted_output = json.dumps(result_dict, indent=2) if isinstance(result_dict, dict) else result_dict
      formatted_output = err + "\n" + formatted_output
      err_msg = "Execution of '%s' returned status_code=%s. %s" % (shell.string_cmd_from_args_list(cmd), status_code, formatted_output)
      raise Fail(err_msg)

    return result_dict

class HdfsResourceWebHDFS:
  """
  This is the fastest implementation of HdfsResource, using WebHDFS.
  Since WebHDFS is not available on non-HDFS filesystems and can also be disabled
  within HDFS itself, we should still keep the other implementations for such cases.
  """

  """
  If we have more than this number of files to recursively chmod/chown,
  webhdfs won't be used; 'hadoop fs -chmod (or -chown) -R ...' will be run instead,
  as doing it via webhdfs can be really slow (only ~17 files can be chmoded per second).
  """
  MAX_FILES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS = 1000

  """
  This is used to avoid a lot of LISTSTATUS commands, which can take some time if a
  directory contains a lot of files. LISTSTATUS on a directory with 1000 files takes ~0.5 seconds.
  """
  MAX_DIRECTORIES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS = 250
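
  # Rough arithmetic behind these thresholds (an approximation from the rates
  # quoted above): chmoding 1000 files one-by-one at ~17 files/sec takes about
  # a minute, and LISTSTATUS over 250 directories at ~0.5s each can add up to
  # ~2 more minutes in the worst case; beyond that, a single recursive
  # 'hadoop fs' call is cheaper despite its JVM startup cost.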

  def action_execute(self, main_resource):
    pass

  def _assert_valid(self):
    source = self.main_resource.resource.source
    type = self.main_resource.resource.type
    target = self.main_resource.resource.target

    if source:
      if not os.path.exists(source):
        raise Fail(format("Source {source} doesn't exist"))
      if type == "directory" and os.path.isfile(source):
        raise Fail(format("Source {source} is a file, but type is {type}"))
      elif type == "file" and os.path.isdir(source):
        raise Fail(format("Source {source} is a directory, but type is {type}"))

    self.target_status = self._get_file_status(target)

    if self.target_status and self.target_status['type'].lower() != type:
      raise Fail(format("Trying to create a file/directory, but a directory/file already exists in the DFS at {target}"))
  def action_delayed(self, action_name, main_resource):
    main_resource.assert_parameter_is_set('user')

    if main_resource.resource.security_enabled:
      main_resource.kinit()

    self.util = WebHDFSUtil(main_resource.resource.hdfs_site, main_resource.resource.user,
                            main_resource.resource.security_enabled, main_resource.resource.logoutput)
    self.mode = oct(main_resource.resource.mode)[1:] if main_resource.resource.mode else main_resource.resource.mode
    self.mode_set = False
    self.main_resource = main_resource
    self._assert_valid()

    if not self.main_resource.manage_if_exists and self.target_status:
      Logger.info("Skipping the operation for the not-managed DFS directory " + str(self.main_resource.resource.target) +
                  " since immutable_paths contains it.")
      return

    if action_name == "create":
      self._create_resource()
      self._set_mode(self.target_status)
      self._set_owner(self.target_status)
    else:
      self._delete_resource()
  def _create_resource(self):
    is_create = (self.main_resource.resource.source is None)

    if is_create and self.main_resource.resource.type == "directory":
      self._create_directory(self.main_resource.resource.target)
    elif is_create and self.main_resource.resource.type == "file":
      self._create_file(self.main_resource.resource.target, mode=self.mode)
    elif not is_create and self.main_resource.resource.type == "file":
      self._create_file(self.main_resource.resource.target, source=self.main_resource.resource.source, mode=self.mode)
    elif not is_create and self.main_resource.resource.type == "directory":
      self._create_directory(self.main_resource.resource.target)
      self._copy_from_local_directory(self.main_resource.resource.target, self.main_resource.resource.source)
  def _copy_from_local_directory(self, target, source):
    for next_path_part in os.listdir(source):
      new_source = os.path.join(source, next_path_part)
      new_target = format("{target}/{next_path_part}")
      if os.path.isdir(new_source):
        Logger.info(format("Creating DFS directory {new_target}"))
        self._create_directory(new_target)
        self._copy_from_local_directory(new_target, new_source)
      else:
        self._create_file(new_target, new_source)

  def _create_directory(self, target):
    if target == self.main_resource.resource.target and self.target_status:
      return

    self.util.run_command(target, 'MKDIRS', method='PUT')
  def _get_file_status(self, target):
    list_status = self.util.run_command(target, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
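    # A successful GETFILESTATUS response looks roughly like (fields abridged):
    #   {"FileStatus": {"type": "DIRECTORY", "owner": "hdfs", "group": "hdfs",
    #    "permission": "755", "length": 0, ...}}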
    return list_status['FileStatus'] if 'FileStatus' in list_status else None
  def _create_file(self, target, source=None, mode=""):
    """
    The PUT file command is slow; however, _get_file_status is pretty fast,
    so we should check whether the file really needs to be put before doing it.
    """
    file_status = self._get_file_status(target) if target != self.main_resource.resource.target else self.target_status
    mode = "" if not mode else mode

    if file_status:
      if source:
        length = file_status['length']
        local_file_size = os.stat(source).st_size  # TODO: os -> sudo

        # TODO: re-implement this using checksums
        if local_file_size == length:
          Logger.info(format("DFS file {target} is identical to {source}, skipping the copying"))
          return
        elif not self.main_resource.resource.replace_existing_files:
          Logger.info(format("Not replacing existing DFS file {target} which is different from {source}, due to replace_existing_files=False"))
          return
      else:
        Logger.info(format("File {target} already exists in DFS, skipping the creation"))
        return

    Logger.info(format("Creating new file {target} in DFS"))
    kwargs = {'permission': mode} if mode else {}

    self.util.run_command(target, 'CREATE', method='PUT', overwrite=True, assertable_result=False, file_to_put=source, **kwargs)

    if mode and file_status:
      file_status['permission'] = mode
  def _delete_resource(self):
    if not self.target_status:
      return
    self.util.run_command(self.main_resource.resource.target, 'DELETE', method='DELETE', recursive=True)

  def _set_owner(self, file_status=None):
    owner = "" if not self.main_resource.resource.owner else self.main_resource.resource.owner
    group = "" if not self.main_resource.resource.group else self.main_resource.resource.group

    if (not owner or file_status and file_status['owner'] == owner) and (not group or file_status and file_status['group'] == group):
      return

    self.util.run_command(self.main_resource.resource.target, 'SETOWNER', method='PUT', owner=owner, group=group, assertable_result=False)

    results = []

    if self.main_resource.resource.recursive_chown:
      content_summary = self.util.run_command(self.main_resource.resource.target, 'GETCONTENTSUMMARY', method='GET', assertable_result=False)
      if content_summary['ContentSummary']['fileCount'] <= HdfsResourceWebHDFS.MAX_FILES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS and content_summary['ContentSummary']['directoryCount'] <= HdfsResourceWebHDFS.MAX_DIRECTORIES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS:
        self._fill_directories_list(self.main_resource.resource.target, results)
      else:  # avoid chowning a lot of files and listing a lot of dirs via webhdfs, which can take a lot of time
        shell.checked_call(["hadoop", "fs", "-chown", "-R", format("{owner}:{group}"), self.main_resource.resource.target], user=self.main_resource.resource.user)

    if self.main_resource.resource.change_permissions_for_parents:
      self._fill_in_parent_directories(self.main_resource.resource.target, results)

    for path in results:
      self.util.run_command(path, 'SETOWNER', method='PUT', owner=owner, group=group, assertable_result=False)
  def _set_mode(self, file_status=None):
    if not self.mode or file_status and file_status['permission'] == self.mode:
      return

    if not self.mode_set:
      self.util.run_command(self.main_resource.resource.target, 'SETPERMISSION', method='PUT', permission=self.mode, assertable_result=False)

    results = []

    if self.main_resource.resource.recursive_chmod:
      content_summary = self.util.run_command(self.main_resource.resource.target, 'GETCONTENTSUMMARY', method='GET', assertable_result=False)
      if content_summary['ContentSummary']['fileCount'] <= HdfsResourceWebHDFS.MAX_FILES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS and content_summary['ContentSummary']['directoryCount'] <= HdfsResourceWebHDFS.MAX_DIRECTORIES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS:
        self._fill_directories_list(self.main_resource.resource.target, results)
      else:  # avoid chmoding a lot of files and listing a lot of dirs via webhdfs, which can take a lot of time
        shell.checked_call(["hadoop", "fs", "-chmod", "-R", self.mode, self.main_resource.resource.target], user=self.main_resource.resource.user)

    if self.main_resource.resource.change_permissions_for_parents:
      self._fill_in_parent_directories(self.main_resource.resource.target, results)

    for path in results:
      self.util.run_command(path, 'SETPERMISSION', method='PUT', permission=self.mode, assertable_result=False)
  def _fill_in_parent_directories(self, target, results):
    path_parts = HdfsResourceProvider.parse_path(target).split("/")[1:]  # [1:] drops the leading '' from the parts
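    # e.g. for target "/a/b/c" this appends "/a/", "/a/b/" and "/a/b/c/" to results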
    path = "/"

    for path_part in path_parts:
      path += path_part + "/"
      results.append(path)
  def _fill_directories_list(self, target, results):
    list_status = self.util.run_command(target, 'LISTSTATUS', method='GET', assertable_result=False)['FileStatuses']['FileStatus']

    for file in list_status:
      if file['pathSuffix']:
        new_path = target + "/" + file['pathSuffix']
        results.append(new_path)

        if file['type'] == 'DIRECTORY':
          self._fill_directories_list(new_path, results)

class HdfsResourceProvider(Provider):
  def __init__(self, resource):
    super(HdfsResourceProvider, self).__init__(resource)
    self.fsType = getattr(resource, 'dfs_type')
    self.ignored_resources_list = HdfsResourceProvider.get_ignored_resources_list(self.resource.hdfs_resource_ignore_file)
    if self.fsType != 'HCFS':
      self.assert_parameter_is_set('hdfs_site')
      self.webhdfs_enabled = self.resource.hdfs_site['dfs.webhdfs.enabled']
  @staticmethod
  def parse_path(path):
    """
    hdfs://nn_url:1234/a/b/c -> /a/b/c
    hdfs://nn_ha_name/a/b/c -> /a/b/c
    hdfs:///a/b/c -> /a/b/c
    /a/b/c -> /a/b/c
    """
    match_with_protocol_and_nn_url = re.match("[a-zA-Z]+://[^/]+(/.+)", path)
    match_with_protocol = re.match("[a-zA-Z]+://(/.+)", path)

    if match_with_protocol_and_nn_url:
      path = match_with_protocol_and_nn_url.group(1)
    elif match_with_protocol:
      path = match_with_protocol.group(1)

    return re.sub("[/]+", "/", path)
  @staticmethod
  def get_ignored_resources_list(hdfs_resource_ignore_file):
    if not hdfs_resource_ignore_file or not os.path.exists(hdfs_resource_ignore_file):
      return []

    with open(hdfs_resource_ignore_file, "rb") as fp:
      content = fp.read()

    hdfs_resources_to_ignore = []
    for hdfs_resource_to_ignore in content.split("\n"):
      hdfs_resources_to_ignore.append(HdfsResourceProvider.parse_path(hdfs_resource_to_ignore))

    return hdfs_resources_to_ignore
  def action_delayed(self, action_name):
    self.assert_parameter_is_set('type')

    parsed_path = HdfsResourceProvider.parse_path(self.resource.target)
    parsed_not_managed_paths = [HdfsResourceProvider.parse_path(path) for path in self.resource.immutable_paths]
    self.manage_if_exists = parsed_path not in parsed_not_managed_paths

    if parsed_path in self.ignored_resources_list:
      Logger.info("Skipping '{0}' because it is in ignore file {1}.".format(self.resource, self.resource.hdfs_resource_ignore_file))
      return

    self.get_hdfs_resource_executor().action_delayed(action_name, self)
  def action_create_on_execute(self):
    self.action_delayed("create")

  def action_delete_on_execute(self):
    self.action_delayed("delete")

  def action_execute(self):
    self.get_hdfs_resource_executor().action_execute(self)

  def get_hdfs_resource_executor(self):
    if self.fsType == 'HCFS':
      return HdfsResourceJar()
    elif WebHDFSUtil.is_webhdfs_available(self.webhdfs_enabled, self.resource.default_fs):
      return HdfsResourceWebHDFS()
    else:
      return HdfsResourceJar()

  def assert_parameter_is_set(self, parameter_name):
    if not getattr(self.resource, parameter_name):
      raise Fail("Resource parameter '{0}' is not set.".format(parameter_name))
    return True

  def kinit(self):
    keytab_file = self.resource.keytab
    kinit_path = self.resource.kinit_path_local
    principal_name = self.resource.principal_name
    user = self.resource.user

    Execute(format("{kinit_path} -kt {keytab_file} {principal_name}"),
            user=user
    )
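
# Illustrative usage (a sketch of how Ambari service scripts typically drive this
# provider through the HdfsResource resource; the paths and names are examples):
#
#   HdfsResource("/user/example",
#                type="directory",
#                action="create_on_execute",
#                owner="example",
#                mode=0755)
#   HdfsResource(None, action="execute")  # flush all delayed operations in one pass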