ranger_functions.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. #!/usr/bin/env python
  2. """
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. """
  17. import re
  18. import time
  19. import sys
  20. import urllib2
  21. import base64
  22. import httplib
  23. # simplejson is much faster than the Python 2.6 json module and provides the same set of functions.
  24. import ambari_simplejson as json
  25. from StringIO import StringIO as BytesIO
  26. from resource_management.core.logger import Logger
  27. from ambari_commons.inet_utils import openurl
  28. from ambari_commons.exceptions import TimeoutError
  29. from resource_management.core.exceptions import Fail
  30. from resource_management.libraries.functions.format import format
  31. from resource_management.libraries.functions.decorator import safe_retry
  32. class Rangeradmin:
  33. sInstance = None
  34. def __init__(self, url='http://localhost:6080', skip_if_rangeradmin_down = True):
  35. self.baseUrl = url
  36. self.urlLogin = self.baseUrl + '/login.jsp'
  37. self.urlLoginPost = self.baseUrl + '/j_spring_security_check'
  38. self.urlRepos = self.baseUrl + '/service/assets/assets'
  39. self.urlReposPub = self.baseUrl + '/service/public/api/repository'
  40. self.urlPolicies = self.baseUrl + '/service/public/api/policy'
  41. self.urlGroups = self.baseUrl + '/service/xusers/groups'
  42. self.urlUsers = self.baseUrl + '/service/xusers/users'
  43. self.urlSecUsers = self.baseUrl + '/service/xusers/secure/users'
  44. self.skip_if_rangeradmin_down = skip_if_rangeradmin_down
  45. if self.skip_if_rangeradmin_down:
  46. Logger.info("Rangeradmin: Skip ranger admin if it's down !")
  @safe_retry(times=5, sleep_time=8, backoff_factor=1.5, err_class=Fail, return_on_fail=None)
  def get_repository_by_name_urllib2(self, name, component, status, usernamepassword):
    """
    Search the public repository API for a Ranger repository with the given name.

    :param name: repository name to search for; the comparison with returned names is case-insensitive
    :param component: component whose repository is searched, sent as the "type" query parameter
    :param status: value sent as the "status" query parameter
    :param usernamepassword: credentials used for HTTP Basic authentication
    :return Returns the matching Ranger repository dict if found otherwise None
    :raises Fail: on an HTTP/URL error, a bad status line or a timeout
    """
    # Function-scope import keeps this method self-contained.
    import urllib

    try:
      # Encode the query so that names containing spaces or reserved characters still form a valid URL.
      query = urllib.urlencode([('name', name), ('type', component), ('status', status)])
      request = urllib2.Request(self.urlReposPub + "?" + query)
      base64string = base64.encodestring(usernamepassword).replace('\n', '')
      request.add_header("Content-Type", "application/json")
      request.add_header("Accept", "application/json")
      request.add_header("Authorization", "Basic {0}".format(base64string))
      result = openurl(request, timeout=20)
      response_code = result.getcode()
      response = json.loads(result.read())
      if response_code != 200:
        return None

      # A missing or empty 'vXRepositories' entry means "not found" rather than an error.
      repositories = response.get('vXRepositories') if isinstance(response, dict) else None
      for repo in repositories or []:
        if repo['name'].lower() == name.lower():
          return repo
      return None
    except urllib2.HTTPError as e:
      raise Fail("Error getting {0} repository for component {1}. Http status code - {2}. \n {3}".format(name, component, e.code, e.read()))
    except urllib2.URLError as e:
      raise Fail("Error getting {0} repository for component {1}. Reason - {2}.".format(name, component, e.reason))
    except httplib.BadStatusLine:
      raise Fail("Ranger Admin service is not reachable, please restart the service and then try again")
    except TimeoutError:
      raise Fail("Connection to Ranger Admin failed. Reason - timeout")
  def create_ranger_repository(self, component, repo_name, repo_properties,
                               ambari_ranger_admin, ambari_ranger_password,
                               admin_uname, admin_password, policy_user):
    """
    Make sure a Ranger repository exists for the component, creating the Ambari admin user and the
    repository when needed. Nothing is returned; the outcome is reported through the logger.

    :param component: name of the component for which the repository is looked up or created
    :param repo_name: name of the repository to look up or create
    :param repo_properties: dict describing the repository, serialized to JSON for the create call
    :param ambari_ranger_admin: user name of the ambari admin user to be created in Ranger
    :param ambari_ranger_password: password of the ambari admin user to be created in Ranger
    :param admin_uname: ranger admin username
    :param admin_password: ranger admin password
    :param policy_user: user granted permissions on the policies of a newly created repository
    """
    response_code = self.check_ranger_login_urllib2(self.baseUrl)
    repo_data = json.dumps(repo_properties)
    ambari_ranger_password = unicode(ambari_ranger_password)
    admin_password = unicode(admin_password)
    # NOTE(review): format() only receives a template, so it presumably resolves the placeholders from
    # this function's local names - keep those names and keep the calls in this frame.
    ambari_username_password_for_ranger = format('{ambari_ranger_admin}:{ambari_ranger_password}')

    if response_code != 200:
      if not self.skip_if_rangeradmin_down:
        Logger.error("Connection to Ranger Admin failed !")
      return

    ranger_admin_credentials = format("{admin_uname}:{admin_password}")
    user_resp_code = self.create_ambari_admin_user(ambari_ranger_admin, ambari_ranger_password, ranger_admin_credentials)
    if user_resp_code != 200:
      Logger.error('Ambari admin user creation failed')
      return

    # One initial attempt plus up to five retries, pausing 15 seconds before each retry.
    max_retries = 5
    for attempt in range(max_retries + 1):
      repo = self.get_repository_by_name_urllib2(repo_name, component, 'true', ambari_username_password_for_ranger)
      if repo is not None:
        Logger.info('{0} Repository {1} exist'.format(component.title(), repo['name']))
        return

      response = self.create_repository_urllib2(repo_data, ambari_username_password_for_ranger, policy_user)
      if response is not None:
        Logger.info('{0} Repository created in Ranger admin'.format(component.title()))
        return

      if attempt == max_retries:
        Logger.error('{0} Repository creation failed in Ranger admin'.format(component.title()))
        return

      Logger.info("Retry Repository Creation is being called")
      time.sleep(15)  # delay for 15 seconds
  @safe_retry(times=5, sleep_time=8, backoff_factor=1.5, err_class=Fail, return_on_fail=None)
  def create_repository_urllib2(self, data, usernamepassword, policy_user):
    """
    Create a repository in Ranger and then update each of its policies so that policy_user
    receives the permissions defined for the repository type.

    :param data: JSON string describing the repository; its 'name' and 'repositoryType' entries are read back
    :param usernamepassword: user credentials using which repository needs to be created
    :param policy_user: use this policy user for policies that will be used during repository creation
    :return Returns the body of the create response when the repository was created and every policy
            was updated, else None
    :raises Fail: on an HTTP/URL error, a bad status line or a timeout of the create call
    """
    try:
      base64string = base64.encodestring('{0}'.format(usernamepassword)).replace('\n', '')
      headers = {
        'Accept': 'application/json',
        "Content-Type": "application/json"
      }
      request = urllib2.Request(self.urlReposPub, data, headers)
      request.add_header("Authorization", "Basic {0}".format(base64string))
      result = openurl(request, timeout=20)
      response_code = result.getcode()
      # Encoding the raw body and decoding it again yields the body as text, not a parsed object.
      response = json.loads(json.JSONEncoder().encode(result.read()))
      if response_code != 200:
        Logger.info('Repository creation failed')
        return None

      Logger.info('Repository created Successfully')
      repoData = json.loads(data)
      repoName = repoData['name']
      typeOfPolicy = repoData['repositoryType']

      # Get Policies by repo name
      policyList = self.get_policy_by_repo_name(name=repoName, component=typeOfPolicy, status="true",
                                                usernamepassword=usernamepassword)
      if not policyList:
        Logger.info("Policies not found for the newly created Repository")
        return None

      policiesUpdateCount = 0
      for policy in policyList:
        updatedPolicyObj = self.get_policy_params(typeOfPolicy, policy, policy_user)
        updateResult = self.update_ranger_policy(updatedPolicyObj['id'],
                                                 json.dumps(updatedPolicyObj), usernamepassword)
        # update_ranger_policy is decorated with return_on_fail=None, so it presumably can hand back a
        # bare None instead of a (code, response) pair; unpacking that directly would raise TypeError.
        policyResCode = updateResult[0] if updateResult else None
        if policyResCode == 200:
          policiesUpdateCount = policiesUpdateCount + 1
        else:
          Logger.info('Policy Update failed')

      # Only report success when every policy was updated.
      if len(policyList) != policiesUpdateCount:
        return None
      Logger.info(
        "Ranger Repository created successfully and policies updated successfully providing ambari-qa user all permissions")
      return response
    except urllib2.HTTPError as e:
      raise Fail("Error creating repository. Http status code - {0}. \n {1}".format(e.code, e.read()))
    except urllib2.URLError as e:
      raise Fail("Error creating repository. Reason - {0}.".format(e.reason))
    except httplib.BadStatusLine:
      raise Fail("Ranger Admin service is not reachable, please restart the service and then try again")
    except TimeoutError:
      raise Fail("Connection to Ranger Admin failed. Reason - timeout")
  @safe_retry(times=75, sleep_time=8, backoff_factor=1, err_class=Fail, return_on_fail=None)
  def check_ranger_login_urllib2(self, url):
    """
    Open the given Ranger Admin URL and report the HTTP status code of the response.

    :param url: ranger admin host url
    :return Returns the response code of the request
    :raises Fail: on an HTTP/URL error, a bad status line or a timeout
    """
    try:
      return openurl(url, timeout=20).getcode()
    except urllib2.HTTPError as http_error:
      # HTTPError is handled before its base class URLError so the status code and body are reported.
      raise Fail("Connection to Ranger Admin failed. Http status code - {0}. \n {1}".format(http_error.code, http_error.read()))
    except urllib2.URLError as url_error:
      raise Fail("Connection to Ranger Admin failed. Reason - {0}.".format(url_error.reason))
    except httplib.BadStatusLine:
      raise Fail("Ranger Admin service is not reachable, please restart the service and then try again")
    except TimeoutError:
      raise Fail("Connection to Ranger Admin failed. Reason - timeout")
  @safe_retry(times=5, sleep_time=8, backoff_factor=1.5, err_class=Fail, return_on_fail=None)
  def get_policy_by_repo_name(self, name, component, status, usernamepassword):
    """
    Fetch the policies that belong to a repository through the public policy API.

    :param name: repository name, sent as the "repositoryName" query parameter
    :param component: component name for which policy needs to be searched, sent as "repositoryType"
    :param status: true or false, sent as the "isEnabled" query parameter
    :param usernamepassword: user credentials using which policy needs to be searched
    :return Returns the non-empty list of policies on success else None; HTTP/URL errors and a bad
            status line are logged and also yield None
    :raises Fail: on a timeout
    """
    # Function-scope import keeps this method self-contained.
    import urllib

    try:
      # Encode the query so that names containing spaces or reserved characters still form a valid URL.
      query = urllib.urlencode([('repositoryName', name), ('repositoryType', component), ('isEnabled', status)])
      request = urllib2.Request(self.urlPolicies + "?" + query)
      base64string = base64.encodestring(usernamepassword).replace('\n', '')
      request.add_header("Content-Type", "application/json")
      request.add_header("Accept", "application/json")
      request.add_header("Authorization", "Basic {0}".format(base64string))
      result = openurl(request, timeout=20)
      response_code = result.getcode()
      response = json.loads(result.read())

      # A missing 'vXPolicies' entry is treated like an empty list instead of raising KeyError.
      policies = response.get('vXPolicies') if isinstance(response, dict) else None
      if response_code == 200 and policies:
        return policies
      return None
    except urllib2.HTTPError as e:
      Logger.error("Error getting policy from repository {0} for component {1}. Http status code - {2}. \n {3}".format(name, component, e.code, e.read()))
      return None
    except urllib2.URLError as e:
      Logger.error("Error getting policy from repository {0} for component {1}. Reason - {2}.".format(name, component, e.reason))
      return None
    except httplib.BadStatusLine:
      Logger.error("Ranger Admin service is not reachable, please restart the service and then try again")
      return None
    except TimeoutError:
      raise Fail("Connection to Ranger Admin failed. Reason - timeout")
  @safe_retry(times=5, sleep_time=8, backoff_factor=1.5, err_class=Fail, return_on_fail=None)
  def update_ranger_policy(self, policyId, data, usernamepassword):
    """
    Send an HTTP PUT with new policy data to the public policy API.

    :param policyId: policy id which needs to be updated
    :param data: policy data that needs to be updated
    :param usernamepassword: user credentials using which policy needs to be updated
    :return Returns (response code, response body) when the response code is 200, else (None, None);
            HTTP/URL errors and a bad status line are logged and also give (None, None)
    :raises Fail: on a timeout
    """
    try:
      policy_url = self.urlPolicies + "/" + str(policyId)
      encoded_credentials = base64.encodestring('{0}'.format(usernamepassword)).replace('\n', '')
      request_headers = {
        'Accept': 'application/json',
        "Content-Type": "application/json"
      }
      request = urllib2.Request(policy_url, data, request_headers)
      request.add_header("Authorization", "Basic {0}".format(encoded_credentials))
      # Request objects carrying data default to POST; override the method to issue a PUT.
      request.get_method = lambda: 'PUT'
      result = openurl(request, timeout=20)
      response_code = result.getcode()
      response = json.loads(json.dumps(result.read()))
      if response_code != 200:
        Logger.error('Update Policy failed')
        return None, None
      Logger.info('Policy updated Successfully')
      return response_code, response
    except urllib2.HTTPError as http_error:
      Logger.error("Error updating policy. Http status code - {0}. \n {1}".format(http_error.code, http_error.read()))
      return None, None
    except urllib2.URLError as url_error:
      Logger.error("Error updating policy. Reason - {0}.".format(url_error.reason))
      return None, None
    except httplib.BadStatusLine:
      Logger.error("Ranger Admin service is not reachable, please restart the service and then try again")
      return None, None
    except TimeoutError:
      raise Fail("Connection to Ranger Admin failed. Reason - timeout")
  def get_policy_params(self, typeOfPolicy, policyObj, policy_user):
    """
    Set the permission map of a policy so that policy_user holds the permissions of the component type.

    :param typeOfPolicy: component name (matched case-insensitively) selecting the permission list
    :param policyObj: policy dict; it is modified in place
    :param policy_user: policy user placed in the 'userList' of the permission map
    :returns Returns the same policy dict, left untouched when the component type is not known
    """
    # The table is rebuilt on every call so each policy gets its own list objects.
    permissions_by_type = {
      "hdfs": ['read', 'write', 'execute', 'admin'],
      "hive": ['select', 'update', 'create', 'drop', 'alter', 'index', 'lock', 'all',
               'admin'],
      "hbase": ['read', 'write', 'create', 'admin'],
      "knox": ['allow', 'admin'],
      "storm": ['submitTopology', 'fileUpload', 'getNimbusConf', 'getClusterInfo',
                'fileDownload', 'killTopology', 'rebalance', 'activate', 'deactivate',
                'getTopologyConf', 'getTopology', 'getUserTopology',
                'getTopologyInfo', 'uploadNewCredential', 'admin'],
    }
    permissions = permissions_by_type.get(typeOfPolicy.lower())
    if permissions is not None:
      policyObj['permMapList'] = [{'userList': [policy_user], 'permList': permissions}]
    return policyObj
  @safe_retry(times=5, sleep_time=8, backoff_factor=1.5, err_class=Fail, return_on_fail=None)
  def create_ambari_admin_user(self,ambari_admin_username, ambari_admin_password,usernamepassword):
    """
    Make sure the given user exists in Ranger, creating it with the ROLE_SYS_ADMIN role when it is absent.

    :param ambari_admin_username: username of user to be created
    :param ambari_admin_password: user password of user to be created; validated against the pattern
           below, which effectively requires a non-empty value without whitespace
    :param usernamepassword: user credentials using which the user is searched and created
    :return Returns response code when the user already exists or was created successfully else None
    :raises Fail: on an invalid password, an HTTP/URL error, a bad status line or a timeout
    """
    # Function-scope import keeps this method self-contained.
    import urllib

    if re.match(r'[a-zA-Z0-9_\S]+$', ambari_admin_password) is None:
      raise Fail('Invalid password given for Ranger Admin user for Ambari')

    try:
      base64string = base64.encodestring('{0}'.format(usernamepassword)).replace('\n', '')
      authorization = "Basic {0}".format(base64string)

      # Look the user up first; the name is URL-encoded so special characters cannot break the query.
      url = self.urlUsers + '?' + urllib.urlencode({'name': ambari_admin_username})
      request = urllib2.Request(url)
      request.add_header("Content-Type", "application/json")
      request.add_header("Accept", "application/json")
      request.add_header("Authorization", authorization)
      result = openurl(request, timeout=20)
      response_code = result.getcode()
      response = json.loads(result.read())
      if response_code != 200:
        return None

      # A missing 'vXUsers' entry is treated as "user not present" instead of raising KeyError.
      existing_users = response.get('vXUsers') if isinstance(response, dict) else None
      if any(vxuser['name'] == ambari_admin_username for vxuser in existing_users or []):
        Logger.info(ambari_admin_username + ' user already exists.')
        return response_code

      Logger.info(ambari_admin_username + ' user is not present, creating user using given configurations')
      admin_user = {
        'status': 1,
        'userRoleList': ['ROLE_SYS_ADMIN'],
        'name': ambari_admin_username,
        'password': ambari_admin_password,
        'description': ambari_admin_username,
        'firstName': ambari_admin_username,
      }
      headers = {
        'Accept': 'application/json',
        "Content-Type": "application/json"
      }
      request = urllib2.Request(self.urlSecUsers, json.dumps(admin_user), headers)
      request.add_header("Authorization", authorization)
      result = openurl(request, timeout=20)
      response_code = result.getcode()
      # The body is read but not inspected; only the status code decides the outcome.
      result.read()
      if response_code == 200:
        Logger.info('Ambari admin user creation successful.')
        return response_code
      Logger.info('Ambari admin user creation failed.')
      return None
    except urllib2.HTTPError as e:
      raise Fail("Error creating ambari admin user. Http status code - {0}. \n {1}".format(e.code, e.read()))
    except urllib2.URLError as e:
      raise Fail("Error creating ambari admin user. Reason - {0}.".format(e.reason))
    except httplib.BadStatusLine:
      raise Fail("Ranger Admin service is not reachable, please restart the service and then try again")
    except TimeoutError:
      raise Fail("Connection to Ranger Admin failed. Reason - timeout")