TestActionQueue.py 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921
  1. #!/usr/bin/env python
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. from Queue import Queue
  18. from unittest import TestCase
  19. from ambari_agent.LiveStatus import LiveStatus
  20. from ambari_agent.ActionQueue import ActionQueue
  21. from ambari_agent.AmbariConfig import AmbariConfig
  22. import os, errno, time, pprint, tempfile, threading, json
  23. import StringIO
  24. import sys
  25. from threading import Thread
  26. import copy
  27. from mock.mock import patch, MagicMock, call
  28. from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
  29. from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
  30. from ambari_agent.PythonExecutor import PythonExecutor
  31. from ambari_agent.CommandStatusDict import CommandStatusDict
  32. from ambari_agent.ActualConfigHandler import ActualConfigHandler
  33. from ambari_agent.RecoveryManager import RecoveryManager
  34. from FileCache import FileCache
  35. from ambari_commons import OSCheck
  36. from only_for_platform import only_for_platform, get_platform, not_for_platform, PLATFORM_LINUX, PLATFORM_WINDOWS
  # Fake OS distribution tuple handed to the OSCheck.os_distribution patches
  # below; which tuple is used depends on the platform the tests run on.
  os_distro_value = (
    ('win2012serverr2','6.3','WindowsServer')
    if get_platform() == PLATFORM_WINDOWS
    else ('Suse','11','Final')
  )
  41. class TestActionQueue(TestCase):
  def setUp(self):
    # Keep a reference to the genuine builtin open(): test_execute_command
    # patches __builtin__.open and delegates non-'r' modes to this original.
    self.original_open = open
  def tearDown(self):
    # Point sys.stdout back at the interpreter's original stream after each test.
    sys.stdout = sys.__stdout__
  # EXECUTION_COMMAND fixture: INSTALL of the DATANODE role (taskId 3) carrying
  # a 'global' configuration tag 'v1'. Used by test_execute_command and by the
  # queue tests (reset/cancel/parallel).
  datanode_install_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'DATANODE',
    'roleCommand': u'INSTALL',
    'commandId': '1-1',
    'taskId': 3,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'hostLevelParams': {},
    'configurations':{'global' : {}},
    'configurationTags':{'global' : { 'tag': 'v1' }}
  }
  # EXECUTION_COMMAND fixture with roleCommand UPGRADE. Most fields are
  # placeholder strings equal to their own key names; commandParams names the
  # source and target stack versions. Used by the upgrade part of
  # test_execute_command. Note that it has no 'configurationTags' entry.
  datanode_upgrade_command = {
    'commandId': 17,
    'role' : "role",
    'taskId' : "taskId",
    'clusterName' : "clusterName",
    'serviceName' : "serviceName",
    'roleCommand' : 'UPGRADE',
    'hostname' : "localhost.localdomain",
    'hostLevelParams': {},
    'clusterHostInfo': "clusterHostInfo",
    'commandType': "EXECUTION_COMMAND",
    'configurations':{'global' : {}},
    'roleParams': {},
    'commandParams' : {
      'source_stack_version' : 'HDP-1.2.1',
      'target_stack_version' : 'HDP-1.3.0'
    }
  }
  # EXECUTION_COMMAND fixture: INSTALL of the NAMENODE role (taskId 4), without
  # 'configurations' or 'configurationTags' entries.
  namenode_install_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'NAMENODE',
    'roleCommand': u'INSTALL',
    'commandId': '1-1',
    'taskId': 4,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'hostLevelParams': {}
  }
  # EXECUTION_COMMAND fixture: INSTALL of the SECONDARY_NAMENODE role (taskId 5),
  # without 'configurations' or 'configurationTags' entries.
  snamenode_install_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'SECONDARY_NAMENODE',
    'roleCommand': u'INSTALL',
    'commandId': '1-1',
    'taskId': 5,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'hostLevelParams': {}
  }
  # EXECUTION_COMMAND fixture: INSTALL of the HBASE role (taskId 7). It is queued
  # together with datanode_install_command in the reset/cancel/parallel tests.
  # NOTE(review): serviceName is 'HDFS' although the role is HBASE - looks like a
  # copy/paste leftover; confirm whether any test depends on it.
  hbase_install_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'HBASE',
    'roleCommand': u'INSTALL',
    'commandId': '1-1',
    'taskId': 7,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'hostLevelParams': {}
  }
  # STATUS_COMMAND fixture for the HDFS DATANODE component with an empty cluster
  # name and empty configurations. Used by test_execute_status_command.
  status_command = {
    "serviceName" : 'HDFS',
    "commandType" : "STATUS_COMMAND",
    "clusterName" : "",
    "componentName" : "DATANODE",
    'configurations':{},
    'hostLevelParams': {}
  }
  # EXECUTION_COMMAND fixture: CUSTOM_COMMAND 'RESTART' for DATANODE (taskId 9)
  # with configuration tag 'v123' and an explicit, empty
  # 'clientsToUpdateConfigs' list. Used by test_store_configuration_tags.
  datanode_restart_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'DATANODE',
    'roleCommand': u'CUSTOM_COMMAND',
    'commandId': '1-1',
    'taskId': 9,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'configurations':{'global' : {}},
    'configurationTags':{'global' : { 'tag': 'v123' }},
    'hostLevelParams':{'custom_command': 'RESTART', 'clientsToUpdateConfigs': []}
  }
  # Same RESTART custom command as datanode_restart_command, except that
  # hostLevelParams has no 'clientsToUpdateConfigs' key at all. Used by
  # test_store_configuration_tags_no_clients.
  datanode_restart_command_no_clients_update = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'DATANODE',
    'roleCommand': u'CUSTOM_COMMAND',
    'commandId': '1-1',
    'taskId': 9,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'configurations':{'global' : {}},
    'configurationTags':{'global' : { 'tag': 'v123' }},
    'hostLevelParams':{'custom_command': 'RESTART'}
  }
  # STATUS_COMMAND fixture for the FLUME / FLUME_HANDLER component. Used by
  # test_execute_status_command_with_alerts.
  status_command_for_alerts = {
    "serviceName" : 'FLUME',
    "commandType" : "STATUS_COMMAND",
    "clusterName" : "",
    "componentName" : "FLUME_HANDLER",
    'configurations':{},
    'hostLevelParams': {}
  }
  # EXECUTION_COMMAND fixture: NAMENODE INSTALL (taskId 19) whose commandParams
  # switch command retry on ('command_retry_enabled': 'true') with a maximum of
  # three attempts. The retry tests deep-copy it before use.
  retryable_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': 'NAMENODE',
    'roleCommand': 'INSTALL',
    'commandId': '1-1',
    'taskId': 19,
    'clusterName': 'c1',
    'serviceName': 'HDFS',
    'configurations':{'global' : {}},
    'configurationTags':{'global' : { 'tag': 'v123' }},
    'commandParams' : {
      'script_type' : 'PYTHON',
      'script' : 'script.py',
      'command_timeout' : '600',
      'jdk_location' : '.',
      'service_package_folder' : '.',
      'command_retry_enabled' : 'true',
      'command_retry_max_attempt_count' : '3'
    },
    'hostLevelParams' : {}
  }
  # BACKGROUND_EXECUTION_COMMAND fixture: NAMENODE custom command
  # 'REBALANCE_HDFS' (taskId 19) run through the PYTHON script 'script.py'.
  background_command = {
    'commandType': 'BACKGROUND_EXECUTION_COMMAND',
    'role': 'NAMENODE',
    'roleCommand': 'CUSTOM_COMMAND',
    'commandId': '1-1',
    'taskId': 19,
    'clusterName': 'c1',
    'serviceName': 'HDFS',
    'configurations':{'global' : {}},
    'configurationTags':{'global' : { 'tag': 'v123' }},
    'hostLevelParams':{'custom_command': 'REBALANCE_HDFS'},
    'commandParams' : {
      'script_type' : 'PYTHON',
      'script' : 'script.py',
      'command_timeout' : '600',
      'jdk_location' : '.',
      'service_package_folder' : '.'
    }
  }
  # EXECUTION_COMMAND fixture (ACTIONEXECUTE, taskId 20) that runs
  # 'cancel_background_task.py' with cancel policy SIGKILL. Its 'cancel_task_id'
  # of "19" matches the taskId of background_command.
  cancel_background_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': 'NAMENODE',
    'roleCommand': 'ACTIONEXECUTE',
    'commandId': '1-1',
    'taskId': 20,
    'clusterName': 'c1',
    'serviceName': 'HDFS',
    'configurations':{'global' : {}},
    'configurationTags':{'global' : {}},
    'hostLevelParams':{},
    'commandParams' : {
      'script_type' : 'PYTHON',
      'script' : 'cancel_background_task.py',
      'before_system_hook_function' : 'fetch_bg_pid_by_taskid',
      'jdk_location' : '.',
      'command_timeout' : '600',
      'service_package_folder' : '.',
      'cancel_policy': 'SIGKILL',
      'cancel_task_id': "19",
    }
  }
  @patch.object(AmbariConfig, "get_parallel_exec_option")
  @patch.object(ActionQueue, "process_command")
  @patch.object(Queue, "get")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_ActionQueueStartStop(self, CustomServiceOrchestrator_mock,
                                get_mock, process_command_mock, get_parallel_exec_option_mock):
    # Start the queue thread with Queue.get and process_command mocked out, let
    # it spin for a moment, stop it, and check that it both reports itself as
    # stopped and dispatched to process_command more than once meanwhile.

    # A patched __init__ has to return None, otherwise construction fails.
    CustomServiceOrchestrator_mock.return_value = None
    get_parallel_exec_option_mock.return_value = 0

    agent_config = MagicMock()
    agent_config.get_parallel_exec_option = get_parallel_exec_option_mock

    queue = ActionQueue(agent_config, MagicMock())
    queue.start()
    time.sleep(0.1)
    queue.stop()
    queue.join()

    self.assertEqual(queue.stopped(), True, 'Action queue is not stopped.')
    self.assertTrue(process_command_mock.call_count > 1)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch("traceback.print_exc")
  @patch.object(ActionQueue, "execute_command")
  @patch.object(ActionQueue, "execute_status_command")
  def test_process_command(self, execute_status_command_mock,
                           execute_command_mock, print_exc_mock):
    # Verify how process_command dispatches on 'commandType':
    #   * an unknown type reaches neither handler and prints no traceback,
    #   * EXECUTION_COMMAND reaches execute_command only,
    #   * STATUS_COMMAND reaches execute_status_command only,
    #   * an exception raised by either handler ends up in traceback.print_exc.
    dummy_controller = MagicMock()
    config = AmbariConfig()
    config.set('agent', 'tolerate_download_failures', "true")
    actionQueue = ActionQueue(config, dummy_controller)

    execution_command = {
      'commandType' : ActionQueue.EXECUTION_COMMAND,
    }
    status_command = {
      'commandType' : ActionQueue.STATUS_COMMAND,
    }
    wrong_command = {
      'commandType' : "SOME_WRONG_COMMAND",
    }

    def reset_mocks():
      # Forget recorded calls so each scenario is judged on its own.
      execute_command_mock.reset_mock()
      execute_status_command_mock.reset_mock()
      print_exc_mock.reset_mock()

    # Try wrong command
    actionQueue.process_command(wrong_command)
    self.assertFalse(execute_command_mock.called)
    self.assertFalse(execute_status_command_mock.called)
    self.assertFalse(print_exc_mock.called)
    reset_mocks()

    # Try normal execution
    actionQueue.process_command(execution_command)
    self.assertTrue(execute_command_mock.called)
    self.assertFalse(execute_status_command_mock.called)
    self.assertFalse(print_exc_mock.called)
    reset_mocks()

    # Try status command
    actionQueue.process_command(status_command)
    self.assertFalse(execute_command_mock.called)
    self.assertTrue(execute_status_command_mock.called)
    self.assertFalse(print_exc_mock.called)
    reset_mocks()

    # Try exception to check proper logging
    def side_effect(*args, **kwargs):
      raise Exception("TerribleException")

    execute_command_mock.side_effect = side_effect
    actionQueue.process_command(execution_command)
    self.assertTrue(print_exc_mock.called)
    print_exc_mock.reset_mock()

    # The failing status handler has to be driven by a *status* command.
    # Dispatching execution_command again (as this test used to) only re-ran
    # the previous scenario and never touched execute_status_command.
    execute_status_command_mock.side_effect = side_effect
    actionQueue.process_command(status_command)
    self.assertTrue(execute_status_command_mock.called)
    self.assertTrue(print_exc_mock.called)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch("__builtin__.open")
  @patch.object(ActionQueue, "status_update_callback")
  def test_execute_command(self, status_update_callback_mock, open_mock):
    # End-to-end check of ActionQueue.execute_command run on a worker thread
    # with CustomServiceOrchestrator.runCommand mocked. Three scenarios are run
    # in sequence: a successful install (including the IN_PROGRESS report seen
    # while the command is blocked), a failed install, and an upgrade command.

    # Make file read calls visible: opening in mode 'r' yields a mock whose
    # read() returns "Read from <path>"; every other mode uses the real open().
    def open_side_effect(file, mode):
      if mode == 'r':
        file_mock = MagicMock()
        file_mock.read.return_value = "Read from " + str(file)
        return file_mock
      else:
        return self.original_open(file, mode)
    open_mock.side_effect = open_side_effect

    config = AmbariConfig()
    tempdir = tempfile.gettempdir()
    config.set('agent', 'prefix', tempdir)
    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
    config.set('agent', 'tolerate_download_failures', "true")
    dummy_controller = MagicMock()
    actionQueue = ActionQueue(config, dummy_controller)
    # Gate that keeps the mocked runCommand blocked until the test releases it.
    unfreeze_flag = threading.Event()
    # Result returned by the mocked runCommand; 'status' and 'exitcode' are
    # filled in per scenario below.
    python_execution_result_dict = {
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut' : ''
    }

    def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
      # Block until the test sets the event, then hand back the canned result.
      unfreeze_flag.wait()
      return python_execution_result_dict

    def patched_aq_execute_command(command):
      # We have to perform patching for separate thread in the same thread
      with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
        runCommand_mock.side_effect = side_effect
        actionQueue.execute_command(command)

    ### Test install/start/stop command ###
    ## Test successful execution with configuration tags
    python_execution_result_dict['status'] = 'COMPLETE'
    python_execution_result_dict['exitcode'] = 0
    # We call method in a separate thread
    execution_thread = Thread(target = patched_aq_execute_command ,
                              args = (self.datanode_install_command, ))
    execution_thread.start()
    # check in progress report
    # wait until ready
    # NOTE(review): this poll has no timeout, so the test hangs if no report
    # ever shows up.
    while True:
      time.sleep(0.1)
      report = actionQueue.result()
      if len(report['reports']) != 0:
        break
    # While the command is blocked, stdout/stderr/structuredOut come from the
    # mocked open(), hence the "Read from <tempdir>/...-3.*" values (3 is the
    # fixture's taskId).
    # NOTE(review): 777 is presumably ActionQueue's in-progress exit code
    # placeholder - confirm against ActionQueue.
    expected = {'status': 'IN_PROGRESS',
                'stderr': 'Read from {0}'.format(os.path.join(tempdir, "errors-3.txt")),
                'stdout': 'Read from {0}'.format(os.path.join(tempdir, "output-3.txt")),
                'structuredOut' : 'Read from {0}'.format(os.path.join(tempdir, "structured-out-3.json")),
                'clusterName': u'cc',
                'roleCommand': u'INSTALL',
                'serviceName': u'HDFS',
                'role': u'DATANODE',
                'actionId': '1-1',
                'taskId': 3,
                'exitCode': 777}
    self.assertEqual(report['reports'][0], expected)
    # Continue command execution. The event is never cleared afterwards, so the
    # later scenarios in this test do not block in runCommand.
    unfreeze_flag.set()
    # wait until ready
    while report['reports'][0]['status'] == 'IN_PROGRESS':
      time.sleep(0.1)
      report = actionQueue.result()
    # check report
    configname = os.path.join(tempdir, 'config.json')
    expected = {'status': 'COMPLETED',
                'stderr': 'stderr',
                'stdout': 'out',
                'clusterName': u'cc',
                'structuredOut': '""',
                'roleCommand': u'INSTALL',
                'serviceName': u'HDFS',
                'role': u'DATANODE',
                'actionId': '1-1',
                'taskId': 3,
                'configurationTags': {'global': {'tag': 'v1'}},
                'exitCode': 0}
    self.assertEqual(len(report['reports']), 1)
    self.assertEqual(report['reports'][0], expected)
    # The run is expected to have left a config.json under the agent prefix.
    self.assertTrue(os.path.isfile(configname))
    # Check that we had 2 status update calls ( IN_PROGRESS and COMPLETE)
    self.assertEqual(status_update_callback_mock.call_count, 2)
    os.remove(configname)
    # now should not have reports (read complete/failed reports are deleted)
    report = actionQueue.result()
    self.assertEqual(len(report['reports']), 0)

    ## Test failed execution
    python_execution_result_dict['status'] = 'FAILED'
    python_execution_result_dict['exitcode'] = 13
    # We call method in a separate thread
    execution_thread = Thread(target = patched_aq_execute_command ,
                              args = (self.datanode_install_command, ))
    execution_thread.start()
    unfreeze_flag.set()
    # check in progress report
    # wait until ready
    report = actionQueue.result()
    while len(report['reports']) == 0 or \
            report['reports'][0]['status'] == 'IN_PROGRESS':
      time.sleep(0.1)
      report = actionQueue.result()
    # check report (the failed run is expected to carry no 'configurationTags')
    expected = {'status': 'FAILED',
                'stderr': 'stderr',
                'stdout': 'out',
                'clusterName': u'cc',
                'structuredOut': '""',
                'roleCommand': u'INSTALL',
                'serviceName': u'HDFS',
                'role': u'DATANODE',
                'actionId': '1-1',
                'taskId': 3,
                'exitCode': 13}
    self.assertEqual(len(report['reports']), 1)
    self.assertEqual(report['reports'][0], expected)
    # now should not have reports (read complete/failed reports are deleted)
    report = actionQueue.result()
    self.assertEqual(len(report['reports']), 0)

    ### Test upgrade command ###
    python_execution_result_dict['status'] = 'COMPLETE'
    python_execution_result_dict['exitcode'] = 0
    execution_thread = Thread(target = patched_aq_execute_command ,
                              args = (self.datanode_upgrade_command, ))
    execution_thread.start()
    unfreeze_flag.set()
    # wait until ready
    report = actionQueue.result()
    while len(report['reports']) == 0 or \
            report['reports'][0]['status'] == 'IN_PROGRESS':
      time.sleep(0.1)
      report = actionQueue.result()
    # check report
    expected = {'status': 'COMPLETED',
                'stderr': 'stderr',
                'stdout': 'out',
                'clusterName': 'clusterName',
                'structuredOut': '""',
                'roleCommand': 'UPGRADE',
                'serviceName': 'serviceName',
                'role': 'role',
                'actionId': 17,
                'taskId': 'taskId',
                'exitCode': 0}
    self.assertEqual(len(report['reports']), 1)
    self.assertEqual(report['reports'][0], expected)
    # now should not have reports (read complete/failed reports are deleted)
    report = actionQueue.result()
    self.assertEqual(len(report['reports']), 0)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(CustomServiceOrchestrator, "runCommand")
  @patch("CommandStatusDict.CommandStatusDict")
  @patch.object(ActionQueue, "status_update_callback")
  def test_store_configuration_tags(self, status_update_callback_mock,
                                    command_status_dict_mock,
                                    cso_runCommand_mock):
    # Run the RESTART custom command with the orchestrator mocked to succeed and
    # check that the single resulting report is COMPLETED and carries both the
    # command's configurationTags and the 'customCommand' name.

    # Canned orchestrator outcome: exit code 0, empty structured output.
    cso_runCommand_mock.return_value = {
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut' : '',
      'exitcode' : 0
    }

    agent_config = AmbariConfig()
    agent_options = (
      ('prefix', tempfile.gettempdir()),
      ('cache_dir', "/var/lib/ambari-agent/cache"),
      ('tolerate_download_failures', "true"),
    )
    for option, value in agent_options:
      agent_config.set('agent', option, value)

    queue = ActionQueue(agent_config, MagicMock())
    queue.execute_command(self.datanode_restart_command)
    reports = queue.result()['reports']

    # Agent caches configurationTags if custom_command RESTART completed
    wanted = {
      'status': 'COMPLETED',
      'exitCode': 0,
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut': '""',
      'clusterName': u'cc',
      'serviceName': u'HDFS',
      'role': u'DATANODE',
      'roleCommand': u'CUSTOM_COMMAND',
      'customCommand': 'RESTART',
      'actionId': '1-1',
      'taskId': 9,
      'configurationTags': {'global': {'tag': 'v123'}},
    }
    self.assertEqual(len(reports), 1)
    self.assertEqual(wanted, reports[0])
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(ActualConfigHandler, "write_client_components")
  @patch.object(CustomServiceOrchestrator, "runCommand")
  @patch("CommandStatusDict.CommandStatusDict")
  @patch.object(ActionQueue, "status_update_callback")
  def test_store_configuration_tags_no_clients(self, status_update_callback_mock,
                                               command_status_dict_mock,
                                               cso_runCommand_mock, write_client_components_mock):
    # Same flow as the RESTART test, but the command has no
    # 'clientsToUpdateConfigs' entry: the report must still be COMPLETED with
    # its configurationTags, and ActualConfigHandler.write_client_components
    # must not have been invoked.

    # Canned orchestrator outcome: exit code 0, empty structured output.
    cso_runCommand_mock.return_value = {
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut' : '',
      'exitcode' : 0
    }

    agent_config = AmbariConfig()
    agent_options = (
      ('prefix', tempfile.gettempdir()),
      ('cache_dir', "/var/lib/ambari-agent/cache"),
      ('tolerate_download_failures', "true"),
    )
    for option, value in agent_options:
      agent_config.set('agent', option, value)

    queue = ActionQueue(agent_config, MagicMock())
    queue.execute_command(self.datanode_restart_command_no_clients_update)
    reports = queue.result()['reports']

    # Agent caches configurationTags if custom_command RESTART completed
    wanted = {
      'status': 'COMPLETED',
      'exitCode': 0,
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut': '""',
      'clusterName': u'cc',
      'serviceName': u'HDFS',
      'role': u'DATANODE',
      'roleCommand': u'CUSTOM_COMMAND',
      'customCommand': 'RESTART',
      'actionId': '1-1',
      'taskId': 9,
      'configurationTags': {'global': {'tag': 'v123'}},
    }
    self.assertEqual(len(reports), 1)
    self.assertEqual(wanted, reports[0])
    self.assertFalse(write_client_components_mock.called)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(ActionQueue, "status_update_callback")
  @patch.object(StackVersionsFileHandler, "read_stack_version")
  @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
  @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState")
  @patch.object(ActionQueue, "execute_command")
  @patch.object(LiveStatus, "build")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_execute_status_command(self, CustomServiceOrchestrator_mock,
                                  build_mock, execute_command_mock, requestComponentSecurityState_mock,
                                  requestComponentStatus_mock, read_stack_version_mock,
                                  status_update_callback):
    # Execute a STATUS_COMMAND with the orchestrator and LiveStatus.build mocked;
    # the single component status reported must be the dict LiveStatus.build
    # returned plus the mocked security state.

    # A patched __init__ has to return None, otherwise construction fails.
    CustomServiceOrchestrator_mock.return_value = None
    controller = MagicMock()
    queue = ActionQueue(AmbariConfig(), controller)

    build_mock.return_value = {'dummy report': '' }
    controller.recovery_manager = RecoveryManager()

    requestComponentStatus_mock.reset_mock()
    requestComponentStatus_mock.return_value = {'exitcode': 0 }
    requestComponentSecurityState_mock.reset_mock()
    requestComponentSecurityState_mock.return_value = 'UNKNOWN'

    queue.execute_status_command(self.status_command)
    component_statuses = queue.result()['componentStatus']

    self.assertEqual(len(component_statuses), 1)
    self.assertEqual(component_statuses[0],
                     {'dummy report': '', 'securityState' : 'UNKNOWN'})
    self.assertTrue(requestComponentStatus_mock.called)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(ActionQueue, "status_update_callback")
  @patch.object(StackVersionsFileHandler, "read_stack_version")
  @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
  @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState")
  @patch.object(ActionQueue, "execute_command")
  @patch.object(LiveStatus, "build")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_execute_status_command_with_alerts(self, CustomServiceOrchestrator_mock,
                                              build_mock, execute_command_mock,
                                              requestComponentSecurityState_mock,
                                              requestComponentStatus_mock, read_stack_version_mock,
                                              status_update_callback):
    # Execute a STATUS_COMMAND whose component status carries 'alerts' in its
    # structured output and check that the reported component status exposes an
    # 'alerts' entry.
    #
    # patch decorators inject mocks bottom-up, so the parameters must follow the
    # decorator order read from the bottom. They used to be shuffled:
    # build_mock was really the execute_command patch, LiveStatus.build returned
    # a bare MagicMock, and has_key('alerts') on it was truthy no matter what
    # the code under test did.

    # A patched __init__ has to return None, otherwise construction fails.
    CustomServiceOrchestrator_mock.return_value = None
    dummy_controller = MagicMock()
    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)

    requestComponentStatus_mock.reset_mock()
    requestComponentStatus_mock.return_value = {
      'exitcode': 0,
      'stdout': 'out',
      'stderr': 'err',
      'structuredOut': {'alerts': [ {'name': 'flume_alert'} ] }
    }
    # Plain value for the security state, as in test_execute_status_command.
    requestComponentSecurityState_mock.return_value = 'UNKNOWN'
    build_mock.return_value = {'somestatusresult': 'aresult'}

    actionQueue.execute_status_command(self.status_command_for_alerts)
    report = actionQueue.result()

    self.assertTrue(requestComponentStatus_mock.called)
    self.assertEqual(len(report['componentStatus']), 1)
    # Membership test against the real dict returned by the build mock.
    self.assertTrue('alerts' in report['componentStatus'][0])
  @patch.object(AmbariConfig, "get_parallel_exec_option")
  @patch.object(ActionQueue, "process_command")
  @patch.object(Queue, "get")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_reset_queue(self, CustomServiceOrchestrator_mock,
                       get_mock, process_command_mock, gpeo_mock):
    # With Queue.get mocked (so the running thread never drains the queue),
    # enqueue two commands, check that reset() empties the command queue, then
    # stop the thread cleanly.

    # A patched __init__ has to return None, otherwise construction fails.
    CustomServiceOrchestrator_mock.return_value = None
    gpeo_mock.return_value = 0

    agent_config = MagicMock()
    agent_config.get_parallel_exec_option = gpeo_mock

    queue = ActionQueue(agent_config, MagicMock())
    queue.start()

    queued_commands = [self.datanode_install_command, self.hbase_install_command]
    queue.put(queued_commands)
    self.assertEqual(2, queue.commandQueue.qsize())

    queue.reset()
    self.assertTrue(queue.commandQueue.empty())

    time.sleep(0.1)
    queue.stop()
    queue.join()
    self.assertEqual(queue.stopped(), True, 'Action queue is not stopped.')
  @patch.object(AmbariConfig, "get_parallel_exec_option")
  @patch.object(ActionQueue, "process_command")
  @patch.object(Queue, "get")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_cancel(self, CustomServiceOrchestrator_mock,
                  get_mock, process_command_mock, gpeo_mock):
    # Enqueue two commands on a running queue (Queue.get mocked), reset() it,
    # check the command queue is empty and stop the thread.
    # NOTE(review): despite its name this test never calls a cancel API - the
    # body is the same sequence as test_reset_queue. Confirm whether a real
    # cancel scenario was intended here.

    # A patched __init__ has to return None, otherwise construction fails.
    CustomServiceOrchestrator_mock.return_value = None
    gpeo_mock.return_value = 0

    agent_config = MagicMock()
    agent_config.get_parallel_exec_option = gpeo_mock

    queue = ActionQueue(agent_config, MagicMock())
    queue.start()

    queued_commands = [self.datanode_install_command, self.hbase_install_command]
    queue.put(queued_commands)
    self.assertEqual(2, queue.commandQueue.qsize())

    queue.reset()
    self.assertTrue(queue.commandQueue.empty())

    time.sleep(0.1)
    queue.stop()
    queue.join()
    self.assertEqual(queue.stopped(), True, 'Action queue is not stopped.')
  @patch.object(AmbariConfig, "get_parallel_exec_option")
  @patch.object(ActionQueue, "process_command")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_parallel_exec(self, CustomServiceOrchestrator_mock,
                         process_command_mock, gpeo_mock):
    # With the parallel-exec option reported as 1, queue two commands before
    # starting the thread and check that process_command was invoked exactly
    # once for each of them.

    # A patched __init__ has to return None, otherwise construction fails.
    CustomServiceOrchestrator_mock.return_value = None
    dummy_controller = MagicMock()
    config = MagicMock()
    gpeo_mock.return_value = 1
    config.get_parallel_exec_option = gpeo_mock
    actionQueue = ActionQueue(config, dummy_controller)
    actionQueue.put([self.datanode_install_command, self.hbase_install_command])
    self.assertEqual(2, actionQueue.commandQueue.qsize())
    actionQueue.start()
    time.sleep(1)
    actionQueue.stop()
    actionQueue.join()
    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
    self.assertEqual(2, process_command_mock.call_count)
    # 'assert_any_calls' is not a Mock assertion: on a MagicMock it is just an
    # auto-created attribute, so calling it verified nothing. assert_has_calls
    # does the real check; any_order=True because the test does not pin down
    # the order in which the two commands are dispatched.
    process_command_mock.assert_has_calls(
      [call(self.datanode_install_command), call(self.hbase_install_command)],
      any_order=True)
  @patch("time.sleep")
  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
  @patch.object(StackVersionsFileHandler, "read_stack_version")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_execute_retryable_command(self, CustomServiceOrchestrator_mock,
                                     read_stack_version_mock, sleep_mock
                                     ):
    # A retry-enabled command whose every run fails: runCommand must be tried
    # three times with sleeps of 2 and 4 in between, the first attempt
    # overriding the output files and the later ones flagged as retries.

    # A patched __init__ has to return None, otherwise construction fails.
    CustomServiceOrchestrator_mock.return_value = None
    queue = ActionQueue(AmbariConfig(), MagicMock())

    failed_result = {
      'exitcode': 1,
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut': '',
      'status': 'FAILED'
    }

    def always_fail(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
      return failed_result

    # Work on a private copy of the class-level fixture.
    retry_cmd = copy.deepcopy(self.retryable_command)
    with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
      runCommand_mock.side_effect = always_fail
      queue.execute_command(retry_cmd)

    #assert that python executor start
    self.assertTrue(runCommand_mock.called)
    self.assertEqual(3, runCommand_mock.call_count)
    self.assertEqual(2, sleep_mock.call_count)
    sleep_mock.assert_has_calls([call(2), call(4)], False)

    out_path = '/tmp/ambari-agent/output-19.txt'
    err_path = '/tmp/ambari-agent/errors-19.txt'
    runCommand_mock.assert_has_calls([
      call(retry_cmd, out_path, err_path, override_output_files=True, retry=False),
      call(retry_cmd, out_path, err_path, override_output_files=False, retry=True),
      call(retry_cmd, out_path, err_path, override_output_files=False, retry=True)])
  #retryable_command
  @patch("time.sleep")
  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
  @patch.object(StackVersionsFileHandler, "read_stack_version")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_execute_retryable_command_fail_and_succeed(self, CustomServiceOrchestrator_mock,
                                                      read_stack_version_mock, sleep_mock
                                                      ):
    # A retry-enabled command that fails once and then succeeds: runCommand must
    # be called exactly twice, with a single sleep of 2 in between.

    # A patched __init__ has to return None, otherwise construction fails.
    CustomServiceOrchestrator_mock.return_value = None
    queue = ActionQueue(AmbariConfig(), MagicMock())

    first_attempt = {
      'exitcode': 1,
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut': '',
      'status': 'FAILED'
    }
    second_attempt = {
      'exitcode': 0,
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut': '',
      'status': 'COMPLETED'
    }

    # Work on a private copy of the class-level fixture.
    retry_cmd = copy.deepcopy(self.retryable_command)
    with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
      # A list side_effect hands out one result per successive call.
      runCommand_mock.side_effect = [first_attempt, second_attempt]
      queue.execute_command(retry_cmd)

    #assert that python executor start
    self.assertTrue(runCommand_mock.called)
    self.assertEqual(2, runCommand_mock.call_count)
    self.assertEqual(1, sleep_mock.call_count)
    sleep_mock.assert_any_call(2)
  @patch("time.sleep")
  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
  @patch.object(StackVersionsFileHandler, "read_stack_version")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_execute_retryable_command_succeed(self, CustomServiceOrchestrator_mock,
                                             read_stack_version_mock, sleep_mock
  ):
    # Scenario: the very first runCommand attempt reports COMPLETED, so the
    # command must be run exactly once and no sleep may happen.
    CustomServiceOrchestrator_mock.return_value = None
    actionQueue = ActionQueue(AmbariConfig(), MagicMock())

    completed_attempt = {
      'exitcode': 0,
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut': '',
      'status': 'COMPLETED'
    }

    # work on a private copy so the shared fixture is never mutated
    command = copy.deepcopy(self.retryable_command)
    with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
      # a one-element list: any second call would exhaust the side effect
      runCommand_mock.side_effect = [completed_attempt]
      actionQueue.execute_command(command)

    # the orchestrator was started ...
    self.assertTrue(runCommand_mock.called)
    # ... no back-off pause was taken ...
    self.assertFalse(sleep_mock.called)
    # ... and there was no second attempt
    self.assertEqual(1, runCommand_mock.call_count)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(StackVersionsFileHandler, "read_stack_version")
  @patch.object(CustomServiceOrchestrator, "runCommand")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_execute_background_command(self, CustomServiceOrchestrator_mock,
                                      runCommand_mock, read_stack_version_mock
  ):
    # Queues a background command with runCommand mocked out, then checks that
    # the command is tracked as IN_PROGRESS and shows up in exactly one report.
    CustomServiceOrchestrator_mock.return_value = None
    # runCommand_mock is the object patched onto CustomServiceOrchestrator.runCommand
    runCommand_mock.return_value = {
      'exitcode' : 0,
      'stdout': 'out-11',
      'stderr' : 'err-13'
    }

    actionQueue = ActionQueue(AmbariConfig(), MagicMock())

    # work on a private copy so the shared fixture is never mutated
    background_cmd = copy.deepcopy(self.background_command)
    actionQueue.put([background_cmd])
    actionQueue.processBackgroundQueueSafeEmpty()
    actionQueue.processStatusCommandQueueSafeEmpty()

    # the (mocked) python executor was started
    self.assertTrue(runCommand_mock.called)

    # the task is present in the current state and still marked as running
    task_id = background_cmd['taskId']
    runningCommand = actionQueue.commandStatuses.current_state.get(task_id)
    self.assertTrue(runningCommand is not None)
    self.assertEqual(runningCommand[1]['status'], ActionQueue.IN_PROGRESS_STATUS)

    reports = actionQueue.result()['reports']
    self.assertEqual(len(reports),1)
  @not_for_platform(PLATFORM_WINDOWS)
  @patch.object(CustomServiceOrchestrator, "get_py_executor")
  @patch.object(CustomServiceOrchestrator, "resolve_script_path")
  @patch.object(StackVersionsFileHandler, "read_stack_version")
  def test_execute_python_executor(self, read_stack_version_mock, resolve_script_path_mock,
                                   get_py_executor_mock):
    # Runs the background command through a real PythonExecutor whose subprocess
    # and file hooks are replaced by patch_output_file, then verifies the status
    # captured when the completion callback fires and the resulting report.
    dummy_controller = MagicMock()
    cfg = AmbariConfig()
    cfg.set('agent', 'tolerate_download_failures', 'true')
    cfg.set('agent', 'prefix', '.')
    cfg.set('agent', 'cache_dir', 'background_tasks')

    actionQueue = ActionQueue(cfg, dummy_controller)
    pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config)
    patch_output_file(pyex)
    get_py_executor_mock.return_value = pyex
    actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock()

    result = {}
    lock = threading.RLock()
    complete_done = threading.Condition(lock)

    def command_complete_w(process_condensed_result, handle):
      # Runs after the real completion callback (see wraped below): snapshots
      # the callback arguments and the command status, then wakes the waiter.
      with lock:
        result['command_complete'] = {'condensed_result' : copy.copy(process_condensed_result),
                                      'handle' : copy.copy(handle),
                                      'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId'])
                                      }
        complete_done.notify_all()

    actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,
                                                                 None, command_complete_w)
    actionQueue.put([self.background_command])
    actionQueue.processBackgroundQueueSafeEmpty()
    actionQueue.processStatusCommandQueueSafeEmpty()

    # NOTE(review): the callback presumably fires on another thread - confirm.
    # A single un-predicated wait(0.1) is fragile either way: a notification
    # sent before the wait starts is lost, and a slow run times out and then
    # fails with a bare KeyError. Re-check the predicate in a bounded loop.
    with lock:
      attempts_left = 50  # 50 * 0.1s = at most ~5 seconds
      while 'command_complete' not in result and attempts_left > 0:
        complete_done.wait(0.1)
        attempts_left -= 1

    self.assertTrue('command_complete' in result,
                    "background command completion callback was not invoked in time")

    finished_status = result['command_complete']['command_status']
    self.assertEqual(finished_status['status'], ActionQueue.COMPLETED_STATUS)
    self.assertEqual(finished_status['stdout'], 'process_out')
    self.assertEqual(finished_status['stderr'], 'process_err')
    self.assertEqual(finished_status['exitCode'], 0)

    # the finished task is still present in the current state
    runningCommand = actionQueue.commandStatuses.current_state.get(self.background_command['taskId'])
    self.assertTrue(runningCommand is not None)

    report = actionQueue.result()
    self.assertEqual(len(report['reports']),1)
    self.assertEqual(report['reports'][0]['stdout'],'process_out')
    # NOTE(review): structuredOut check is disabled in the original - confirm whether it should be restored.
    # self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}')
  # Sample CANCEL_COMMAND payload; its roleParams name task ids 13 and 14
  # (as one comma-separated string) as the cancel targets.
  cancel_background_command = {
    "commandType": "CANCEL_COMMAND",
    "role": "AMBARI_SERVER_ACTION",
    "roleCommand": "ABORT",
    "commandId": "2--1",
    "taskId": 20,
    "clusterName": "c1",
    "serviceName": "",
    "hostname": "c6401",
    "roleParams": {
      "cancelTaskIdTargets": "13,14"
    },
  }
  def patch_output_file(pythonExecutor):
    # Replaces three hooks on the given executor object with canned stand-ins:
    # launch_python_subprocess, open_subprocess_files and read_result_from_files.
    # The object is modified in place; nothing is returned.

    def read_result_from_files(out_path, err_path, structured_out_path):
      # ignores the paths and returns a fixed (stdout, stderr, structured out) triple
      return 'process_out', 'process_err', '{"a": "b."}'

    def open_subprocess_files_win(fout, ferr, f):
      # hands back two mocks instead of opening anything
      return MagicMock(), MagicMock()

    def windows_py(command, tmpout, tmperr):
      # builds a fake process (pid 33, return code 0), then writes the canned
      # text to each stream inside its own context manager
      fake_process = MagicMock()
      fake_process.pid = 33
      fake_process.returncode = 0
      for stream, text in ((tmpout, 'process_out'), (tmperr, 'process_err')):
        with stream:
          stream.write(text)
      return fake_process

    pythonExecutor.launch_python_subprocess = windows_py
    pythonExecutor.open_subprocess_files = open_subprocess_files_win
    pythonExecutor.read_result_from_files = read_result_from_files
  def wraped(func, before = None, after = None):
    # Returns a callable that forwards its arguments to func and returns func's
    # result. When given, `before` is called first and `after` is called last,
    # each with the same arguments; their return values are ignored.
    # `after` does not run if func raises.
    def call_with_hooks(*args, **kwargs):
      if before is not None:
        before(*args, **kwargs)
      outcome = func(*args, **kwargs)
      if after is not None:
        after(*args, **kwargs)
      return outcome
    return call_with_hooks