TestActionQueue.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. #!/usr/bin/env python2.6
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. from unittest import TestCase
  18. from ambari_agent.LiveStatus import LiveStatus
  19. from ambari_agent.PuppetExecutor import PuppetExecutor
  20. from ambari_agent.ActionQueue import ActionQueue
  21. from ambari_agent.AmbariConfig import AmbariConfig
  22. from ambari_agent.ActionDependencyManager import ActionDependencyManager
  23. import os, errno, time, pprint, tempfile, threading
  24. import StringIO
  25. import sys
  26. from threading import Thread
  27. from mock.mock import patch, MagicMock, call
  28. from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
  29. from ambari_agent.UpgradeExecutor import UpgradeExecutor
  30. class TestActionQueue(TestCase):
  31. def setUp(self):
  32. out = StringIO.StringIO()
  33. sys.stdout = out
  34. # save original open() method for later use
  35. self.original_open = open
  36. def tearDown(self):
  37. sys.stdout = sys.__stdout__
  38. datanode_install_command = {
  39. 'commandType': 'EXECUTION_COMMAND',
  40. 'role': u'DATANODE',
  41. 'roleCommand': u'INSTALL',
  42. 'commandId': '1-1',
  43. 'taskId': 3,
  44. 'clusterName': u'cc',
  45. 'serviceName': u'HDFS',
  46. 'configurations':{'global' : {}},
  47. 'configurationTags':{'global' : { 'tag': 'v1' }}
  48. }
  49. datanode_upgrade_command = {
  50. 'commandId': 17,
  51. 'role' : "role",
  52. 'taskId' : "taskId",
  53. 'clusterName' : "clusterName",
  54. 'serviceName' : "serviceName",
  55. 'roleCommand' : 'UPGRADE',
  56. 'hostname' : "localhost.localdomain",
  57. 'hostLevelParams': "hostLevelParams",
  58. 'clusterHostInfo': "clusterHostInfo",
  59. 'commandType': "EXECUTION_COMMAND",
  60. 'configurations':{'global' : {}},
  61. 'roleParams': {},
  62. 'commandParams' : {
  63. 'source_stack_version' : 'HDP-1.2.1',
  64. 'target_stack_version' : 'HDP-1.3.0'
  65. }
  66. }
  67. namenode_install_command = {
  68. 'commandType': 'EXECUTION_COMMAND',
  69. 'role': u'NAMENODE',
  70. 'roleCommand': u'INSTALL',
  71. 'commandId': '1-1',
  72. 'taskId': 4,
  73. 'clusterName': u'cc',
  74. 'serviceName': u'HDFS',
  75. }
  76. snamenode_install_command = {
  77. 'commandType': 'EXECUTION_COMMAND',
  78. 'role': u'SECONDARY_NAMENODE',
  79. 'roleCommand': u'INSTALL',
  80. 'commandId': '1-1',
  81. 'taskId': 5,
  82. 'clusterName': u'cc',
  83. 'serviceName': u'HDFS',
  84. }
  85. nagios_install_command = {
  86. 'commandType': 'EXECUTION_COMMAND',
  87. 'role': u'NAGIOS',
  88. 'roleCommand': u'INSTALL',
  89. 'commandId': '1-1',
  90. 'taskId': 6,
  91. 'clusterName': u'cc',
  92. 'serviceName': u'HDFS',
  93. }
  94. hbase_install_command = {
  95. 'commandType': 'EXECUTION_COMMAND',
  96. 'role': u'HBASE',
  97. 'roleCommand': u'INSTALL',
  98. 'commandId': '1-1',
  99. 'taskId': 7,
  100. 'clusterName': u'cc',
  101. 'serviceName': u'HDFS',
  102. }
  103. status_command = {
  104. "serviceName" : 'HDFS',
  105. "commandType" : "STATUS_COMMAND",
  106. "clusterName" : "",
  107. "componentName" : "DATANODE",
  108. 'configurations':{}
  109. }
  110. @patch.object(ActionDependencyManager, "read_dependencies")
  111. @patch.object(ActionDependencyManager, "get_next_action_group")
  112. @patch.object(ActionQueue, "process_portion_of_actions")
  113. def test_ActionQueueStartStop(self, process_portion_of_actions_mock,
  114. get_next_action_group_mock, read_dependencies_mock):
  115. actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
  116. actionQueue.start()
  117. time.sleep(0.1)
  118. actionQueue.stop()
  119. actionQueue.join()
  120. self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
  121. self.assertTrue(get_next_action_group_mock.call_count > 1)
  122. self.assertTrue(process_portion_of_actions_mock.call_count > 1)
  123. @patch.object(ActionDependencyManager, "read_dependencies")
  124. @patch.object(ActionQueue, "execute_command")
  125. @patch.object(ActionQueue, "execute_status_command")
  126. def test_process_portion_of_actions(self, execute_status_command_mock,
  127. executeCommand_mock, read_dependencies_mock):
  128. actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
  129. # Test execution of EXECUTION_COMMANDs
  130. max = 3
  131. actionQueue.MAX_CONCURRENT_ACTIONS = max
  132. unfreeze_flag = threading.Event()
  133. sync_lock = threading.RLock()
  134. stats = {
  135. 'waiting_threads' : 0
  136. }
  137. def side_effect(self):
  138. with sync_lock: # Synchtonized to avoid race effects during test execution
  139. stats['waiting_threads'] += 1
  140. unfreeze_flag.wait()
  141. executeCommand_mock.side_effect = side_effect
  142. portion = [self.datanode_install_command,
  143. self.namenode_install_command,
  144. self.snamenode_install_command,
  145. self.nagios_install_command,
  146. self.hbase_install_command]
  147. # We call method in a separate thread
  148. action_thread = Thread(target = actionQueue.process_portion_of_actions, args = (portion, ))
  149. action_thread.start()
  150. # Now we wait to check that only MAX_CONCURRENT_ACTIONS threads are running
  151. while stats['waiting_threads'] != max:
  152. time.sleep(0.1)
  153. self.assertEqual(stats['waiting_threads'], max)
  154. # unfreezing waiting threads
  155. unfreeze_flag.set()
  156. # wait until all threads are finished
  157. action_thread.join()
  158. self.assertTrue(executeCommand_mock.call_count == 5)
  159. self.assertFalse(execute_status_command_mock.called)
  160. executeCommand_mock.reset_mock()
  161. execute_status_command_mock.reset_mock()
  162. # Test execution of STATUS_COMMANDs
  163. n = 5
  164. portion = []
  165. for i in range(0, n):
  166. status_command = {
  167. 'componentName': 'DATANODE',
  168. 'commandType': 'STATUS_COMMAND',
  169. }
  170. portion.append(status_command)
  171. actionQueue.process_portion_of_actions(portion)
  172. self.assertTrue(execute_status_command_mock.call_count == n)
  173. self.assertFalse(executeCommand_mock.called)
  174. # Test execution of unknown command
  175. unknown_command = {
  176. 'commandType': 'WRONG_COMMAND',
  177. }
  178. portion = [unknown_command]
  179. actionQueue.process_portion_of_actions(portion)
  180. # no exception expected
  181. pass
  182. @patch("traceback.print_exc")
  183. @patch.object(ActionDependencyManager, "read_dependencies")
  184. @patch.object(ActionQueue, "execute_command")
  185. def test_execute_command_safely(self, execute_command_mock,
  186. read_dependencies_mock, print_exc_mock):
  187. actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
  188. # Try normal execution
  189. actionQueue.execute_command_safely('command')
  190. # Try exception ro check proper logging
  191. def side_effect(self):
  192. raise Exception("TerribleException")
  193. execute_command_mock.side_effect = side_effect
  194. actionQueue.execute_command_safely('command')
  195. self.assertTrue(print_exc_mock.called)
  196. @patch("__builtin__.open")
  197. @patch.object(ActionQueue, "status_update_callback")
  198. @patch.object(ActionDependencyManager, "read_dependencies")
  199. def test_execute_command(self, read_dependencies_mock,
  200. status_update_callback_mock, open_mock):
  201. # Make file read calls visible
  202. def open_side_effect(file, mode):
  203. if mode == 'r':
  204. file_mock = MagicMock()
  205. file_mock.read.return_value = "Read from " + str(file)
  206. return file_mock
  207. else:
  208. return self.original_open(file, mode)
  209. open_mock.side_effect = open_side_effect
  210. config = AmbariConfig().getConfig()
  211. tmpfile = tempfile.gettempdir()
  212. config.set('agent', 'prefix', tmpfile)
  213. actionQueue = ActionQueue(config, 'dummy_controller')
  214. unfreeze_flag = threading.Event()
  215. puppet_execution_result_dict = {
  216. 'stdout': 'out',
  217. 'stderr': 'stderr',
  218. }
  219. def side_effect(command, tmpoutfile, tmperrfile):
  220. unfreeze_flag.wait()
  221. return puppet_execution_result_dict
  222. def patched_aq_execute_command(command):
  223. # We have to perform patching for separate thread in the same thread
  224. with patch.object(PuppetExecutor, "runCommand") as runCommand_mock:
  225. with patch.object(UpgradeExecutor, "perform_stack_upgrade") \
  226. as perform_stack_upgrade_mock:
  227. runCommand_mock.side_effect = side_effect
  228. perform_stack_upgrade_mock.side_effect = side_effect
  229. actionQueue.execute_command(command)
  230. ### Test install/start/stop command ###
  231. ## Test successful execution with configuration tags
  232. puppet_execution_result_dict['status'] = 'COMPLETE'
  233. puppet_execution_result_dict['exitcode'] = 0
  234. # We call method in a separate thread
  235. execution_thread = Thread(target = patched_aq_execute_command ,
  236. args = (self.datanode_install_command, ))
  237. execution_thread.start()
  238. # check in progress report
  239. # wait until ready
  240. while True:
  241. time.sleep(0.1)
  242. report = actionQueue.result()
  243. if len(report['reports']) != 0:
  244. break
  245. expected = {'status': 'IN_PROGRESS',
  246. 'stderr': 'Read from /tmp/errors-3.txt',
  247. 'stdout': 'Read from /tmp/output-3.txt',
  248. 'clusterName': u'cc',
  249. 'roleCommand': u'INSTALL',
  250. 'serviceName': u'HDFS',
  251. 'role': u'DATANODE',
  252. 'actionId': '1-1',
  253. 'taskId': 3,
  254. 'exitCode': 777}
  255. self.assertEqual(report['reports'][0], expected)
  256. # Continue command execution
  257. unfreeze_flag.set()
  258. # wait until ready
  259. while report['reports'][0]['status'] == 'IN_PROGRESS':
  260. time.sleep(0.1)
  261. report = actionQueue.result()
  262. # check report
  263. configname = os.path.join(tmpfile, 'config.json')
  264. expected = {'status': 'COMPLETED',
  265. 'stderr': 'stderr',
  266. 'stdout': 'out',
  267. 'clusterName': u'cc',
  268. 'configurationTags': {'global': {'tag': 'v1'}},
  269. 'roleCommand': u'INSTALL',
  270. 'serviceName': u'HDFS',
  271. 'role': u'DATANODE',
  272. 'actionId': '1-1',
  273. 'taskId': 3,
  274. 'exitCode': 0}
  275. self.assertEqual(len(report['reports']), 1)
  276. self.assertEqual(report['reports'][0], expected)
  277. self.assertTrue(os.path.isfile(configname))
  278. # Check that we had 2 status update calls ( IN_PROGRESS and COMPLETE)
  279. self.assertEqual(status_update_callback_mock.call_count, 2)
  280. os.remove(configname)
  281. # now should not have reports (read complete/failed reports are deleted)
  282. report = actionQueue.result()
  283. self.assertEqual(len(report['reports']), 0)
  284. ## Test failed execution
  285. puppet_execution_result_dict['status'] = 'FAILED'
  286. puppet_execution_result_dict['exitcode'] = 13
  287. # We call method in a separate thread
  288. execution_thread = Thread(target = patched_aq_execute_command ,
  289. args = (self.datanode_install_command, ))
  290. execution_thread.start()
  291. unfreeze_flag.set()
  292. # check in progress report
  293. # wait until ready
  294. report = actionQueue.result()
  295. while len(report['reports']) == 0 or \
  296. report['reports'][0]['status'] == 'IN_PROGRESS':
  297. time.sleep(0.1)
  298. report = actionQueue.result()
  299. # check report
  300. expected = {'status': 'FAILED',
  301. 'stderr': 'stderr',
  302. 'stdout': 'out',
  303. 'clusterName': u'cc',
  304. 'roleCommand': u'INSTALL',
  305. 'serviceName': u'HDFS',
  306. 'role': u'DATANODE',
  307. 'actionId': '1-1',
  308. 'taskId': 3,
  309. 'exitCode': 13}
  310. self.assertEqual(len(report['reports']), 1)
  311. self.assertEqual(report['reports'][0], expected)
  312. # now should not have reports (read complete/failed reports are deleted)
  313. report = actionQueue.result()
  314. self.assertEqual(len(report['reports']), 0)
  315. ### Test upgrade command ###
  316. puppet_execution_result_dict['status'] = 'COMPLETE'
  317. puppet_execution_result_dict['exitcode'] = 0
  318. execution_thread = Thread(target = patched_aq_execute_command ,
  319. args = (self.datanode_upgrade_command, ))
  320. execution_thread.start()
  321. unfreeze_flag.set()
  322. # wait until ready
  323. report = actionQueue.result()
  324. while len(report['reports']) == 0 or \
  325. report['reports'][0]['status'] == 'IN_PROGRESS':
  326. time.sleep(0.1)
  327. report = actionQueue.result()
  328. # check report
  329. expected = {'status': 'COMPLETED',
  330. 'stderr': 'stderr',
  331. 'stdout': 'out',
  332. 'clusterName': 'clusterName',
  333. 'roleCommand': 'UPGRADE',
  334. 'serviceName': 'serviceName',
  335. 'role': 'role',
  336. 'actionId': 17,
  337. 'taskId': 'taskId',
  338. 'exitCode': 0}
  339. self.assertEqual(len(report['reports']), 1)
  340. self.assertEqual(report['reports'][0], expected)
  341. # now should not have reports (read complete/failed reports are deleted)
  342. report = actionQueue.result()
  343. self.assertEqual(len(report['reports']), 0)
  344. @patch.object(ActionQueue, "status_update_callback")
  345. @patch.object(StackVersionsFileHandler, "read_stack_version")
  346. @patch.object(ActionDependencyManager, "read_dependencies")
  347. @patch.object(ActionQueue, "execute_command")
  348. @patch.object(LiveStatus, "build")
  349. def test_execute_status_command(self, build_mock, execute_command_mock,
  350. read_dependencies_mock, read_stack_version_mock,
  351. status_update_callback):
  352. actionQueue = ActionQueue(AmbariConfig().getConfig(), 'dummy_controller')
  353. build_mock.return_value = "dummy report"
  354. # Try normal execution
  355. actionQueue.execute_status_command(self.status_command)
  356. report = actionQueue.result()
  357. expected = 'dummy report'
  358. self.assertEqual(len(report['componentStatus']), 1)
  359. self.assertEqual(report['componentStatus'][0], expected)