hdfs_resource.py

#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Ambari Agent
"""
import re
import os
import grp
import pwd
import time
from resource_management.core.environment import Environment
from resource_management.core.base import Fail
from resource_management.core.resources.system import Execute
from resource_management.core.resources.system import File
from resource_management.core.providers import Provider
from resource_management.core.logger import Logger
from resource_management.core import shell
from resource_management.core import sudo
from resource_management.libraries.script import Script
from resource_management.libraries.functions import format
from resource_management.libraries.functions.get_user_call_output import get_user_call_output
from resource_management.libraries.functions import is_empty
from resource_management.libraries.functions import namenode_ha_utils
from resource_management.libraries.functions.hdfs_utils import is_https_enabled_in_hdfs

import ambari_simplejson as json  # simplejson is much faster than the Python 2.6 json module and has the same set of functions.
import subprocess

JSON_PATH = '/var/lib/ambari-agent/tmp/hdfs_resources_{timestamp}.json'
JAR_PATH = '/var/lib/ambari-agent/lib/fast-hdfs-resource.jar'
RESOURCE_TO_JSON_FIELDS = {
  'target': 'target',
  'type': 'type',
  'action': 'action',
  'source': 'source',
  'owner': 'owner',
  'group': 'group',
  'mode': 'mode',
  'recursive_chown': 'recursiveChown',
  'recursive_chmod': 'recursiveChmod',
  'change_permissions_for_parents': 'changePermissionforParents',
  'manage_if_exists': 'manageIfExists',
  'dfs_type': 'dfs_type'
}
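
# Illustrative only: a delayed "create" request for a directory, roughly as it would
# appear in the JSON file consumed by fast-hdfs-resource.jar. The field names come from
# RESOURCE_TO_JSON_FIELDS above; the concrete path and values are hypothetical.
#
#   [
#     {
#       "target": "/apps/example",
#       "type": "directory",
#       "action": "create",
#       "owner": "hdfs",
#       "group": "hadoop",
#       "mode": "755",
#       "manageIfExists": true
#     }
#   ]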

class HdfsResourceJar:
  """
  This is a slower implementation of HdfsResource than HdfsResourceWebHDFS, but it works in all cases and on any DFS type.

  The idea is to put all the files/directories/copyFromLocals/copyToLocals we have to create/delete into a JSON file,
  and then perform them with only ONE expensive hadoop call to our custom jar fast-hdfs-resource.jar, which reads this JSON.

  'create_on_execute', 'delete_on_execute' and 'download_on_execute' do nothing except add actions to this JSON,
  while 'execute' does all the expensive creating/deleting work, executing the jar with the JSON as a parameter.
  """
  def action_delayed(self, action_name, main_resource):
    resource = {}
    env = Environment.get_instance()
    if not 'hdfs_files' in env.config:
      env.config['hdfs_files'] = []

    # Put values in dictionary-resource
    for field_name, json_field_name in RESOURCE_TO_JSON_FIELDS.iteritems():
      if field_name == 'action':
        resource[json_field_name] = action_name
      elif field_name == 'mode' and main_resource.resource.mode:
        resource[json_field_name] = oct(main_resource.resource.mode)[1:]
      elif field_name == 'manage_if_exists':
        resource[json_field_name] = main_resource.manage_if_exists
      elif getattr(main_resource.resource, field_name):
        resource[json_field_name] = getattr(main_resource.resource, field_name)

    # Add resource to create
    env.config['hdfs_files'].append(resource)

  def action_execute(self, main_resource):
    env = Environment.get_instance()

    # Check required parameters
    main_resource.assert_parameter_is_set('user')

    if not 'hdfs_files' in env.config or not env.config['hdfs_files']:
      Logger.info("No resources to create. 'create_on_execute' or 'delete_on_execute' or 'download_on_execute' wasn't triggered before this 'execute' action.")
      return

    hadoop_bin_dir = main_resource.resource.hadoop_bin_dir
    hadoop_conf_dir = main_resource.resource.hadoop_conf_dir
    user = main_resource.resource.user
    security_enabled = main_resource.resource.security_enabled
    keytab_file = main_resource.resource.keytab
    kinit_path = main_resource.resource.kinit_path_local
    logoutput = main_resource.resource.logoutput
    principal_name = main_resource.resource.principal_name
    jar_path = JAR_PATH
    timestamp = time.time()
    json_path = format(JSON_PATH)

    if security_enabled:
      main_resource.kinit()

    # Write json file to disk
    File(json_path,
         owner = user,
         content = json.dumps(env.config['hdfs_files'])
    )

    # Execute jar to create/delete resources in hadoop
    Execute(format("hadoop --config {hadoop_conf_dir} jar {jar_path} {json_path}"),
            user=user,
            path=[hadoop_bin_dir],
            logoutput=logoutput,
    )

    # Clean
    env.config['hdfs_files'] = []
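
# Illustrative only: the delayed/execute pattern as it is typically driven from a
# service script. The resource name and arguments below are hypothetical; each
# *_on_execute action only queues work, and the final action="execute" call flushes
# the whole batch in a single jar (or WebHDFS) run.
#
#   HdfsResource("/user/example",
#                type="directory",
#                action="create_on_execute",
#                owner="example",
#                mode=0755)
#   HdfsResource(None, action="execute")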

class WebHDFSUtil:
  def __init__(self, hdfs_site, run_user, security_enabled, logoutput=None):
    https_nn_address = namenode_ha_utils.get_property_for_active_namenode(hdfs_site, 'dfs.namenode.https-address',
                                                                          security_enabled, run_user)
    http_nn_address = namenode_ha_utils.get_property_for_active_namenode(hdfs_site, 'dfs.namenode.http-address',
                                                                         security_enabled, run_user)
    self.is_https_enabled = is_https_enabled_in_hdfs(hdfs_site['dfs.http.policy'], hdfs_site['dfs.https.enable'])

    address = https_nn_address if self.is_https_enabled else http_nn_address
    protocol = "https" if self.is_https_enabled else "http"

    self.address = format("{protocol}://{address}")
    self.run_user = run_user
    self.security_enabled = security_enabled
    self.logoutput = logoutput

  @staticmethod
  def is_webhdfs_available(is_webhdfs_enabled, default_fs):
    # only hdfs seems to support webHDFS
    return (is_webhdfs_enabled and default_fs.startswith("hdfs"))

  valid_status_codes = ["200", "201"]
  def run_command(self, target, operation, method='POST', assertable_result=True, file_to_put=None, ignore_status_codes=[], **kwargs):
    """
    assertable_result - some POST requests return '{"boolean":false}' or '{"boolean":true}',
    depending on whether the query was successful or not; we can assert this for them.
    """
    target = HdfsResourceProvider.parse_path(target)

    url = format("{address}/webhdfs/v1{target}?op={operation}&user.name={run_user}", address=self.address, run_user=self.run_user)
    for k, v in kwargs.iteritems():
      url = format("{url}&{k}={v}")

    cmd = ["curl", "-sS", "-L", "-w", "%{http_code}", "-X", method]

    # When operation is "OPEN", the target is actually the DFS file to download and file_to_put is actually the local target; see _download_file
    if operation == "OPEN":
      cmd += ["-o", file_to_put]
    else:
      if file_to_put and not os.path.exists(file_to_put):
        raise Fail(format("File {file_to_put} is not found."))

      if file_to_put:
        cmd += ["--data-binary", "@" + file_to_put, "-H", "Content-Type: application/octet-stream"]

    if self.security_enabled:
      cmd += ["--negotiate", "-u", ":"]
    if self.is_https_enabled:
      cmd += ["-k"]

    cmd.append(url)
    _, out, err = get_user_call_output(cmd, user=self.run_user, logoutput=self.logoutput, quiet=False)
    status_code = out[-3:]
    out = out[:-3]  # strip the last three characters of the output, which are the HTTP status code

    try:
      result_dict = json.loads(out)
    except ValueError:
      result_dict = out

    if status_code not in WebHDFSUtil.valid_status_codes + ignore_status_codes or assertable_result and result_dict and not result_dict['boolean']:
      formatted_output = json.dumps(result_dict, indent=2) if isinstance(result_dict, dict) else result_dict
      formatted_output = err + "\n" + formatted_output
      err_msg = "Execution of '%s' returned status_code=%s. %s" % (shell.string_cmd_from_args_list(cmd), status_code, formatted_output)
      raise Fail(err_msg)

    return result_dict
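
# Illustrative only: for a non-secure cluster, run_command() above would assemble
# roughly the following curl call for creating a directory (the NameNode host, port
# and path are made up). '-w %{http_code}' appends the HTTP status to the output so
# the last three characters can be split off and checked against valid_status_codes.
#
#   curl -sS -L -w '%{http_code}' -X PUT \
#     'http://nn-host.example.com:50070/webhdfs/v1/apps/example?op=MKDIRS&user.name=hdfs'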

class HdfsResourceWebHDFS:
  """
  This is the fastest implementation of HdfsResource, using WebHDFS.
  Since WebHDFS is not available on non-HDFS filesystems and can also be disabled within HDFS,
  we should keep the other implementations for such cases.
  """

  """
  If we have more than this number of files to recursively chmod/chown,
  WebHDFS won't be used; 'hadoop fs -chmod (or -chown) -R ..' will be used instead,
  since doing it via WebHDFS can be really slow (only ~17 files per second can be chmoded).
  """
  MAX_FILES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS = 1000

  """
  This is used to avoid a lot of LISTSTATUS calls, which can take some time if a directory
  contains a lot of files. LISTSTATUS on a directory with 1000 files takes ~0.5 seconds.
  """
  MAX_DIRECTORIES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS = 250

  def action_execute(self, main_resource):
    pass

  def _assert_valid(self):
    source = self.main_resource.resource.source
    type = self.main_resource.resource.type
    target = self.main_resource.resource.target

    if source:
      if not os.path.exists(source):
        raise Fail(format("Source {source} doesn't exist"))
      if type == "directory" and os.path.isfile(source):
        raise Fail(format("Source {source} is file but type is {type}"))
      elif type == "file" and os.path.isdir(source):
        raise Fail(format("Source {source} is directory but type is {type}"))

    self.target_status = self._get_file_status(target)

    if self.target_status and self.target_status['type'].lower() != type:
      raise Fail(format("Trying to create file/directory but directory/file exists in the DFS on {target}"))

  def _assert_download_valid(self):
    source = self.main_resource.resource.source
    type = self.main_resource.resource.type
    target = self.main_resource.resource.target

    if source:
      self.source_status = self._get_file_status(source)
      if self.source_status == None:
        raise Fail(format("Source {source} doesn't exist"))
      if type == "directory" and self.source_status['type'] == "FILE":
        raise Fail(format("Source {source} is file but type is {type}"))
      elif type == "file" and self.source_status['type'] == "DIRECTORY":
        raise Fail(format("Source {source} is directory but type is {type}"))
    else:
      raise Fail(format("No source provided"))

    if os.path.exists(target):
      if type == "directory" and os.path.isfile(target):
        raise Fail(format("Trying to download directory but file exists locally {target}"))
      elif type == "file" and os.path.isdir(target):
        raise Fail(format("Trying to download file but directory exists locally {target}"))

  def action_delayed(self, action_name, main_resource):
    main_resource.assert_parameter_is_set('user')

    if main_resource.resource.security_enabled:
      main_resource.kinit()

    self.util = WebHDFSUtil(main_resource.resource.hdfs_site, main_resource.resource.user,
                            main_resource.resource.security_enabled, main_resource.resource.logoutput)
    self.mode = oct(main_resource.resource.mode)[1:] if main_resource.resource.mode else main_resource.resource.mode
    self.mode_set = False
    self.main_resource = main_resource

    if action_name == "download":
      self._assert_download_valid()
    else:
      self._assert_valid()

    if self.main_resource.manage_if_exists == False and self.target_status:
      Logger.info("Skipping the operation for not managed DFS directory " + str(self.main_resource.resource.target) +
                  " since immutable_paths contains it.")
      return

    if action_name == "create":
      self._create_resource()
      self._set_mode(self.target_status)
      self._set_owner(self.target_status)
    elif action_name == "download":
      self._download_resource()
    else:
      self._delete_resource()

  def _create_resource(self):
    is_create = (self.main_resource.resource.source == None)

    if is_create and self.main_resource.resource.type == "directory":
      self._create_directory(self.main_resource.resource.target)
    elif is_create and self.main_resource.resource.type == "file":
      self._create_file(self.main_resource.resource.target, mode=self.mode)
    elif not is_create and self.main_resource.resource.type == "file":
      self._create_file(self.main_resource.resource.target, source=self.main_resource.resource.source, mode=self.mode)
    elif not is_create and self.main_resource.resource.type == "directory":
      self._create_directory(self.main_resource.resource.target)
      self._copy_from_local_directory(self.main_resource.resource.target, self.main_resource.resource.source)

  def _copy_from_local_directory(self, target, source):
    for next_path_part in sudo.listdir(source):
      new_source = os.path.join(source, next_path_part)
      new_target = format("{target}/{next_path_part}")
      if sudo.path_isdir(new_source):
        Logger.info(format("Creating DFS directory {new_target}"))
        self._create_directory(new_target)
        self._copy_from_local_directory(new_target, new_source)
      else:
        self._create_file(new_target, new_source)

  def _download_resource(self):
    if self.main_resource.resource.source == None:
      return

    if self.main_resource.resource.type == "file":
      self._download_file(self.main_resource.resource.target, self.main_resource.resource.source, self.source_status)
    elif self.main_resource.resource.type == "directory":
      self._download_directory(self.main_resource.resource.target, self.main_resource.resource.source)

  def _download_directory(self, target, source):
    self._create_local_directory(target)

    for file_status in self._list_directory(source):
      if not file_status == None:
        next_path_part = file_status['pathSuffix']
        new_source = format("{source}/{next_path_part}")
        new_target = os.path.join(target, next_path_part)
        if file_status['type'] == "DIRECTORY":
          self._download_directory(new_target, new_source)
        else:
          self._download_file(new_target, new_source, file_status)

  def _create_local_directory(self, target):
    if not os.path.exists(target):
      Logger.info(format("Creating local directory {target}"))
      sudo.makedir(target, "")

      owner_name = "" if not self.main_resource.resource.owner else self.main_resource.resource.owner
      group_name = "" if not self.main_resource.resource.group else self.main_resource.resource.group
      owner = pwd.getpwnam(owner_name)
      group = grp.getgrnam(group_name)
      sudo.chown(target, owner, group)

  def _download_file(self, target, source, file_status):
    """
    Downloading a file via WebHDFS is slow, however _get_file_status is pretty fast,
    so we should check whether the file really needs to be downloaded before doing it.
    """
    if file_status and os.path.exists(target):
      length = file_status['length']
      local_file_size = os.stat(target).st_size  # TODO: os -> sudo

      # TODO: re-implement this using checksums
      if local_file_size == length:
        Logger.info(format("DFS file {source} is identical to {target}, skipping the download"))
        return
      elif not self.main_resource.resource.replace_existing_files:
        Logger.info(format("Not replacing existing local file {target} which is different from DFS file {source}, due to replace_existing_files=False"))
        return

    kwargs = {}
    self.util.run_command(source, 'OPEN', method='GET', overwrite=True, assertable_result=False, file_to_put=target, **kwargs)

  def _create_directory(self, target):
    if target == self.main_resource.resource.target and self.target_status:
      return

    self.util.run_command(target, 'MKDIRS', method='PUT')

  def _get_file_status(self, target):
    list_status = self.util.run_command(target, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
    return list_status['FileStatus'] if 'FileStatus' in list_status else None

  def _list_directory(self, target):
    results = self.util.run_command(target, 'LISTSTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
    entry = results['FileStatuses'] if 'FileStatuses' in results else None

    if entry == None:
      return []

    return entry['FileStatus'] if 'FileStatus' in entry else []
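
  # Illustrative only: an abbreviated WebHDFS GETFILESTATUS response, which is what
  # _get_file_status() unwraps into a plain dict (the values here are hypothetical;
  # only the fields this class reads -- 'type', 'length', 'owner', 'group',
  # 'permission', and 'pathSuffix' for LISTSTATUS entries -- are shown):
  #
  #   {"FileStatus": {"type": "DIRECTORY", "length": 0, "owner": "hdfs",
  #                   "group": "hadoop", "permission": "755", "pathSuffix": ""}}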

  def _create_file(self, target, source=None, mode=""):
    """
    PUT file command is slow, however _get_file_status is pretty fast,
    so we should check whether the file really needs to be put before doing it.
    """
    file_status = self._get_file_status(target) if target != self.main_resource.resource.target else self.target_status
    mode = "" if not mode else mode

    if file_status:
      if source:
        length = file_status['length']
        local_file_size = os.stat(source).st_size  # TODO: os -> sudo

        # TODO: re-implement this using checksums
        if local_file_size == length:
          Logger.info(format("DFS file {target} is identical to {source}, skipping the copying"))
          return
        elif not self.main_resource.resource.replace_existing_files:
          Logger.info(format("Not replacing existing DFS file {target} which is different from {source}, due to replace_existing_files=False"))
          return
      else:
        Logger.info(format("File {target} already exists in DFS, skipping the creation"))
        return

    Logger.info(format("Creating new file {target} in DFS"))
    kwargs = {'permission': mode} if mode else {}

    self.util.run_command(target, 'CREATE', method='PUT', overwrite=True, assertable_result=False, file_to_put=source, **kwargs)

    if mode and file_status:
      file_status['permission'] = mode

  def _delete_resource(self):
    if not self.target_status:
      return
    self.util.run_command(self.main_resource.resource.target, 'DELETE', method='DELETE', recursive=True)

  def _set_owner(self, file_status=None):
    owner = "" if not self.main_resource.resource.owner else self.main_resource.resource.owner
    group = "" if not self.main_resource.resource.group else self.main_resource.resource.group

    if not self.main_resource.resource.recursive_chown and (not owner or file_status and file_status['owner'] == owner) and (not group or file_status and file_status['group'] == group):
      return

    self.util.run_command(self.main_resource.resource.target, 'SETOWNER', method='PUT', owner=owner, group=group, assertable_result=False)

    results = []

    if self.main_resource.resource.recursive_chown:
      content_summary = self.util.run_command(self.main_resource.resource.target, 'GETCONTENTSUMMARY', method='GET', assertable_result=False)
      if content_summary['ContentSummary']['fileCount'] <= HdfsResourceWebHDFS.MAX_FILES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS and content_summary['ContentSummary']['directoryCount'] <= HdfsResourceWebHDFS.MAX_DIRECTORIES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS:
        self._fill_directories_list(self.main_resource.resource.target, results)
      else:  # avoid chowning a lot of files and listing a lot of dirs via WebHDFS, which can take a lot of time.
        shell.checked_call(["hadoop", "fs", "-chown", "-R", format("{owner}:{group}"), self.main_resource.resource.target], user=self.main_resource.resource.user)

    if self.main_resource.resource.change_permissions_for_parents:
      self._fill_in_parent_directories(self.main_resource.resource.target, results)

    for path in results:
      self.util.run_command(path, 'SETOWNER', method='PUT', owner=owner, group=group, assertable_result=False)

  def _set_mode(self, file_status=None):
    if not self.mode or file_status and file_status['permission'] == self.mode:
      return

    if not self.mode_set:
      self.util.run_command(self.main_resource.resource.target, 'SETPERMISSION', method='PUT', permission=self.mode, assertable_result=False)

    results = []

    if self.main_resource.resource.recursive_chmod:
      content_summary = self.util.run_command(self.main_resource.resource.target, 'GETCONTENTSUMMARY', method='GET', assertable_result=False)
      if content_summary['ContentSummary']['fileCount'] <= HdfsResourceWebHDFS.MAX_FILES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS and content_summary['ContentSummary']['directoryCount'] <= HdfsResourceWebHDFS.MAX_DIRECTORIES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS:
        self._fill_directories_list(self.main_resource.resource.target, results)
      else:  # avoid chmoding a lot of files and listing a lot of dirs via WebHDFS, which can take a lot of time.
        shell.checked_call(["hadoop", "fs", "-chmod", "-R", self.mode, self.main_resource.resource.target], user=self.main_resource.resource.user)

    if self.main_resource.resource.change_permissions_for_parents:
      self._fill_in_parent_directories(self.main_resource.resource.target, results)

    for path in results:
      self.util.run_command(path, 'SETPERMISSION', method='PUT', permission=self.mode, assertable_result=False)

  def _fill_in_parent_directories(self, target, results):
    path_parts = HdfsResourceProvider.parse_path(target).split("/")[1:]  # [1:] removes the leading '' from parts
    path = "/"

    for path_part in path_parts:
      path += path_part + "/"
      results.append(path)
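
  # Illustrative only: for a target such as "/apps/example/sub" (a hypothetical path),
  # _fill_in_parent_directories() appends the cumulative paths "/apps/", "/apps/example/"
  # and "/apps/example/sub/" to 'results', so the chown/chmod loops above also touch
  # every ancestor directory when change_permissions_for_parents is set.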

  def _fill_directories_list(self, target, results):
    list_status = self.util.run_command(target, 'LISTSTATUS', method='GET', assertable_result=False)['FileStatuses']['FileStatus']

    for file in list_status:
      if file['pathSuffix']:
        new_path = target + "/" + file['pathSuffix']
        results.append(new_path)

        if file['type'] == 'DIRECTORY':
          self._fill_directories_list(new_path, results)

class HdfsResourceProvider(Provider):
  def __init__(self, resource):
    super(HdfsResourceProvider, self).__init__(resource)
    self.fsType = getattr(resource, 'dfs_type')
    self.ignored_resources_list = HdfsResourceProvider.get_ignored_resources_list(self.resource.hdfs_resource_ignore_file)
    if self.fsType != 'HCFS':
      self.assert_parameter_is_set('hdfs_site')
      self.webhdfs_enabled = self.resource.hdfs_site['dfs.webhdfs.enabled']

  @staticmethod
  def parse_path(path):
    """
    hdfs://nn_url:1234/a/b/c -> /a/b/c
    hdfs://nn_ha_name/a/b/c -> /a/b/c
    hdfs:///a/b/c -> /a/b/c
    /a/b/c -> /a/b/c
    """
    match_with_protocol_and_nn_url = re.match("[a-zA-Z]+://[^/]+(/.+)", path)
    match_with_protocol = re.match("[a-zA-Z]+://(/.+)", path)

    if match_with_protocol_and_nn_url:
      path = match_with_protocol_and_nn_url.group(1)
    elif match_with_protocol:
      path = match_with_protocol.group(1)
    else:
      path = path

    return re.sub("[/]+", "/", path)

  @staticmethod
  def get_ignored_resources_list(hdfs_resource_ignore_file):
    if not hdfs_resource_ignore_file or not os.path.exists(hdfs_resource_ignore_file):
      return []

    with open(hdfs_resource_ignore_file, "rb") as fp:
      content = fp.read()

    hdfs_resources_to_ignore = []
    for hdfs_resource_to_ignore in content.split("\n"):
      hdfs_resources_to_ignore.append(HdfsResourceProvider.parse_path(hdfs_resource_to_ignore))

    return hdfs_resources_to_ignore

  def action_delayed(self, action_name):
    self.assert_parameter_is_set('type')

    parsed_path = HdfsResourceProvider.parse_path(self.resource.target)

    parsed_not_managed_paths = [HdfsResourceProvider.parse_path(path) for path in self.resource.immutable_paths]
    self.manage_if_exists = not parsed_path in parsed_not_managed_paths

    if parsed_path in self.ignored_resources_list:
      Logger.info("Skipping '{0}' because it is in ignore file {1}.".format(self.resource, self.resource.hdfs_resource_ignore_file))
      return

    self.get_hdfs_resource_executor().action_delayed(action_name, self)

  def action_create_on_execute(self):
    self.action_delayed("create")

  def action_delete_on_execute(self):
    self.action_delayed("delete")

  def action_download_on_execute(self):
    self.action_delayed("download")

  def action_execute(self):
    self.get_hdfs_resource_executor().action_execute(self)

  def get_hdfs_resource_executor(self):
    if self.fsType == 'HCFS':
      return HdfsResourceJar()
    elif WebHDFSUtil.is_webhdfs_available(self.webhdfs_enabled, self.resource.default_fs):
      return HdfsResourceWebHDFS()
    else:
      return HdfsResourceJar()

  def assert_parameter_is_set(self, parameter_name):
    if not getattr(self.resource, parameter_name):
      raise Fail("Resource parameter '{0}' is not set.".format(parameter_name))
    return True

  def kinit(self):
    keytab_file = self.resource.keytab
    kinit_path = self.resource.kinit_path_local
    principal_name = self.resource.principal_name
    user = self.resource.user

    Execute(format("{kinit_path} -kt {keytab_file} {principal_name}"),
            user=user
    )