TestCustomServiceOrchestrator.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672
  1. #!/usr/bin/env python
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. import ConfigParser
  18. from multiprocessing.pool import ThreadPool
  19. import os
  20. import pprint
  21. from ambari_commons import shell
  22. from unittest import TestCase
  23. import threading
  24. import tempfile
  25. import time
  26. from threading import Thread
  27. from mock.mock import MagicMock, patch
  28. import StringIO
  29. import sys
  30. from ambari_agent.ActionQueue import ActionQueue
  31. from ambari_agent.AgentException import AgentException
  32. from ambari_agent.AmbariConfig import AmbariConfig
  33. from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
  34. from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
  35. from ambari_agent.FileCache import FileCache
  36. from ambari_agent.PythonExecutor import PythonExecutor
  37. from ambari_commons import OSCheck
  38. from only_for_platform import get_platform, os_distro_value, PLATFORM_WINDOWS
  39. class TestCustomServiceOrchestrator(TestCase):
  def setUp(self):
    '''
    Silences stdout and builds the sample configuration used by the tests.

    sys.stdout is swapped for an in-memory buffer; tearDown puts it back.
    '''
    # disable stdout
    sys.stdout = StringIO.StringIO()

    # generate sample config
    tmpdir = tempfile.gettempdir()
    self.config = ConfigParser.RawConfigParser()
    # NOTE(review): lookups through self.config.get are routed to a fresh
    # AmbariConfig instance, so they presumably do not see the sections
    # populated below - confirm against AmbariConfig.get
    self.config.get = AmbariConfig().get
    self.config.add_section('agent')
    self.config.set('agent', 'prefix', tmpdir)
    self.config.set('agent', 'cache_dir', "/cachedir")
    self.config.add_section('python')
    self.config.set('python', 'custom_actions_dir', tmpdir)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(FileCache, "__init__")
  def test_add_reg_listener_to_controller(self, FileCache_mock):
    '''
    Constructing a CustomServiceOrchestrator must append to the controller's
    registration_listeners.
    '''
    FileCache_mock.return_value = None
    controller_stub = MagicMock()
    agent_config = AmbariConfig().getConfig()
    agent_config.set('agent', 'prefix', tempfile.gettempdir())

    CustomServiceOrchestrator(agent_config, controller_stub)

    listeners = controller_stub.registration_listeners
    self.assertTrue(listeners.append.called)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(CustomServiceOrchestrator, 'decompressClusterHostInfo')
  @patch("ambari_agent.hostname.public_hostname")
  @patch("os.path.isfile")
  @patch("os.unlink")
  @patch.object(FileCache, "__init__")
  def test_dump_command_to_json(self, FileCache_mock, unlink_mock,
                                isfile_mock, hostname_mock,
                                decompress_cluster_host_info_mock):
    '''
    Dumps an EXECUTION_COMMAND and then a STATUS_COMMAND to JSON and checks the
    produced files and the fields added to the command as a side effect.

    os.unlink is mocked for the whole test, so the test removes its own files
    with os.remove. That really deletes them and leaves unlink_mock.called
    attributable to the orchestrator only.
    '''
    FileCache_mock.return_value = None
    hostname_mock.return_value = "test.hst"
    command = {
      'commandType': 'EXECUTION_COMMAND',
      'role': u'DATANODE',
      'roleCommand': u'INSTALL',
      'commandId': '1-1',
      'taskId': 3,
      'clusterName': u'cc',
      'serviceName': u'HDFS',
      'configurations':{'global' : {}},
      'configurationTags':{'global' : { 'tag': 'v1' }},
      'clusterHostInfo':{'namenode_host' : ['1'],
                         'slave_hosts'   : ['0', '1'],
                         'all_hosts'     : ['h1.hortonworks.com', 'h2.hortonworks.com'],
                         'all_ping_ports': ['8670:0,1']},
      'hostLevelParams':{}
    }

    decompress_cluster_host_info_mock.return_value = {'namenode_host' : ['h2.hortonworks.com'],
                         'slave_hosts'   : ['h1.hortonworks.com', 'h2.hortonworks.com'],
                         'all_hosts'     : ['h1.hortonworks.com', 'h2.hortonworks.com'],
                         'all_ping_ports': ['8670', '8670']}

    config = AmbariConfig()
    tempdir = tempfile.gettempdir()
    config.set('agent', 'prefix', tempdir)
    dummy_controller = MagicMock()
    orchestrator = CustomServiceOrchestrator(config, dummy_controller)
    isfile_mock.return_value = True

    # Test dumping EXECUTION_COMMAND
    json_file = orchestrator.dump_command_to_json(command)
    self.assertTrue(os.path.exists(json_file))
    self.assertTrue(os.path.getsize(json_file) > 0)
    if get_platform() != PLATFORM_WINDOWS:
      # the dump must be readable/writable by the owner only
      self.assertEqual(os.stat(json_file).st_mode & 0o777, 0o600)
    self.assertTrue(json_file.endswith("command-3.json"))
    self.assertTrue(decompress_cluster_host_info_mock.called)
    os.remove(json_file)

    # Test dumping STATUS_COMMAND
    command['commandType']='STATUS_COMMAND'
    decompress_cluster_host_info_mock.reset_mock()
    json_file = orchestrator.dump_command_to_json(command)
    self.assertTrue(os.path.exists(json_file))
    self.assertTrue(os.path.getsize(json_file) > 0)
    if get_platform() != PLATFORM_WINDOWS:
      self.assertEqual(os.stat(json_file).st_mode & 0o777, 0o600)
    self.assertTrue(json_file.endswith("status_command.json"))
    self.assertFalse(decompress_cluster_host_info_mock.called)
    os.remove(json_file)

    # Testing side effect of dump_command_to_json
    self.assertEqual(command['public_hostname'], "test.hst")
    self.assertEqual(command['agentConfigParams']['agent']['parallel_execution'], 0)
    # the test itself never calls os.unlink, so this reflects the orchestrator
    self.assertTrue(unlink_mock.called)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch("ambari_agent.hostname.public_hostname")
  @patch("os.path.isfile")
  @patch("os.unlink")
  @patch.object(FileCache, "__init__")
  def test_dump_command_to_json_with_retry(self, FileCache_mock, unlink_mock,
                                           isfile_mock, hostname_mock):
    '''
    Dumps the same EXECUTION_COMMAND twice, the second time passing True as
    the second argument of dump_command_to_json, and checks both dumps.

    os.unlink is mocked for the whole test, so the test removes its own files
    with os.remove. That really deletes them and leaves unlink_mock.called
    attributable to the orchestrator only.
    '''
    FileCache_mock.return_value = None
    hostname_mock.return_value = "test.hst"
    command = {
      'commandType': 'EXECUTION_COMMAND',
      'role': u'DATANODE',
      'roleCommand': u'INSTALL',
      'commandId': '1-1',
      'taskId': 3,
      'clusterName': u'cc',
      'serviceName': u'HDFS',
      'configurations':{'global' : {}},
      'configurationTags':{'global' : { 'tag': 'v1' }},
      'clusterHostInfo':{'namenode_host' : ['1'],
                         'slave_hosts'   : ['0', '1'],
                         'all_racks'   : [u'/default-rack:0'],
                         'ambari_server_host' : 'a.b.c',
                         'ambari_server_port' : '123',
                         'ambari_server_use_ssl' : 'false',
                         'all_ipv4_ips'   : [u'192.168.12.101:0'],
                         'all_hosts'     : ['h1.hortonworks.com', 'h2.hortonworks.com'],
                         'all_ping_ports': ['8670:0,1']},
      'hostLevelParams':{}
    }

    config = AmbariConfig()
    tempdir = tempfile.gettempdir()
    config.set('agent', 'prefix', tempdir)
    dummy_controller = MagicMock()
    orchestrator = CustomServiceOrchestrator(config, dummy_controller)
    isfile_mock.return_value = True

    # Test dumping EXECUTION_COMMAND
    json_file = orchestrator.dump_command_to_json(command)
    self.assertTrue(os.path.exists(json_file))
    self.assertTrue(os.path.getsize(json_file) > 0)
    if get_platform() != PLATFORM_WINDOWS:
      # the dump must be readable/writable by the owner only
      self.assertEqual(os.stat(json_file).st_mode & 0o777, 0o600)
    self.assertTrue(json_file.endswith("command-3.json"))
    os.remove(json_file)

    # Test dumping the same command again on the retry path
    json_file = orchestrator.dump_command_to_json(command, True)
    self.assertTrue(os.path.exists(json_file))
    self.assertTrue(os.path.getsize(json_file) > 0)
    if get_platform() != PLATFORM_WINDOWS:
      self.assertEqual(os.stat(json_file).st_mode & 0o777, 0o600)
    self.assertTrue(json_file.endswith("command-3.json"))
    os.remove(json_file)

    # Testing side effect of dump_command_to_json
    self.assertEqual(command['public_hostname'], "test.hst")
    self.assertEqual(command['agentConfigParams']['agent']['parallel_execution'], 0)
    # the test itself never calls os.unlink, so this reflects the orchestrator
    self.assertTrue(unlink_mock.called)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch("os.path.exists")
  @patch.object(FileCache, "__init__")
  def test_resolve_script_path(self, FileCache_mock, exists_mock):
    '''
    resolve_script_path returns the joined path when os.path.exists reports
    it present and raises AgentException when it does not.
    '''
    FileCache_mock.return_value = None
    controller_stub = MagicMock()
    agent_config = AmbariConfig().getConfig()
    orchestrator = CustomServiceOrchestrator(agent_config, controller_stub)

    # Testing existing path
    exists_mock.return_value = True
    package_dir = os.path.join("HBASE", "package")
    script_name = os.path.join("scripts", "hbase_master.py")
    resolved = orchestrator.resolve_script_path(package_dir, script_name)
    expected = os.path.join("HBASE", "package", "scripts", "hbase_master.py")
    self.assertEqual(expected, resolved)

    # Testing not existing path
    exists_mock.return_value = False
    try:
      orchestrator.resolve_script_path("/HBASE",
                                       os.path.join("scripts", "hbase_master.py"))
    except AgentException:
      pass # Expected
    else:
      self.fail('ExpectedException not thrown')
  @patch.object(FileCache, "get_dashboard_base_dir")
  @patch.object(CustomServiceOrchestrator, "resolve_script_path")
  @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
  @patch.object(FileCache, "get_host_scripts_base_dir")
  @patch.object(FileCache, "get_service_base_dir")
  @patch.object(FileCache, "get_hook_base_dir")
  @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
  @patch.object(PythonExecutor, "run_file")
  @patch.object(FileCache, "__init__")
  def test_runCommand(self, FileCache_mock,
                      run_file_mock, dump_command_to_json_mock,
                      get_hook_base_dir_mock, get_service_base_dir_mock,
                      get_host_scripts_base_dir_mock,
                      resolve_hook_script_path_mock,
                      resolve_script_path_mock,
                      get_dashboard_base_dir_mock):
    '''
    Exercises runCommand with PythonExecutor.run_file mocked out.

    Covers a normal run, a run with forced_command_name, a run with
    role=METRICS_GRAFANA and a run with an unknown script type.
    '''
    FileCache_mock.return_value = None
    command = {
      'commandType' : 'EXECUTION_COMMAND',
      'role' : 'REGION_SERVER',
      'hostLevelParams' : {
        'stack_name' : 'HDP',
        'stack_version' : '2.0.7',
        'jdk_location' : 'some_location'
      },
      'commandParams': {
        'script_type': 'PYTHON',
        'script': 'scripts/hbase_regionserver.py',
        'command_timeout': '600',
        'service_package_folder' : 'HBASE'
      },
      'taskId' : '3',
      'roleCommand': 'INSTALL'
    }

    get_host_scripts_base_dir_mock.return_value = "/host_scripts"
    get_service_base_dir_mock.return_value = "/basedir/"
    resolve_script_path_mock.return_value = "/basedir/scriptpath"
    resolve_hook_script_path_mock.return_value = \
      ('/hooks_dir/prefix-command/scripts/hook.py',
       '/hooks_dir/prefix-command')
    dummy_controller = MagicMock()
    orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
    unix_process_id = 111
    orchestrator.commands_in_progress = {command['taskId']: unix_process_id}
    get_hook_base_dir_mock.return_value = "/hooks/"
    get_dashboard_base_dir_mock.return_value = "/dashboards/"

    # normal run case
    run_file_mock.return_value = {
      'stdout' : 'sss',
      'stderr' : 'eee',
      'exitcode': 0,
    }
    ret = orchestrator.runCommand(command, "out.txt", "err.txt")
    self.assertEqual(ret['exitcode'], 0)
    self.assertTrue(run_file_mock.called)
    # presumably the script itself plus the two resolved hook scripts
    self.assertEqual(run_file_mock.call_count, 3)
    self.assertTrue(get_dashboard_base_dir_mock.called)

    run_file_mock.reset_mock()

    # Case when we force another command
    run_file_mock.return_value = {
      'stdout' : 'sss',
      'stderr' : 'eee',
      'exitcode': 0,
    }
    orchestrator.runCommand(command, "out.txt", "err.txt",
                            forced_command_name=CustomServiceOrchestrator.SCRIPT_TYPE_PYTHON)
    ## Check that override_output_files was true only during first call
    self.assertEqual(run_file_mock.call_args_list[0][0][8], True)
    self.assertEqual(run_file_mock.call_args_list[1][0][8], False)
    self.assertEqual(run_file_mock.call_args_list[2][0][8], False)
    ## Check that forced_command_name was taken into account
    self.assertEqual(run_file_mock.call_args_list[0][0][1][0],
                     CustomServiceOrchestrator.SCRIPT_TYPE_PYTHON)

    run_file_mock.reset_mock()

    # For role=METRICS_GRAFANA, dashboards should be sync'd
    command['role'] = 'METRICS_GRAFANA'
    get_dashboard_base_dir_mock.reset_mock()
    get_dashboard_base_dir_mock.return_value = "/dashboards/"

    run_file_mock.return_value = {
      'stdout' : 'sss',
      'stderr' : 'eee',
      'exitcode': 0,
    }
    ret = orchestrator.runCommand(command, "out.txt", "err.txt")
    self.assertEqual(ret['exitcode'], 0)
    self.assertTrue(run_file_mock.called)
    self.assertEqual(run_file_mock.call_count, 3)
    self.assertTrue(get_dashboard_base_dir_mock.called)

    command['role'] = 'REGION_SERVER'

    run_file_mock.reset_mock()

    # unknown script type case
    command['commandParams']['script_type'] = "SOME_TYPE"
    ret = orchestrator.runCommand(command, "out.txt", "err.txt")
    self.assertEqual(ret['exitcode'], 1)
    self.assertFalse(run_file_mock.called)
    self.assertTrue("Unknown script type" in ret['stdout'])

    #By default returns empty dictionary
    self.assertEqual(ret['structuredOut'], '{}')
  @patch.object(FileCache, "get_dashboard_base_dir")
  @patch("ambari_commons.shell.kill_process_with_children")
  @patch.object(CustomServiceOrchestrator, "resolve_script_path")
  @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
  @patch.object(FileCache, "get_host_scripts_base_dir")
  @patch.object(FileCache, "get_service_base_dir")
  @patch.object(FileCache, "get_hook_base_dir")
  @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
  @patch.object(PythonExecutor, "run_file")
  @patch.object(FileCache, "__init__")
  def test_cancel_command(self, FileCache_mock,
                          run_file_mock, dump_command_to_json_mock,
                          get_hook_base_dir_mock, get_service_base_dir_mock,
                          get_host_scripts_base_dir_mock,
                          resolve_hook_script_path_mock, resolve_script_path_mock,
                          kill_process_with_children_mock,
                          get_dashboard_base_dir_mock):
    '''
    Runs runCommand on a worker thread, cancels it while the mocked run_file
    is still sleeping, and checks the abort reason is appended to the output.

    The temporary output files and the thread pool are released in a finally
    block, so a failed assertion does not leave them behind.
    '''
    FileCache_mock.return_value = None
    command = {
      'role' : 'REGION_SERVER',
      'hostLevelParams' : {
        'stack_name' : 'HDP',
        'stack_version' : '2.0.7',
        'jdk_location' : 'some_location'
      },
      'commandParams': {
        'script_type': 'PYTHON',
        'script': 'scripts/hbase_regionserver.py',
        'command_timeout': '600',
        'service_package_folder' : 'HBASE'
      },
      'taskId' : '3',
      'roleCommand': 'INSTALL'
    }

    get_host_scripts_base_dir_mock.return_value = "/host_scripts"
    get_service_base_dir_mock.return_value = "/basedir/"
    resolve_script_path_mock.return_value = "/basedir/scriptpath"
    resolve_hook_script_path_mock.return_value = \
      ('/hooks_dir/prefix-command/scripts/hook.py',
       '/hooks_dir/prefix-command')
    dummy_controller = MagicMock()
    orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
    unix_process_id = 111
    orchestrator.commands_in_progress = {command['taskId']: unix_process_id}
    get_hook_base_dir_mock.return_value = "/hooks/"
    get_dashboard_base_dir_mock.return_value = "/dashboards/"
    run_file_mock_return_value = {
      'stdout' : 'killed',
      'stderr' : 'killed',
      'exitcode': 1,
    }
    def side_effect(*args, **kwargs):
      # keep the "process" busy long enough for the cancel below to arrive
      time.sleep(0.2)
      return run_file_mock_return_value
    run_file_mock.side_effect = side_effect

    out_fd, out = tempfile.mkstemp()
    err_fd, err = tempfile.mkstemp()
    # only the paths are needed; close the descriptors mkstemp opened
    os.close(out_fd)
    os.close(err_fd)

    pool = ThreadPool(processes=1)
    try:
      async_result = pool.apply_async(orchestrator.runCommand, (command, out, err))

      time.sleep(0.1)
      orchestrator.cancel_command(command['taskId'], 'reason')

      ret = async_result.get()

      self.assertEqual(ret['exitcode'], 1)
      self.assertEqual(ret['stdout'], 'killed\nCommand aborted. Reason: \'reason\'')
      self.assertEqual(ret['stderr'], 'killed\nCommand aborted. Reason: \'reason\'')

      self.assertTrue(kill_process_with_children_mock.called)
      self.assertFalse(command['taskId'] in orchestrator.commands_in_progress)
      self.assertTrue(os.path.exists(out))
      self.assertTrue(os.path.exists(err))
    finally:
      pool.terminate()
      for path in (out, err):
        try:
          os.remove(path)
        except OSError:
          pass # best-effort cleanup of the temp files
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(CustomServiceOrchestrator, "get_py_executor")
  @patch("ambari_commons.shell.kill_process_with_children")
  @patch.object(FileCache, "__init__")
  @patch.object(CustomServiceOrchestrator, "resolve_script_path")
  @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
  def test_cancel_backgound_command(self, resolve_hook_script_path_mock,
                                    resolve_script_path_mock, FileCache_mock, kill_process_with_children_mock,
                                    get_py_executor_mock):
    '''
    Queues TestActionQueue's background command, cancels task 19 while its
    completion callback is blocked, and checks that the process is killed and
    the command ends up with FAILED status.
    '''
    FileCache_mock.return_value = None
    FileCache_mock.cache_dir = MagicMock()
    resolve_hook_script_path_mock.return_value = None
    dummy_controller = MagicMock()
    cfg = AmbariConfig()
    cfg.set('agent', 'tolerate_download_failures', 'true')
    cfg.set('agent', 'prefix', '.')
    cfg.set('agent', 'cache_dir', 'background_tasks')

    actionQueue = ActionQueue(cfg, dummy_controller)

    dummy_controller.actionQueue = actionQueue
    orchestrator = CustomServiceOrchestrator(cfg, dummy_controller)
    orchestrator.file_cache = MagicMock()
    def f (a, b):
      return ""
    orchestrator.file_cache.get_service_base_dir = f
    actionQueue.customServiceOrchestrator = orchestrator

    import TestActionQueue
    import copy

    pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config)
    TestActionQueue.patch_output_file(pyex)
    pyex.prepare_process_result = MagicMock()
    get_py_executor_mock.return_value = pyex
    orchestrator.dump_command_to_json = MagicMock()

    lock = threading.RLock()
    complete_done = threading.Condition(lock)

    complete_was_called = {}
    def command_complete_w(process_condenced_result, handle):
      with lock:
        complete_was_called['visited']= ''
        # hold the callback until the test notifies (or 3 seconds pass)
        complete_done.wait(3)

    actionQueue.on_background_command_complete_callback = TestActionQueue.wraped(actionQueue.on_background_command_complete_callback, command_complete_w, None)
    execute_command = copy.deepcopy(TestActionQueue.TestActionQueue.background_command)
    actionQueue.put([execute_command])
    actionQueue.processBackgroundQueueSafeEmpty()

    time.sleep(.1)

    orchestrator.cancel_command(19,'reason')
    self.assertTrue(kill_process_with_children_mock.called)
    kill_process_with_children_mock.assert_called_with(33)

    with lock:
      complete_done.notifyAll()

    with lock:
      self.assertTrue('visited' in complete_was_called)

    time.sleep(.1)

    runningCommand = actionQueue.commandStatuses.get_command_status(19)
    self.assertTrue(runningCommand is not None)
    self.assertEqual(runningCommand['status'], ActionQueue.FAILED_STATUS)
  @patch.object(AmbariConfig, "get")
  @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
  @patch.object(PythonExecutor, "run_file")
  @patch.object(FileCache, "__init__")
  @patch.object(FileCache, "get_custom_actions_base_dir")
  def test_runCommand_custom_action(self, get_custom_actions_base_dir_mock,
                                    FileCache_mock,
                                    run_file_mock, dump_command_to_json_mock, ambari_config_get):
    '''
    Runs an ACTIONEXECUTE command with run_file mocked and checks that
    run_file is invoked exactly once.
    '''
    ambari_config_get.return_value = "0"
    FileCache_mock.return_value = None
    get_custom_actions_base_dir_mock.return_value = "some path"
    command = {
      'role' : 'any',
      'commandParams': {
        'script_type': 'PYTHON',
        'script': 'some_custom_action.py',
        'command_timeout': '600',
        'jdk_location' : 'some_location'
      },
      'taskId' : '3',
      'roleCommand': 'ACTIONEXECUTE'
    }
    dummy_controller = MagicMock()
    orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
    unix_process_id = 111
    orchestrator.commands_in_progress = {command['taskId']: unix_process_id}
    # normal run case
    run_file_mock.return_value = {
      'stdout' : 'sss',
      'stderr' : 'eee',
      'exitcode': 0,
    }
    ret = orchestrator.runCommand(command, "out.txt", "err.txt")
    self.assertEqual(ret['exitcode'], 0)
    self.assertTrue(run_file_mock.called)
    # Hooks are not supported for custom actions,
    # that's why run_file() should be called only once
    self.assertEqual(run_file_mock.call_count, 1)
  @patch("os.path.isfile")
  @patch.object(FileCache, "__init__")
  def test_resolve_hook_script_path(self, FileCache_mock, isfile_mock):
    '''
    resolve_hook_script_path yields None for a None hooks dir or a missing
    hook file, and a (script path, hook dir) pair when the file exists.
    '''
    FileCache_mock.return_value = None
    orchestrator = CustomServiceOrchestrator(self.config, MagicMock())

    # Testing None param
    no_dir_result = orchestrator.resolve_hook_script_path(None, "prefix", "command",
                                                          "script_type")
    self.assertEqual(no_dir_result, None)

    # Testing existing hook script
    isfile_mock.return_value = True
    found_result = orchestrator.resolve_hook_script_path("hooks_dir", "prefix", "command",
                                                         "script_type")
    hook_dir = os.path.join('hooks_dir', 'prefix-command')
    hook_script = os.path.join('hooks_dir', 'prefix-command', 'scripts', 'hook.py')
    self.assertEqual(found_result, (hook_script, hook_dir))

    # Testing not existing hook script
    isfile_mock.return_value = False
    missing_result = orchestrator.resolve_hook_script_path("hooks_dir", "prefix", "command",
                                                           "script_type")
    self.assertEqual(missing_result, None)
  @patch.object(CustomServiceOrchestrator, "runCommand")
  @patch.object(FileCache, "__init__")
  def test_requestComponentStatus(self, FileCache_mock, runCommand_mock):
    '''
    requestComponentStatus hands back whatever the mocked runCommand returns,
    for both a zero and a non-zero exit code.
    '''
    FileCache_mock.return_value = None
    status_command = {
      "serviceName" : 'HDFS',
      "commandType" : "STATUS_COMMAND",
      "clusterName" : "",
      "componentName" : "DATANODE",
      'configurations':{}
    }
    orchestrator = CustomServiceOrchestrator(self.config, MagicMock())

    # exit code 0 is the alive case, exit code 1 the dead case
    for exit_code in (0, 1):
      runCommand_mock.return_value = {
        "exitcode" : exit_code
      }
      reported = orchestrator.requestComponentStatus(status_command)
      self.assertEqual(runCommand_mock.return_value, reported)
  @patch.object(CustomServiceOrchestrator, "runCommand")
  @patch.object(FileCache, "__init__")
  def test_requestComponentSecurityState(self, FileCache_mock, runCommand_mock):
    '''
    requestComponentSecurityState reports the securityState found in the
    structured output, and 'UNKNOWN' when runCommand exits with a failure.
    '''
    FileCache_mock.return_value = None
    status_command = {
      "serviceName" : 'HDFS',
      "commandType" : "STATUS_COMMAND",
      "clusterName" : "",
      "componentName" : "DATANODE",
      'configurations':{}
    }
    orchestrator = CustomServiceOrchestrator(self.config, MagicMock())

    # Test securityState
    successful_result = {
      'exitcode' : 0,
      'structuredOut' : {'securityState': 'UNSECURED'}
    }
    runCommand_mock.return_value = successful_result
    security_state = orchestrator.requestComponentSecurityState(status_command)
    self.assertEqual('UNSECURED', security_state)

    # Test case where exit code indicates failure
    failed_result = {
      "exitcode" : 1
    }
    runCommand_mock.return_value = failed_result
    security_state = orchestrator.requestComponentSecurityState(status_command)
    self.assertEqual('UNKNOWN', security_state)
  @patch.object(FileCache, "__init__")
  def test_requestComponentSecurityState_realFailure(self, FileCache_mock):
    '''
    Covers the situation in which CustomServiceOrchestrator tries to invoke a
    service's security_status method but cannot, because the script or the
    method does not exist. The reported state must then be 'UNKNOWN'.

    :param FileCache_mock: mock replacing FileCache.__init__
    :return:
    '''
    FileCache_mock.return_value = None

    bogus_status_command = {
      "serviceName" : 'BOGUS_SERVICE',
      "commandType" : "STATUS_COMMAND",
      "clusterName" : "",
      "componentName" : "DATANODE",
      'configurations':{}
    }
    orchestrator = CustomServiceOrchestrator(self.config, MagicMock())

    security_state = orchestrator.requestComponentSecurityState(bogus_status_command)
    self.assertEqual('UNKNOWN', security_state)
  @patch.object(CustomServiceOrchestrator, "get_py_executor")
  @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
  @patch.object(FileCache, "__init__")
  @patch.object(FileCache, "get_custom_actions_base_dir")
  def test_runCommand_background_action(self, get_custom_actions_base_dir_mock,
                                        FileCache_mock,
                                        dump_command_to_json_mock,
                                        get_py_executor_mock):
    '''
    Runs a BACKGROUND_EXECUTION_COMMAND through runCommand using a
    PythonExecutor patched by TestActionQueue.patch_output_file and checks
    the reported exit code.
    '''
    FileCache_mock.return_value = None
    get_custom_actions_base_dir_mock.return_value = "some path"
    command = {
      'role' : 'any',
      'commandParams': {
        'script_type': 'PYTHON',
        'script': 'some_custom_action.py',
        'command_timeout': '600',
        'jdk_location' : 'some_location'
      },
      'taskId' : '13',
      'roleCommand': 'ACTIONEXECUTE',
      'commandType': 'BACKGROUND_EXECUTION_COMMAND',
      '__handle': BackgroundCommandExecutionHandle({'taskId': '13'}, 13,
                                                   MagicMock(), MagicMock())
    }
    dummy_controller = MagicMock()
    orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)

    import TestActionQueue
    pyex = PythonExecutor(orchestrator.tmp_dir, orchestrator.config)
    TestActionQueue.patch_output_file(pyex)
    pyex.condenseOutput = MagicMock()
    get_py_executor_mock.return_value = pyex
    orchestrator.dump_command_to_json = MagicMock()

    ret = orchestrator.runCommand(command, "out.txt", "err.txt")
    # NOTE(review): 777 is presumably the exit code reported for a command
    # that was handed off to the background - confirm in runCommand
    self.assertEqual(ret['exitcode'], 777)
  def tearDown(self):
    '''
    Puts the interpreter's original stdout stream back after each test.
    '''
    # enable stdout
    original_stream = sys.__stdout__
    sys.stdout = original_stream