TestActionQueue.py 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757
  1. #!/usr/bin/env python
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. from Queue import Queue
  18. from unittest import TestCase
  19. from ambari_agent.LiveStatus import LiveStatus
  20. from ambari_agent.ActionQueue import ActionQueue
  21. from ambari_agent.AmbariConfig import AmbariConfig
  22. import os, errno, time, pprint, tempfile, threading, json
  23. import StringIO
  24. import sys
  25. from threading import Thread
  26. import copy
  27. from mock.mock import patch, MagicMock, call
  28. from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
  29. from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
  30. from ambari_agent.PythonExecutor import PythonExecutor
  31. from ambari_agent.CommandStatusDict import CommandStatusDict
  32. from ambari_agent.ActualConfigHandler import ActualConfigHandler
  33. from FileCache import FileCache
  34. from ambari_commons import OSCheck
  35. from only_for_platform import only_for_platform, get_platform, not_for_platform, PLATFORM_LINUX, PLATFORM_WINDOWS
  36. if get_platform() != PLATFORM_WINDOWS:
  37. os_distro_value = ('Suse','11','Final')
  38. else:
  39. os_distro_value = ('win2012serverr2','6.3','WindowsServer')
  40. class TestActionQueue(TestCase):
  def setUp(self):
    """Keep a reference to the real builtin open().

    test_execute_command patches __builtin__.open and delegates every
    non-read call back to this saved reference.
    """
    self.original_open = open
  def tearDown(self):
    """Point sys.stdout back at the interpreter's original stdout stream."""
    sys.stdout = sys.__stdout__
  # ---------------------------------------------------------------------------
  # Command fixtures shared by the tests below. They are plain dicts stored as
  # class attributes; tests read them through self.<name>.
  # ---------------------------------------------------------------------------

  # INSTALL execution command for DATANODE (taskId 3) carrying configuration tags.
  datanode_install_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'DATANODE',
    'roleCommand': u'INSTALL',
    'commandId': '1-1',
    'taskId': 3,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'hostLevelParams': {},
    'configurations':{'global' : {}},
    'configurationTags':{'global' : { 'tag': 'v1' }}
  }

  # UPGRADE execution command; most fields hold placeholder strings.
  datanode_upgrade_command = {
    'commandId': 17,
    'role' : "role",
    'taskId' : "taskId",
    'clusterName' : "clusterName",
    'serviceName' : "serviceName",
    'roleCommand' : 'UPGRADE',
    'hostname' : "localhost.localdomain",
    'hostLevelParams': {},
    'clusterHostInfo': "clusterHostInfo",
    'commandType': "EXECUTION_COMMAND",
    'configurations':{'global' : {}},
    'roleParams': {},
    'commandParams' : {
      'source_stack_version' : 'HDP-1.2.1',
      'target_stack_version' : 'HDP-1.3.0'
    }
  }

  # INSTALL execution command for NAMENODE (taskId 4).
  namenode_install_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'NAMENODE',
    'roleCommand': u'INSTALL',
    'commandId': '1-1',
    'taskId': 4,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'hostLevelParams': {}
  }

  # INSTALL execution command for SECONDARY_NAMENODE (taskId 5).
  snamenode_install_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'SECONDARY_NAMENODE',
    'roleCommand': u'INSTALL',
    'commandId': '1-1',
    'taskId': 5,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'hostLevelParams': {}
  }

  # INSTALL execution command for role HBASE (taskId 7).
  # NOTE(review): serviceName is 'HDFS' although the role is HBASE - confirm
  # whether that is intentional.
  hbase_install_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'HBASE',
    'roleCommand': u'INSTALL',
    'commandId': '1-1',
    'taskId': 7,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'hostLevelParams': {}
  }

  # STATUS_COMMAND for the HDFS DATANODE component.
  status_command = {
    "serviceName" : 'HDFS',
    "commandType" : "STATUS_COMMAND",
    "clusterName" : "",
    "componentName" : "DATANODE",
    'configurations':{},
    'hostLevelParams': {}
  }

  # RESTART custom command for DATANODE with an (empty) clientsToUpdateConfigs list.
  datanode_restart_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'DATANODE',
    'roleCommand': u'CUSTOM_COMMAND',
    'commandId': '1-1',
    'taskId': 9,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'configurations':{'global' : {}},
    'configurationTags':{'global' : { 'tag': 'v123' }},
    'hostLevelParams':{'custom_command': 'RESTART', 'clientsToUpdateConfigs': []}
  }

  # Same RESTART command but without the clientsToUpdateConfigs key.
  datanode_restart_command_no_clients_update = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'DATANODE',
    'roleCommand': u'CUSTOM_COMMAND',
    'commandId': '1-1',
    'taskId': 9,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'configurations':{'global' : {}},
    'configurationTags':{'global' : { 'tag': 'v123' }},
    'hostLevelParams':{'custom_command': 'RESTART'}
  }

  # STATUS_COMMAND for the FLUME_HANDLER component, used by the alerts test.
  status_command_for_alerts = {
    "serviceName" : 'FLUME',
    "commandType" : "STATUS_COMMAND",
    "clusterName" : "",
    "componentName" : "FLUME_HANDLER",
    'configurations':{},
    'hostLevelParams': {}
  }

  # BACKGROUND_EXECUTION_COMMAND running the REBALANCE_HDFS custom command (taskId 19).
  background_command = {
    'commandType': 'BACKGROUND_EXECUTION_COMMAND',
    'role': 'NAMENODE',
    'roleCommand': 'CUSTOM_COMMAND',
    'commandId': '1-1',
    'taskId': 19,
    'clusterName': 'c1',
    'serviceName': 'HDFS',
    'configurations':{'global' : {}},
    'configurationTags':{'global' : { 'tag': 'v123' }},
    'hostLevelParams':{'custom_command': 'REBALANCE_HDFS'},
    'commandParams' : {
      'script_type' : 'PYTHON',
      'script' : 'script.py',
      'command_timeout' : '600',
      'jdk_location' : '.',
      'service_package_folder' : '.'
    }
  }

  # ACTIONEXECUTE command whose cancel_task_id ("19") matches background_command's taskId.
  # NOTE(review): a second class attribute named cancel_background_command is
  # defined near the end of this class and replaces this dict in the class
  # namespace - confirm which definition is intended.
  cancel_background_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': 'NAMENODE',
    'roleCommand': 'ACTIONEXECUTE',
    'commandId': '1-1',
    'taskId': 20,
    'clusterName': 'c1',
    'serviceName': 'HDFS',
    'configurations':{'global' : {}},
    'configurationTags':{'global' : {}},
    'hostLevelParams':{},
    'commandParams' : {
      'script_type' : 'PYTHON',
      'script' : 'cancel_background_task.py',
      'before_system_hook_function' : 'fetch_bg_pid_by_taskid',
      'jdk_location' : '.',
      'command_timeout' : '600',
      'service_package_folder' : '.',
      'cancel_policy': 'SIGKILL',
      'cancel_task_id': "19",
    }
  }
  @patch.object(ActionQueue, "process_command")
  @patch.object(Queue, "get")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_ActionQueueStartStop(self, CustomServiceOrchestrator_mock,
                                get_mock, process_command_mock):
    """Start the action queue, stop it shortly afterwards and verify both effects.

    Queue.get, ActionQueue.process_command and the orchestrator constructor
    are mocked, so no real command is fetched or executed.
    """
    # A mocked __init__ has to return None, exactly like a real constructor.
    CustomServiceOrchestrator_mock.return_value = None
    agent_config = MagicMock()
    controller = MagicMock()
    queue_under_test = ActionQueue(agent_config, controller)

    queue_under_test.start()
    # Leave the queue running for a moment before asking it to stop.
    time.sleep(0.1)
    queue_under_test.stop()
    queue_under_test.join()

    self.assertEqual(queue_under_test.stopped(), True, 'Action queue is not stopped.')
    # While it was running, the queue must have processed more than one item.
    self.assertTrue(process_command_mock.call_count > 1)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch("traceback.print_exc")
  @patch.object(ActionQueue, "execute_command")
  @patch.object(ActionQueue, "execute_status_command")
  def test_process_command(self, execute_status_command_mock,
                           execute_command_mock, print_exc_mock):
    """Check that process_command() dispatches by commandType and logs handler failures.

    execute_command, execute_status_command and traceback.print_exc are
    mocked. The test asserts which handler is invoked for an unknown, an
    execution and a status command, and that print_exc is called when either
    handler raises.
    """
    dummy_controller = MagicMock()
    config = AmbariConfig()
    config.set('agent', 'tolerate_download_failures', "true")
    actionQueue = ActionQueue(config, dummy_controller)
    execution_command = {
      'commandType' : ActionQueue.EXECUTION_COMMAND,
    }
    status_command = {
      'commandType' : ActionQueue.STATUS_COMMAND,
    }
    wrong_command = {
      'commandType' : "SOME_WRONG_COMMAND",
    }

    def reset_mocks():
      # Forget recorded calls so that each scenario starts from a clean slate.
      for mocked in (execute_command_mock, execute_status_command_mock, print_exc_mock):
        mocked.reset_mock()

    # Try wrong command: nothing is dispatched and nothing is logged
    actionQueue.process_command(wrong_command)
    self.assertFalse(execute_command_mock.called)
    self.assertFalse(execute_status_command_mock.called)
    self.assertFalse(print_exc_mock.called)
    reset_mocks()

    # Try normal execution
    actionQueue.process_command(execution_command)
    self.assertTrue(execute_command_mock.called)
    self.assertFalse(execute_status_command_mock.called)
    self.assertFalse(print_exc_mock.called)
    reset_mocks()

    # Try status command
    actionQueue.process_command(status_command)
    self.assertFalse(execute_command_mock.called)
    self.assertTrue(execute_status_command_mock.called)
    self.assertFalse(print_exc_mock.called)
    reset_mocks()

    # Try exception to check proper logging
    def raise_terrible(*args, **kwargs):
      raise Exception("TerribleException")

    # Failure inside the execution handler
    execute_command_mock.side_effect = raise_terrible
    actionQueue.process_command(execution_command)
    self.assertTrue(print_exc_mock.called)
    print_exc_mock.reset_mock()

    # Failure inside the status handler. The execution handler's side effect is
    # cleared and a *status* command is sent, so only the status path can
    # trigger the logging checked here (the previous version re-sent the
    # execution command and so never reached the status handler).
    execute_command_mock.side_effect = None
    execute_status_command_mock.side_effect = raise_terrible
    actionQueue.process_command(status_command)
    self.assertTrue(execute_status_command_mock.called)
    self.assertTrue(print_exc_mock.called)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch("__builtin__.open")
  @patch.object(ActionQueue, "status_update_callback")
  def test_execute_command(self, status_update_callback_mock, open_mock):
    """Run execute_command() on a worker thread and check the reports it produces.

    Three scenarios are exercised in order: a successful INSTALL (checking
    both the IN_PROGRESS and the final report), a failed INSTALL, and a
    successful UPGRADE. CustomServiceOrchestrator.runCommand is replaced by a
    stub that blocks on a threading.Event, which lets the test observe the
    IN_PROGRESS report before the command is allowed to finish.
    """
    # Make file read calls visible
    def open_side_effect(file, mode):
      # Reads return a mock whose content names the file that was opened;
      # every other mode goes to the real open() saved in setUp().
      if mode == 'r':
        file_mock = MagicMock()
        file_mock.read.return_value = "Read from " + str(file)
        return file_mock
      else:
        return self.original_open(file, mode)
    open_mock.side_effect = open_side_effect

    config = AmbariConfig()
    tempdir = tempfile.gettempdir()
    config.set('agent', 'prefix', tempdir)
    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
    config.set('agent', 'tolerate_download_failures', "true")
    dummy_controller = MagicMock()
    actionQueue = ActionQueue(config, dummy_controller)
    # Gate that keeps the stubbed runCommand blocked until the test releases it.
    unfreeze_flag = threading.Event()
    # Result returned by the stubbed runCommand; 'status' and 'exitcode' are
    # filled in before each scenario.
    python_execution_result_dict = {
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut' : ''
      }

    def side_effect(command, tmpoutfile, tmperrfile):
      unfreeze_flag.wait()
      return python_execution_result_dict
    def patched_aq_execute_command(command):
      # We have to perform patching for separate thread in the same thread
      with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
          runCommand_mock.side_effect = side_effect
          actionQueue.execute_command(command)
    ### Test install/start/stop command ###
    ## Test successful execution with configuration tags
    python_execution_result_dict['status'] = 'COMPLETE'
    python_execution_result_dict['exitcode'] = 0
    # We call method in a separate thread
    execution_thread = Thread(target = patched_aq_execute_command ,
                              args = (self.datanode_install_command, ))
    execution_thread.start()
    #  check in progress report
    # wait until ready
    # NOTE(review): this polling loop has no timeout; if no report ever
    # appears the test blocks forever.
    while True:
      time.sleep(0.1)
      report = actionQueue.result()
      if len(report['reports']) != 0:
        break
    # While the command is blocked, the output fields are expected to hold the
    # "Read from <path>" strings produced by the patched open().
    expected = {'status': 'IN_PROGRESS',
                'stderr': 'Read from {0}'.format(os.path.join(tempdir, "errors-3.txt")),
                'stdout': 'Read from {0}'.format(os.path.join(tempdir, "output-3.txt")),
                'structuredOut' : 'Read from {0}'.format(os.path.join(tempdir, "structured-out-3.json")),
                'clusterName': u'cc',
                'roleCommand': u'INSTALL',
                'serviceName': u'HDFS',
                'role': u'DATANODE',
                'actionId': '1-1',
                'taskId': 3,
                'exitCode': 777}
    self.assertEqual(report['reports'][0], expected)
    # Continue command execution
    unfreeze_flag.set()
    # wait until ready
    while report['reports'][0]['status'] == 'IN_PROGRESS':
      time.sleep(0.1)
      report = actionQueue.result()
    # check report
    configname = os.path.join(tempdir, 'config.json')
    expected = {'status': 'COMPLETED',
                'stderr': 'stderr',
                'stdout': 'out',
                'clusterName': u'cc',
                'structuredOut': '""',
                'roleCommand': u'INSTALL',
                'serviceName': u'HDFS',
                'role': u'DATANODE',
                'actionId': '1-1',
                'taskId': 3,
                'configurationTags': {'global': {'tag': 'v1'}},
                'exitCode': 0}
    self.assertEqual(len(report['reports']), 1)
    self.assertEqual(report['reports'][0], expected)
    self.assertTrue(os.path.isfile(configname))
    # Check that we had 2 status update calls ( IN_PROGRESS and COMPLETE)
    self.assertEqual(status_update_callback_mock.call_count, 2)
    os.remove(configname)
    # now should not have reports (read complete/failed reports are deleted)
    report = actionQueue.result()
    self.assertEqual(len(report['reports']), 0)
    ## Test failed execution
    python_execution_result_dict['status'] = 'FAILED'
    python_execution_result_dict['exitcode'] = 13
    # We call method in a separate thread
    execution_thread = Thread(target = patched_aq_execute_command ,
                              args = (self.datanode_install_command, ))
    execution_thread.start()
    # The event was set during the first scenario and is never cleared, so the
    # stubbed runCommand no longer blocks; this call leaves it set.
    unfreeze_flag.set()
    #  check in progress report
    # wait until ready
    report = actionQueue.result()
    while len(report['reports']) == 0 or \
                    report['reports'][0]['status'] == 'IN_PROGRESS':
      time.sleep(0.1)
      report = actionQueue.result()
      # check report
    expected = {'status': 'FAILED',
                'stderr': 'stderr',
                'stdout': 'out',
                'clusterName': u'cc',
                'structuredOut': '""',
                'roleCommand': u'INSTALL',
                'serviceName': u'HDFS',
                'role': u'DATANODE',
                'actionId': '1-1',
                'taskId': 3,
                'exitCode': 13}
    self.assertEqual(len(report['reports']), 1)
    self.assertEqual(report['reports'][0], expected)
    # now should not have reports (read complete/failed reports are deleted)
    report = actionQueue.result()
    self.assertEqual(len(report['reports']), 0)
    ### Test upgrade command ###
    python_execution_result_dict['status'] = 'COMPLETE'
    python_execution_result_dict['exitcode'] = 0
    execution_thread = Thread(target = patched_aq_execute_command ,
                              args = (self.datanode_upgrade_command, ))
    execution_thread.start()
    unfreeze_flag.set()
    # wait until ready
    report = actionQueue.result()
    while len(report['reports']) == 0 or \
                    report['reports'][0]['status'] == 'IN_PROGRESS':
      time.sleep(0.1)
      report = actionQueue.result()
    # check report
    expected = {'status': 'COMPLETED',
                'stderr': 'stderr',
                'stdout': 'out',
                'clusterName': 'clusterName',
                'structuredOut': '""',
                'roleCommand': 'UPGRADE',
                'serviceName': 'serviceName',
                'role': 'role',
                'actionId': 17,
                'taskId': 'taskId',
                'exitCode': 0}
    self.assertEqual(len(report['reports']), 1)
    self.assertEqual(report['reports'][0], expected)
    # now should not have reports (read complete/failed reports are deleted)
    report = actionQueue.result()
    self.assertEqual(len(report['reports']), 0)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(CustomServiceOrchestrator, "runCommand")
  @patch("CommandStatusDict.CommandStatusDict")
  @patch.object(ActionQueue, "status_update_callback")
  def test_store_configuration_tags(self, status_update_callback_mock,
                                    command_status_dict_mock,
                                    cso_runCommand_mock):
    """Execute a RESTART custom command and check the resulting report.

    runCommand is mocked to report success; the single report returned by
    result() must be COMPLETED and carry the command's configurationTags and
    customCommand.
    """
    cso_runCommand_mock.return_value = {
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut' : '',
      'exitcode' : 0
    }

    agent_config = AmbariConfig().getConfig()
    agent_settings = (
      ('prefix', tempfile.gettempdir()),
      ('cache_dir', "/var/lib/ambari-agent/cache"),
      ('tolerate_download_failures', "true"),
    )
    for option, value in agent_settings:
      agent_config.set('agent', option, value)

    controller = MagicMock()
    queue_under_test = ActionQueue(agent_config, controller)
    queue_under_test.execute_command(self.datanode_restart_command)
    reports = queue_under_test.result()['reports']

    expected_report = {
      'status': 'COMPLETED',
      'exitCode': 0,
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut': '""',
      'clusterName': u'cc',
      'serviceName': u'HDFS',
      'role': u'DATANODE',
      'roleCommand': u'CUSTOM_COMMAND',
      'customCommand': 'RESTART',
      'actionId': '1-1',
      'taskId': 9,
      'configurationTags': {'global': {'tag': 'v123'}},
    }
    # Agent caches configurationTags if custom_command RESTART completed
    self.assertEqual(len(reports), 1)
    self.assertEqual(expected_report, reports[0])
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(ActualConfigHandler, "write_client_components")
  @patch.object(CustomServiceOrchestrator, "runCommand")
  @patch("CommandStatusDict.CommandStatusDict")
  @patch.object(ActionQueue, "status_update_callback")
  def test_store_configuration_tags_no_clients(self, status_update_callback_mock,
                                               command_status_dict_mock,
                                               cso_runCommand_mock, write_client_components_mock):
    """Execute a RESTART command that has no clientsToUpdateConfigs entry.

    The report must match the one for a regular RESTART, and
    ActualConfigHandler.write_client_components must not have been called.
    """
    cso_runCommand_mock.return_value = {
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut' : '',
      'exitcode' : 0
    }

    agent_config = AmbariConfig().getConfig()
    agent_settings = (
      ('prefix', tempfile.gettempdir()),
      ('cache_dir', "/var/lib/ambari-agent/cache"),
      ('tolerate_download_failures', "true"),
    )
    for option, value in agent_settings:
      agent_config.set('agent', option, value)

    controller = MagicMock()
    queue_under_test = ActionQueue(agent_config, controller)
    queue_under_test.execute_command(self.datanode_restart_command_no_clients_update)
    reports = queue_under_test.result()['reports']

    expected_report = {
      'status': 'COMPLETED',
      'exitCode': 0,
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut': '""',
      'clusterName': u'cc',
      'serviceName': u'HDFS',
      'role': u'DATANODE',
      'roleCommand': u'CUSTOM_COMMAND',
      'customCommand': 'RESTART',
      'actionId': '1-1',
      'taskId': 9,
      'configurationTags': {'global': {'tag': 'v123'}},
    }
    # Agent caches configurationTags if custom_command RESTART completed
    self.assertEqual(len(reports), 1)
    self.assertEqual(expected_report, reports[0])
    # Without a clients list nothing should be written for client components.
    self.assertFalse(write_client_components_mock.called)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(ActionQueue, "status_update_callback")
  @patch.object(StackVersionsFileHandler, "read_stack_version")
  @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
  @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState")
  @patch.object(ActionQueue, "execute_command")
  @patch.object(LiveStatus, "build")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_execute_status_command(self, CustomServiceOrchestrator_mock,
                                  build_mock, execute_command_mock, requestComponentSecurityState_mock,
                                  requestComponentStatus_mock, read_stack_version_mock,
                                  status_update_callback):
    """Run a status command and check the component status entry it reports.

    LiveStatus.build and both orchestrator status requests are mocked; the
    reported entry must be the mocked build result extended with the mocked
    security state.
    """
    # A mocked __init__ has to return None, exactly like a real constructor.
    CustomServiceOrchestrator_mock.return_value = None
    controller = MagicMock()
    queue_under_test = ActionQueue(AmbariConfig().getConfig(), controller)

    build_mock.return_value = {'dummy report': '' }
    requestComponentStatus_mock.reset_mock()
    requestComponentStatus_mock.return_value = {'exitcode': 0 }
    requestComponentSecurityState_mock.reset_mock()
    requestComponentSecurityState_mock.return_value = 'UNKNOWN'

    queue_under_test.execute_status_command(self.status_command)
    component_statuses = queue_under_test.result()['componentStatus']

    expected_status = {'dummy report': '',
                       'securityState' : 'UNKNOWN'}
    self.assertEqual(len(component_statuses), 1)
    self.assertEqual(component_statuses[0], expected_status)
    self.assertTrue(requestComponentStatus_mock.called)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(ActionQueue, "status_update_callback")
  @patch.object(StackVersionsFileHandler, "read_stack_version")
  @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
  @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState")
  @patch.object(ActionQueue, "execute_command")
  @patch.object(LiveStatus, "build")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_execute_status_command_with_alerts(self, CustomServiceOrchestrator_mock,
                                              build_mock, execute_command_mock,
                                              requestComponentSecurityState_mock,
                                              requestComponentStatus_mock, read_stack_version_mock,
                                              status_update_callback):
    """Run a status command whose structured output contains alerts.

    The mocked requestComponentStatus result carries an 'alerts' list in
    'structuredOut'; the single reported component status must expose an
    'alerts' key.

    Stacked @patch decorators hand their mocks to the test bottom-up, so the
    parameters are listed in that order. Previously the names were shuffled:
    `build_mock` actually received the ActionQueue.execute_command mock,
    LiveStatus.build returned a bare MagicMock, and has_key('alerts') on that
    MagicMock was truthy no matter what the code under test did.
    """
    # A mocked __init__ has to return None, exactly like a real constructor.
    CustomServiceOrchestrator_mock.return_value = None
    dummy_controller = MagicMock()
    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)

    requestComponentStatus_mock.reset_mock()
    requestComponentStatus_mock.return_value = {
      'exitcode': 0,
      'stdout': 'out',
      'stderr': 'err',
      'structuredOut': {'alerts': [ {'name': 'flume_alert'} ] }
    }
    # Same security-state stub as test_execute_status_command uses.
    requestComponentSecurityState_mock.reset_mock()
    requestComponentSecurityState_mock.return_value = 'UNKNOWN'
    # A real dict, so the membership check below inspects actual keys.
    build_mock.return_value = {'somestatusresult': 'aresult'}

    actionQueue.execute_status_command(self.status_command_for_alerts)
    report = actionQueue.result()

    self.assertTrue(requestComponentStatus_mock.called)
    self.assertEqual(len(report['componentStatus']), 1)
    self.assertTrue('alerts' in report['componentStatus'][0])
  @patch.object(ActionQueue, "process_command")
  @patch.object(Queue, "get")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_reset_queue(self, CustomServiceOrchestrator_mock,
                       get_mock, process_command_mock):
    """Queue two commands, call reset() and verify the command queue is empty.

    Queue.get and process_command are mocked, so the two commands are still
    queued when qsize() is checked.
    """
    # A mocked __init__ has to return None, exactly like a real constructor.
    CustomServiceOrchestrator_mock.return_value = None
    controller = MagicMock()
    agent_config = MagicMock()
    queue_under_test = ActionQueue(agent_config, controller)
    queue_under_test.start()

    queued_commands = [self.datanode_install_command, self.hbase_install_command]
    queue_under_test.put(queued_commands)
    self.assertEqual(2, queue_under_test.commandQueue.qsize())

    queue_under_test.reset()
    self.assertTrue(queue_under_test.commandQueue.empty())

    # Let the queue run briefly, then shut it down and wait for it to finish.
    time.sleep(0.1)
    queue_under_test.stop()
    queue_under_test.join()
    self.assertEqual(queue_under_test.stopped(), True, 'Action queue is not stopped.')
  @patch.object(ActionQueue, "process_command")
  @patch.object(Queue, "get")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_cancel(self, CustomServiceOrchestrator_mock,
                  get_mock, process_command_mock):
    """Queue two commands, call reset() and verify the command queue is empty.

    NOTE(review): despite its name, this test performs exactly the same steps
    as test_reset_queue and never invokes a cancel operation - confirm what
    it was meant to cover.
    """
    # A mocked __init__ has to return None, exactly like a real constructor.
    CustomServiceOrchestrator_mock.return_value = None
    controller = MagicMock()
    agent_config = MagicMock()
    queue_under_test = ActionQueue(agent_config, controller)
    queue_under_test.start()

    queued_commands = [self.datanode_install_command, self.hbase_install_command]
    queue_under_test.put(queued_commands)
    self.assertEqual(2, queue_under_test.commandQueue.qsize())

    queue_under_test.reset()
    self.assertTrue(queue_under_test.commandQueue.empty())

    # Let the queue run briefly, then shut it down and wait for it to finish.
    time.sleep(0.1)
    queue_under_test.stop()
    queue_under_test.join()
    self.assertEqual(queue_under_test.stopped(), True, 'Action queue is not stopped.')
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(StackVersionsFileHandler, "read_stack_version")
  @patch.object(CustomServiceOrchestrator, "runCommand")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_execute_background_command(self, CustomServiceOrchestrator_mock,
                                      runCommand_mock, read_stack_version_mock
                                      ):
    """Queue a background command and check it is run and reported as in progress."""
    # A mocked __init__ has to return None, exactly like a real constructor.
    CustomServiceOrchestrator_mock.return_value = None
    CustomServiceOrchestrator.runCommand.return_value = {'exitcode' : 0,
                                                         'stdout': 'out-11',
                                                         'stderr' : 'err-13'}
    controller = MagicMock()
    queue_under_test = ActionQueue(AmbariConfig().getConfig(), controller)

    # Work on a private copy so the shared class-level fixture stays untouched.
    queued_command = copy.deepcopy(self.background_command)
    queue_under_test.put([queued_command])
    queue_under_test.processBackgroundQueueSafeEmpty()
    queue_under_test.processStatusCommandQueueSafeEmpty()

    # assert that the python executor was started
    self.assertTrue(runCommand_mock.called)

    state_entry = queue_under_test.commandStatuses.current_state.get(queued_command['taskId'])
    self.assertTrue(state_entry is not None)
    self.assertEqual(state_entry[1]['status'], ActionQueue.IN_PROGRESS_STATUS)

    reports = queue_under_test.result()['reports']
    self.assertEqual(len(reports),1)
  @not_for_platform(PLATFORM_WINDOWS)
  @patch.object(CustomServiceOrchestrator, "resolve_script_path")
  @patch.object(StackVersionsFileHandler, "read_stack_version")
  def test_execute_python_executor(self, read_stack_version_mock, resolve_script_path_mock):
    """Run a background command through a stubbed python executor and check its final status.

    The executor's process-launching and file-handling hooks are replaced by
    patch_output_file(), and the queue's completion callback is wrapped so the
    command status at completion time can be captured and asserted on.
    """
    controller = MagicMock()
    agent_config = AmbariConfig().getConfig()
    agent_config.set('agent', 'tolerate_download_failures', 'true')
    agent_config.set('agent', 'prefix', '.')
    agent_config.set('agent', 'cache_dir', 'background_tasks')
    queue_under_test = ActionQueue(agent_config, controller)
    patch_output_file(queue_under_test.customServiceOrchestrator.python_executor)
    queue_under_test.customServiceOrchestrator.dump_command_to_json = MagicMock()

    captured = {}
    state_lock = threading.RLock()
    complete_done = threading.Condition(state_lock)

    def record_completion(process_condenced_result, handle):
      # Runs right after the real completion callback: snapshot its arguments
      # and the command status, then wake up the waiting test.
      with state_lock:
        captured['command_complete'] = {'condenced_result' : copy.copy(process_condenced_result),
                                        'handle' : copy.copy(handle),
                                        'command_status' : queue_under_test.commandStatuses.get_command_status(handle.command['taskId'])
                                        }
        complete_done.notifyAll()

    queue_under_test.on_background_command_complete_callback = wraped(
        queue_under_test.on_background_command_complete_callback, None, record_completion)

    queue_under_test.put([self.background_command])
    queue_under_test.processBackgroundQueueSafeEmpty()
    queue_under_test.processStatusCommandQueueSafeEmpty()

    # Wait (at most 0.1s) for the completion callback to fire.
    with state_lock:
      complete_done.wait(.1)

      final_status = captured['command_complete']['command_status']
      self.assertEqual(final_status['status'], ActionQueue.COMPLETED_STATUS)
      self.assertEqual(final_status['stdout'], 'process_out')
      self.assertEqual(final_status['stderr'], 'process_err')
      self.assertEqual(final_status['exitCode'], 0)

    state_entry = queue_under_test.commandStatuses.current_state.get(self.background_command['taskId'])
    self.assertTrue(state_entry is not None)

    reports = queue_under_test.result()['reports']
    self.assertEqual(len(reports),1)
    self.assertEqual(reports[0]['stdout'],'process_out')
    # self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}')
  # CANCEL_COMMAND fixture (role AMBARI_SERVER_ACTION, roleCommand ABORT) whose
  # roleParams name the task ids to cancel as a comma-separated string.
  # NOTE(review): this rebinds the class attribute cancel_background_command
  # that is already defined earlier in this class, so the earlier
  # ACTIONEXECUTE dict is no longer reachable through that name - confirm
  # which definition is intended.
  cancel_background_command = {
    "commandType":"CANCEL_COMMAND",
    "role":"AMBARI_SERVER_ACTION",
    "roleCommand":"ABORT",
    "commandId":"2--1",
    "taskId":20,
    "clusterName":"c1",
    "serviceName":"",
    "hostname":"c6401",
    "roleParams":{
      "cancelTaskIdTargets":"13,14"
    },
  }
  654. def patch_output_file(pythonExecutor):
  655. def windows_py(command, tmpout, tmperr):
  656. proc = MagicMock()
  657. proc.pid = 33
  658. proc.returncode = 0
  659. with tmpout:
  660. tmpout.write('process_out')
  661. with tmperr:
  662. tmperr.write('process_err')
  663. return proc
  664. def open_subprocess_files_win(fout, ferr, f):
  665. return MagicMock(), MagicMock()
  666. def read_result_from_files(out_path, err_path, structured_out_path):
  667. return 'process_out', 'process_err', '{"a": "b."}'
  668. pythonExecutor.launch_python_subprocess = windows_py
  669. pythonExecutor.open_subprocess_files = open_subprocess_files_win
  670. pythonExecutor.read_result_from_files = read_result_from_files
  671. def wraped(func, before = None, after = None):
  672. def wrapper(*args, **kwargs):
  673. if(before is not None):
  674. before(*args, **kwargs)
  675. ret = func(*args, **kwargs)
  676. if(after is not None):
  677. after(*args, **kwargs)
  678. return ret
  679. return wrapper