TestCustomServiceOrchestrator.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. #!/usr/bin/env python
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. import ConfigParser
  18. from multiprocessing.pool import ThreadPool
  19. import os
  20. import pprint
  21. import shell
  22. from unittest import TestCase
  23. import threading
  24. import tempfile
  25. import time
  26. from threading import Thread
  27. from PythonExecutor import PythonExecutor
  28. from CustomServiceOrchestrator import CustomServiceOrchestrator
  29. from AmbariConfig import AmbariConfig
  30. from mock.mock import MagicMock, patch
  31. import StringIO
  32. import sys
  33. from AgentException import AgentException
  34. from FileCache import FileCache
  35. from LiveStatus import LiveStatus
  36. class TestCustomServiceOrchestrator(TestCase):
  def setUp(self):
    # Silence anything printed while a test runs: stdout is swapped for an
    # in-memory buffer until tearDown() puts the real stream back.
    sys.stdout = StringIO.StringIO()

    # Build the sample agent configuration, rooted in the system temp dir.
    temp_root = tempfile.gettempdir()
    agent_options = (
      ('prefix', temp_root),
      ('tmp_dir', os.path.join(temp_root, 'tmp')),
      ('cache_dir', "/cachedir"),
    )
    sample_config = ConfigParser.RawConfigParser()
    sample_config.add_section('agent')
    for option_name, option_value in agent_options:
      sample_config.set('agent', option_name, option_value)
    sample_config.add_section('python')
    sample_config.set('python', 'custom_actions_dir', temp_root)
    self.config = sample_config
  @patch.object(FileCache, "__init__")
  def test_add_reg_listener_to_controller(self, FileCache_mock):
    # Constructing an orchestrator is expected to append something to the
    # controller's registration_listeners.
    FileCache_mock.return_value = None  # a patched __init__ must return None
    controller = MagicMock()
    agent_config = AmbariConfig().getConfig()
    agent_config.set('agent', 'prefix', tempfile.gettempdir())

    CustomServiceOrchestrator(agent_config, controller)

    append_mock = controller.registration_listeners.append
    self.assertTrue(append_mock.called)
  @patch.object(CustomServiceOrchestrator, 'decompressClusterHostInfo')
  @patch("hostname.public_hostname")
  @patch("os.path.isfile")
  @patch("os.unlink")
  @patch.object(FileCache, "__init__")
  def test_dump_command_to_json(self, FileCache_mock, unlink_mock,
                                isfile_mock, hostname_mock,
                                decompress_cluster_host_info_mock):
    # Dumps the same command first as an EXECUTION_COMMAND and then as a
    # STATUS_COMMAND, checks the file written each time, and finally checks
    # the side effects of dump_command_to_json() on the command dictionary
    # and on os.unlink.
    FileCache_mock.return_value = None
    hostname_mock.return_value = "test.hst"
    command = {
      'commandType': 'EXECUTION_COMMAND',
      'role': u'DATANODE',
      'roleCommand': u'INSTALL',
      'commandId': '1-1',
      'taskId': 3,
      'clusterName': u'cc',
      'serviceName': u'HDFS',
      'configurations':{'global' : {}},
      'configurationTags':{'global' : { 'tag': 'v1' }},
      'clusterHostInfo':{'namenode_host' : ['1'],
                         'slave_hosts' : ['0', '1'],
                         'all_hosts' : ['h1.hortonworks.com', 'h2.hortonworks.com'],
                         'all_ping_ports': ['8670:0,1']}
    }
    decompress_cluster_host_info_mock.return_value = {
      'namenode_host' : ['h2.hortonworks.com'],
      'slave_hosts' : ['h1.hortonworks.com', 'h2.hortonworks.com'],
      'all_hosts' : ['h1.hortonworks.com', 'h2.hortonworks.com'],
      'all_ping_ports': ['8670', '8670']}
    config = AmbariConfig().getConfig()
    tempdir = tempfile.gettempdir()
    config.set('agent', 'prefix', tempdir)
    dummy_controller = MagicMock()
    orchestrator = CustomServiceOrchestrator(config, dummy_controller)
    isfile_mock.return_value = True

    def dump_and_check(expected_suffix):
      # Dump `command` and verify the produced file: it exists, is not
      # empty, is readable/writable by the owner only and has the expected
      # name. The file is deleted afterwards, even when a check fails.
      json_file = orchestrator.dump_command_to_json(command)
      try:
        self.assertTrue(os.path.exists(json_file))
        self.assertTrue(os.path.getsize(json_file) > 0)
        # Compare the permission bits as integers; the text produced by
        # oct() differs between interpreter versions.
        self.assertEqual(os.stat(json_file).st_mode & 0o777, 0o600)
        self.assertTrue(json_file.endswith(expected_suffix))
      finally:
        # os.unlink is patched for the duration of this test, so calling it
        # here would only record a call on the mock and leave the file
        # behind. os.remove is not patched and really deletes the file.
        if os.path.exists(json_file):
          os.remove(json_file)

    # Test dumping EXECUTION_COMMAND
    dump_and_check("command-3.json")
    self.assertTrue(decompress_cluster_host_info_mock.called)

    # Test dumping STATUS_COMMAND
    command['commandType']='STATUS_COMMAND'
    decompress_cluster_host_info_mock.reset_mock()
    dump_and_check("status_command.json")
    self.assertFalse(decompress_cluster_host_info_mock.called)

    # Testing side effect of dump_command_to_json
    self.assertEqual(command['public_hostname'], "test.hst")
    # The test no longer calls os.unlink itself, so this assertion can only
    # be satisfied by a call made from dump_command_to_json().
    self.assertTrue(unlink_mock.called)
  @patch("os.path.exists")
  @patch.object(FileCache, "__init__")
  def test_resolve_script_path(self, FileCache_mock, exists_mock):
    # With os.path.exists() reporting True the resolved path is expected to
    # be the base directory joined with the script name; with False an
    # AgentException is expected.
    FileCache_mock.return_value = None
    controller = MagicMock()
    agent_config = AmbariConfig().getConfig()
    orchestrator = CustomServiceOrchestrator(agent_config, controller)

    # Testing existing path
    exists_mock.return_value = True
    resolved = orchestrator.resolve_script_path(
      "/HBASE/package", "scripts/hbase_master.py", "PYTHON")
    self.assertEqual("/HBASE/package/scripts/hbase_master.py", resolved)

    # Testing not existing path
    exists_mock.return_value = False
    try:
      orchestrator.resolve_script_path(
        "/HBASE", "scripts/hbase_master.py", "PYTHON")
    except AgentException:
      pass # Expected
    else:
      self.fail('ExpectedException not thrown')
  @patch.object(CustomServiceOrchestrator, "resolve_script_path")
  @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
  @patch.object(FileCache, "get_service_base_dir")
  @patch.object(FileCache, "get_hook_base_dir")
  @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
  @patch.object(PythonExecutor, "run_file")
  @patch.object(FileCache, "__init__")
  def test_runCommand(self, FileCache_mock,
                      run_file_mock, dump_command_to_json_mock,
                      get_hook_base_dir_mock, get_service_base_dir_mock,
                      resolve_hook_script_path_mock, resolve_script_path_mock):
    # Exercises runCommand() in three scenarios: a plain run, a run with a
    # forced command name, and a run with an unsupported script type.

    def successful_run():
      # Build a new result dictionary for every scenario so that each one
      # starts from its own object.
      return {
        'stdout' : 'sss',
        'stderr' : 'eee',
        'exitcode': 0,
      }

    FileCache_mock.return_value = None
    command = {
      'role' : 'REGION_SERVER',
      'hostLevelParams' : {
        'stack_name' : 'HDP',
        'stack_version' : '2.0.7',
        'jdk_location' : 'some_location'
      },
      'commandParams': {
        'script_type': 'PYTHON',
        'script': 'scripts/hbase_regionserver.py',
        'command_timeout': '600',
        'service_package_folder' : 'HBASE'
      },
      'taskId' : '3',
      'roleCommand': 'INSTALL'
    }
    get_service_base_dir_mock.return_value = "/basedir/"
    resolve_script_path_mock.return_value = "/basedir/scriptpath"
    hook_script = '/hooks_dir/prefix-command/scripts/hook.py'
    hook_dir = '/hooks_dir/prefix-command'
    resolve_hook_script_path_mock.return_value = (hook_script, hook_dir)
    controller = MagicMock()
    orchestrator = CustomServiceOrchestrator(self.config, controller)
    orchestrator.commands_in_progress = {command['taskId']: 111}
    get_hook_base_dir_mock.return_value = "/hooks/"

    # normal run case
    run_file_mock.return_value = successful_run()
    result = orchestrator.runCommand(command, "out.txt", "err.txt")
    self.assertEqual(result['exitcode'], 0)
    self.assertTrue(run_file_mock.called)
    self.assertEqual(run_file_mock.call_count, 3)
    run_file_mock.reset_mock()

    # Case when we force another command
    run_file_mock.return_value = successful_run()
    forced_name = CustomServiceOrchestrator.COMMAND_NAME_STATUS
    result = orchestrator.runCommand(command, "out.txt", "err.txt",
                                     forced_command_name=forced_name)
    ## Check that override_output_files was true only during first call
    for call_index, expected_flag in enumerate((True, False, False)):
      positional_args = run_file_mock.call_args_list[call_index][0]
      self.assertEquals(positional_args[10], expected_flag)
    ## Check that forced_command_name was taken into account
    first_call_args = run_file_mock.call_args_list[0][0]
    self.assertEqual(first_call_args[1][0],
                     CustomServiceOrchestrator.COMMAND_NAME_STATUS)
    run_file_mock.reset_mock()

    # unknown script type case
    command['commandParams']['script_type'] = "SOME_TYPE"
    result = orchestrator.runCommand(command, "out.txt", "err.txt")
    self.assertEqual(result['exitcode'], 1)
    self.assertFalse(run_file_mock.called)
    self.assertTrue("Unknown script type" in result['stdout'])
    #By default returns empty dictionary
    self.assertEqual(result['structuredOut'], '{}')
  @patch("shell.kill_process_with_children")
  @patch.object(CustomServiceOrchestrator, "resolve_script_path")
  @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
  @patch.object(FileCache, "get_service_base_dir")
  @patch.object(FileCache, "get_hook_base_dir")
  @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
  @patch.object(PythonExecutor, "run_file")
  @patch.object(FileCache, "__init__")
  def test_cancel_command(self, FileCache_mock,
                          run_file_mock, dump_command_to_json_mock,
                          get_hook_base_dir_mock, get_service_base_dir_mock,
                          resolve_hook_script_path_mock, resolve_script_path_mock,
                          kill_process_with_children_mock):
    # Starts runCommand() on a worker thread, cancels the command from the
    # test thread and checks the result reported for the cancelled command.
    FileCache_mock.return_value = None
    command = {
      'role' : 'REGION_SERVER',
      'hostLevelParams' : {
        'stack_name' : 'HDP',
        'stack_version' : '2.0.7',
        'jdk_location' : 'some_location'
      },
      'commandParams': {
        'script_type': 'PYTHON',
        'script': 'scripts/hbase_regionserver.py',
        'command_timeout': '600',
        'service_package_folder' : 'HBASE'
      },
      'taskId' : '3',
      'roleCommand': 'INSTALL'
    }
    get_service_base_dir_mock.return_value = "/basedir/"
    resolve_script_path_mock.return_value = "/basedir/scriptpath"
    resolve_hook_script_path_mock.return_value = \
      ('/hooks_dir/prefix-command/scripts/hook.py',
       '/hooks_dir/prefix-command')
    dummy_controller = MagicMock()
    orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
    unix_process_id = 111
    orchestrator.commands_in_progress = {command['taskId']: unix_process_id}
    get_hook_base_dir_mock.return_value = "/hooks/"
    run_file_mock_return_value = {
      'stdout' : 'killed',
      'stderr' : 'killed',
      'exitcode': 1,
    }

    def side_effect(*args, **kwargs):
      # Keep the fake script busy for longer than the delay used below, so
      # that the cancel request is issued while run_file() is still sleeping.
      time.sleep(0.2)
      return run_file_mock_return_value
    run_file_mock.side_effect = side_effect

    # mkstemp() returns an already opened descriptor together with the path.
    # Only the paths are needed here, so close the descriptors right away
    # instead of leaking them.
    out_fd, out = tempfile.mkstemp()
    err_fd, err = tempfile.mkstemp()
    os.close(out_fd)
    os.close(err_fd)

    pool = ThreadPool(processes=1)
    try:
      async_result = pool.apply_async(orchestrator.runCommand,
                                      (command, out, err))
      time.sleep(0.1)
      orchestrator.cancel_command(command['taskId'], 'reason')
      ret = async_result.get()

      self.assertEqual(ret['exitcode'], 1)
      self.assertEqual(ret['stdout'], 'killed\nCommand aborted. reason')
      self.assertEqual(ret['stderr'], 'killed\nCommand aborted. reason')
      self.assertTrue(kill_process_with_children_mock.called)
      self.assertFalse(command['taskId'] in orchestrator.commands_in_progress)
      self.assertTrue(os.path.exists(out))
      self.assertTrue(os.path.exists(err))
    finally:
      # Release the worker thread and the temporary files even when one of
      # the checks above fails.
      pool.close()
      pool.join()
      for leftover in (out, err):
        if os.path.exists(leftover):
          os.remove(leftover)
  @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
  @patch.object(PythonExecutor, "run_file")
  @patch.object(FileCache, "__init__")
  @patch.object(FileCache, "get_custom_actions_base_dir")
  def test_runCommand_custom_action(self, get_custom_actions_base_dir_mock,
                                    FileCache_mock,
                                    run_file_mock, dump_command_to_json_mock):
    # Runs an ACTIONEXECUTE command and checks that run_file() is invoked
    # exactly once and that its exit code is reported.
    FileCache_mock.return_value = None
    get_custom_actions_base_dir_mock.return_value = "some path"
    # No temporary script file is created here: nothing in this test reads
    # one, and a mkstemp() result that is never closed or deleted only leaks
    # a descriptor and a file.
    command = {
      'role' : 'any',
      'commandParams': {
        'script_type': 'PYTHON',
        'script': 'some_custom_action.py',
        'command_timeout': '600',
        'jdk_location' : 'some_location'
      },
      'taskId' : '3',
      'roleCommand': 'ACTIONEXECUTE'
    }
    dummy_controller = MagicMock()
    orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
    unix_process_id = 111
    orchestrator.commands_in_progress = {command['taskId']: unix_process_id}
    # normal run case
    run_file_mock.return_value = {
      'stdout' : 'sss',
      'stderr' : 'eee',
      'exitcode': 0,
    }
    ret = orchestrator.runCommand(command, "out.txt", "err.txt")
    self.assertEqual(ret['exitcode'], 0)
    self.assertTrue(run_file_mock.called)
    # Hooks are not supported for custom actions,
    # that's why run_file() should be called only once
    self.assertEqual(run_file_mock.call_count, 1)
  @patch("os.path.isfile")
  @patch.object(FileCache, "__init__")
  def test_resolve_hook_script_path(self, FileCache_mock, isfile_mock):
    # Covers three inputs of resolve_hook_script_path(): no hooks directory,
    # a hook script that os.path.isfile() reports as present, and one that
    # it reports as missing.
    FileCache_mock.return_value = None
    orchestrator = CustomServiceOrchestrator(self.config, MagicMock())

    def resolve(hooks_base_dir):
      # Every case uses the same prefix, command name and script type.
      return orchestrator.resolve_hook_script_path(
        hooks_base_dir, "prefix", "command", "script_type")

    # Testing None param
    self.assertEqual(resolve(None), None)

    # Testing existing hook script
    isfile_mock.return_value = True
    expected_pair = ('/hooks_dir/prefix-command/scripts/hook.py',
                     '/hooks_dir/prefix-command')
    self.assertEqual(resolve("/hooks_dir/"), expected_pair)

    # Testing not existing hook script
    isfile_mock.return_value = False
    self.assertEqual(resolve("/hooks_dir/"), None)
  @patch.object(CustomServiceOrchestrator, "runCommand")
  @patch.object(FileCache, "__init__")
  def test_requestComponentStatus(self, FileCache_mock, runCommand_mock):
    # requestComponentStatus() is expected to hand back whatever the patched
    # runCommand() returned, for exit code 0 and for exit code 1.
    FileCache_mock.return_value = None
    status_command = {
      "serviceName" : 'HDFS',
      "commandType" : "STATUS_COMMAND",
      "clusterName" : "",
      "componentName" : "DATANODE",
      'configurations':{}
    }
    orchestrator = CustomServiceOrchestrator(self.config, MagicMock())

    # Exit code 0 is the alive case, exit code 1 the dead case
    for exit_code in (0, 1):
      runCommand_mock.return_value = {
        "exitcode" : exit_code
      }
      reported = orchestrator.requestComponentStatus(status_command)
      self.assertEqual(runCommand_mock.return_value, reported)
  def tearDown(self):
    # Undo the redirection done in setUp(): point sys.stdout back at the
    # stream the interpreter started with.
    interpreter_stdout = sys.__stdout__
    sys.stdout = interpreter_stdout