TestCustomServiceOrchestrator.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. #!/usr/bin/env python
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. import ConfigParser
  18. import os
  19. import pprint
  20. from unittest import TestCase
  21. import threading
  22. import tempfile
  23. import time
  24. from threading import Thread
  25. from PythonExecutor import PythonExecutor
  26. from CustomServiceOrchestrator import CustomServiceOrchestrator
  27. from AmbariConfig import AmbariConfig
  28. from mock.mock import MagicMock, patch
  29. import StringIO
  30. import sys
  31. from AgentException import AgentException
  32. from FileCache import FileCache
  33. from LiveStatus import LiveStatus
  34. class TestCustomServiceOrchestrator(TestCase):
  35. def setUp(self):
  36. # disable stdout
  37. out = StringIO.StringIO()
  38. sys.stdout = out
  39. # generate sample config
  40. tmpdir = tempfile.gettempdir()
  41. self.config = ConfigParser.RawConfigParser()
  42. self.config.add_section('agent')
  43. self.config.set('agent', 'prefix', tmpdir)
  44. self.config.set('agent', 'cache_dir', "/cachedir")
  45. self.config.add_section('python')
  46. self.config.set('python', 'custom_actions_dir', tmpdir)
  47. @patch.object(FileCache, "__init__")
  48. def test_add_reg_listener_to_controller(self, FileCache_mock):
  49. FileCache_mock.return_value = None
  50. dummy_controller = MagicMock()
  51. config = AmbariConfig().getConfig()
  52. tempdir = tempfile.gettempdir()
  53. config.set('agent', 'prefix', tempdir)
  54. CustomServiceOrchestrator(config, dummy_controller)
  55. self.assertTrue(dummy_controller.registration_listeners.append.called)
  56. @patch.object(CustomServiceOrchestrator, 'decompressClusterHostInfo')
  57. @patch("hostname.public_hostname")
  58. @patch("os.path.isfile")
  59. @patch("os.unlink")
  60. @patch.object(FileCache, "__init__")
  61. def test_dump_command_to_json(self, FileCache_mock, unlink_mock,
  62. isfile_mock, hostname_mock,
  63. decompress_cluster_host_info_mock):
  64. FileCache_mock.return_value = None
  65. hostname_mock.return_value = "test.hst"
  66. command = {
  67. 'commandType': 'EXECUTION_COMMAND',
  68. 'role': u'DATANODE',
  69. 'roleCommand': u'INSTALL',
  70. 'commandId': '1-1',
  71. 'taskId': 3,
  72. 'clusterName': u'cc',
  73. 'serviceName': u'HDFS',
  74. 'configurations':{'global' : {}},
  75. 'configurationTags':{'global' : { 'tag': 'v1' }},
  76. 'clusterHostInfo':{'namenode_host' : ['1'],
  77. 'slave_hosts' : ['0', '1'],
  78. 'all_hosts' : ['h1.hortonworks.com', 'h2.hortonworks.com'],
  79. 'all_ping_ports': ['8670:0,1']}
  80. }
  81. decompress_cluster_host_info_mock.return_value = {'namenode_host' : ['h2.hortonworks.com'],
  82. 'slave_hosts' : ['h1.hortonworks.com', 'h2.hortonworks.com'],
  83. 'all_hosts' : ['h1.hortonworks.com', 'h2.hortonworks.com'],
  84. 'all_ping_ports': ['8670', '8670']}
  85. config = AmbariConfig().getConfig()
  86. tempdir = tempfile.gettempdir()
  87. config.set('agent', 'prefix', tempdir)
  88. dummy_controller = MagicMock()
  89. orchestrator = CustomServiceOrchestrator(config, dummy_controller)
  90. isfile_mock.return_value = True
  91. # Test dumping EXECUTION_COMMAND
  92. json_file = orchestrator.dump_command_to_json(command)
  93. self.assertTrue(os.path.exists(json_file))
  94. self.assertTrue(os.path.getsize(json_file) > 0)
  95. self.assertEqual(oct(os.stat(json_file).st_mode & 0777), '0600')
  96. self.assertTrue(json_file.endswith("command-3.json"))
  97. self.assertTrue(decompress_cluster_host_info_mock.called)
  98. os.unlink(json_file)
  99. # Test dumping STATUS_COMMAND
  100. command['commandType']='STATUS_COMMAND'
  101. decompress_cluster_host_info_mock.reset_mock()
  102. json_file = orchestrator.dump_command_to_json(command)
  103. self.assertTrue(os.path.exists(json_file))
  104. self.assertTrue(os.path.getsize(json_file) > 0)
  105. self.assertEqual(oct(os.stat(json_file).st_mode & 0777), '0600')
  106. self.assertTrue(json_file.endswith("status_command.json"))
  107. self.assertFalse(decompress_cluster_host_info_mock.called)
  108. os.unlink(json_file)
  109. # Testing side effect of dump_command_to_json
  110. self.assertEquals(command['public_hostname'], "test.hst")
  111. self.assertTrue(unlink_mock.called)
  112. @patch("os.path.exists")
  113. @patch.object(FileCache, "__init__")
  114. def test_resolve_script_path(self, FileCache_mock, exists_mock):
  115. FileCache_mock.return_value = None
  116. dummy_controller = MagicMock()
  117. config = AmbariConfig().getConfig()
  118. orchestrator = CustomServiceOrchestrator(config, dummy_controller)
  119. # Testing existing path
  120. exists_mock.return_value = True
  121. path = orchestrator.\
  122. resolve_script_path("/HBASE/package", "scripts/hbase_master.py", "PYTHON")
  123. self.assertEqual("/HBASE/package/scripts/hbase_master.py", path)
  124. # Testing not existing path
  125. exists_mock.return_value = False
  126. try:
  127. orchestrator.resolve_script_path("/HBASE",
  128. "scripts/hbase_master.py", "PYTHON")
  129. self.fail('ExpectedException not thrown')
  130. except AgentException:
  131. pass # Expected
  132. @patch.object(CustomServiceOrchestrator, "resolve_script_path")
  133. @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
  134. @patch.object(FileCache, "get_service_base_dir")
  135. @patch.object(FileCache, "get_hook_base_dir")
  136. @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
  137. @patch.object(PythonExecutor, "run_file")
  138. @patch.object(FileCache, "__init__")
  139. def test_runCommand(self, FileCache_mock,
  140. run_file_mock, dump_command_to_json_mock,
  141. get_hook_base_dir_mock, get_service_base_dir_mock,
  142. resolve_hook_script_path_mock, resolve_script_path_mock):
  143. FileCache_mock.return_value = None
  144. command = {
  145. 'role' : 'REGION_SERVER',
  146. 'hostLevelParams' : {
  147. 'stack_name' : 'HDP',
  148. 'stack_version' : '2.0.7',
  149. 'jdk_location' : 'some_location'
  150. },
  151. 'commandParams': {
  152. 'script_type': 'PYTHON',
  153. 'script': 'scripts/hbase_regionserver.py',
  154. 'command_timeout': '600',
  155. 'service_package_folder' : 'HBASE'
  156. },
  157. 'taskId' : '3',
  158. 'roleCommand': 'INSTALL'
  159. }
  160. get_service_base_dir_mock.return_value = "/basedir/"
  161. resolve_script_path_mock.return_value = "/basedir/scriptpath"
  162. resolve_hook_script_path_mock.return_value = \
  163. ('/hooks_dir/prefix-command/scripts/hook.py',
  164. '/hooks_dir/prefix-command')
  165. dummy_controller = MagicMock()
  166. orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
  167. get_hook_base_dir_mock.return_value = "/hooks/"
  168. # normal run case
  169. run_file_mock.return_value = {
  170. 'stdout' : 'sss',
  171. 'stderr' : 'eee',
  172. 'exitcode': 0,
  173. }
  174. ret = orchestrator.runCommand(command, "out.txt", "err.txt")
  175. self.assertEqual(ret['exitcode'], 0)
  176. self.assertTrue(run_file_mock.called)
  177. self.assertEqual(run_file_mock.call_count, 3)
  178. run_file_mock.reset_mock()
  179. # Case when we force another command
  180. run_file_mock.return_value = {
  181. 'stdout' : 'sss',
  182. 'stderr' : 'eee',
  183. 'exitcode': 0,
  184. }
  185. ret = orchestrator.runCommand(command, "out.txt", "err.txt",
  186. forsed_command_name=CustomServiceOrchestrator.COMMAND_NAME_STATUS)
  187. ## Check that override_output_files was true only during first call
  188. self.assertEquals(run_file_mock.call_args_list[0][0][7], True)
  189. self.assertEquals(run_file_mock.call_args_list[1][0][7], False)
  190. self.assertEquals(run_file_mock.call_args_list[2][0][7], False)
  191. ## Check that forsed_command_name was taken into account
  192. self.assertEqual(run_file_mock.call_args_list[0][0][1][0],
  193. CustomServiceOrchestrator.COMMAND_NAME_STATUS)
  194. run_file_mock.reset_mock()
  195. # unknown script type case
  196. command['commandParams']['script_type'] = "SOME_TYPE"
  197. ret = orchestrator.runCommand(command, "out.txt", "err.txt")
  198. self.assertEqual(ret['exitcode'], 1)
  199. self.assertFalse(run_file_mock.called)
  200. self.assertTrue("Unknown script type" in ret['stdout'])
  201. #By default returns empty dictionary
  202. self.assertEqual(ret['structuredOut'], '{}')
  203. pass
  204. @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
  205. @patch.object(PythonExecutor, "run_file")
  206. @patch.object(FileCache, "__init__")
  207. @patch.object(FileCache, "get_custom_actions_base_dir")
  208. def test_runCommand_custom_action(self, get_custom_actions_base_dir_mock,
  209. FileCache_mock,
  210. run_file_mock, dump_command_to_json_mock):
  211. FileCache_mock.return_value = None
  212. get_custom_actions_base_dir_mock.return_value = "some path"
  213. _, script = tempfile.mkstemp()
  214. command = {
  215. 'role' : 'any',
  216. 'commandParams': {
  217. 'script_type': 'PYTHON',
  218. 'script': 'some_custom_action.py',
  219. 'command_timeout': '600',
  220. 'jdk_location' : 'some_location'
  221. },
  222. 'taskId' : '3',
  223. 'roleCommand': 'ACTIONEXECUTE'
  224. }
  225. dummy_controller = MagicMock()
  226. orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
  227. # normal run case
  228. run_file_mock.return_value = {
  229. 'stdout' : 'sss',
  230. 'stderr' : 'eee',
  231. 'exitcode': 0,
  232. }
  233. ret = orchestrator.runCommand(command, "out.txt", "err.txt")
  234. self.assertEqual(ret['exitcode'], 0)
  235. self.assertTrue(run_file_mock.called)
  236. # Hoooks are not supported for custom actions,
  237. # that's why run_file() should be called only once
  238. self.assertEqual(run_file_mock.call_count, 1)
  239. @patch("os.path.isfile")
  240. @patch.object(FileCache, "__init__")
  241. def test_resolve_hook_script_path(self, FileCache_mock, isfile_mock):
  242. FileCache_mock.return_value = None
  243. dummy_controller = MagicMock()
  244. orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
  245. # Testing None param
  246. res1 = orchestrator.resolve_hook_script_path(None, "prefix", "command",
  247. "script_type")
  248. self.assertEqual(res1, None)
  249. # Testing existing hook script
  250. isfile_mock.return_value = True
  251. res2 = orchestrator.resolve_hook_script_path("/hooks_dir/", "prefix", "command",
  252. "script_type")
  253. self.assertEqual(res2, ('/hooks_dir/prefix-command/scripts/hook.py',
  254. '/hooks_dir/prefix-command'))
  255. # Testing not existing hook script
  256. isfile_mock.return_value = False
  257. res3 = orchestrator.resolve_hook_script_path("/hooks_dir/", "prefix", "command",
  258. "script_type")
  259. self.assertEqual(res3, None)
  260. @patch.object(CustomServiceOrchestrator, "runCommand")
  261. @patch.object(FileCache, "__init__")
  262. def test_requestComponentStatus(self, FileCache_mock, runCommand_mock):
  263. FileCache_mock.return_value = None
  264. status_command = {
  265. "serviceName" : 'HDFS',
  266. "commandType" : "STATUS_COMMAND",
  267. "clusterName" : "",
  268. "componentName" : "DATANODE",
  269. 'configurations':{}
  270. }
  271. dummy_controller = MagicMock()
  272. orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
  273. # Test alive case
  274. runCommand_mock.return_value = {
  275. "exitcode" : 0
  276. }
  277. status = orchestrator.requestComponentStatus(status_command)
  278. self.assertEqual(runCommand_mock.return_value, status)
  279. # Test dead case
  280. runCommand_mock.return_value = {
  281. "exitcode" : 1
  282. }
  283. status = orchestrator.requestComponentStatus(status_command)
  284. self.assertEqual(runCommand_mock.return_value, status)
  285. def tearDown(self):
  286. # enable stdout
  287. sys.stdout = sys.__stdout__