TestActionQueue.py 56 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439
  1. #!/usr/bin/env python
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. from Queue import Queue
  18. from unittest import TestCase
  19. from ambari_agent.LiveStatus import LiveStatus
  20. from ambari_agent.ActionQueue import ActionQueue
  21. from ambari_agent.AmbariConfig import AmbariConfig
  22. import os, errno, time, pprint, tempfile, threading
  23. import sys
  24. from threading import Thread
  25. import copy
  26. import signal
  27. from mock.mock import patch, MagicMock, call
  28. from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
  29. from ambari_agent.PythonExecutor import PythonExecutor
  30. from ambari_agent.ActualConfigHandler import ActualConfigHandler
  31. from ambari_agent.RecoveryManager import RecoveryManager
  32. from ambari_commons import OSCheck
  33. from only_for_platform import not_for_platform, os_distro_value, PLATFORM_WINDOWS, PLATFORM_LINUX
  34. import logging
  35. class TestActionQueue(TestCase):
  def setUp(self):
    """Keep a reference to the real builtin open() for later use."""
    # Tests in this class that patch "__builtin__.open" delegate non-read
    # modes to this saved reference.
    self.original_open = open
  def tearDown(self):
    """Point sys.stdout back at the interpreter's original stdout."""
    sys.stdout = sys.__stdout__
  logger = logging.getLogger()

  # --- Command fixtures shared by the tests of this class -------------------

  # DATANODE INSTALL with command retry switched on.
  datanode_install_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'DATANODE',
    'roleCommand': u'INSTALL',
    'commandId': '1-1',
    'taskId': 3,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'hostLevelParams': {},
    'configurations': {'global': {}},
    'configurationTags': {'global': {'tag': 'v1'}},
    'commandParams': {
      'command_retry_enabled': 'true'
    }
  }

  # Same as datanode_install_command, but with command retry switched off.
  datanode_install_no_retry_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'DATANODE',
    'roleCommand': u'INSTALL',
    'commandId': '1-1',
    'taskId': 3,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'hostLevelParams': {},
    'configurations': {'global': {}},
    'configurationTags': {'global': {'tag': 'v1'}},
    'commandParams': {
      'command_retry_enabled': 'false'
    }
  }

  # DATANODE START issued as an AUTO_EXECUTION_COMMAND (no commandParams).
  datanode_auto_start_command = {
    'commandType': 'AUTO_EXECUTION_COMMAND',
    'role': u'DATANODE',
    'roleCommand': u'START',
    'commandId': '1-1',
    'taskId': 3,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'hostLevelParams': {},
    'configurations': {'global': {}},
    'configurationTags': {'global': {'tag': 'v1'}}
  }

  # UPGRADE command; most values are placeholder strings named after their key.
  datanode_upgrade_command = {
    'commandId': 17,
    'role': "role",
    'taskId': "taskId",
    'clusterName': "clusterName",
    'serviceName': "serviceName",
    'roleCommand': 'UPGRADE',
    'hostname': "localhost.localdomain",
    'hostLevelParams': {},
    'clusterHostInfo': "clusterHostInfo",
    'commandType': "EXECUTION_COMMAND",
    'configurations': {'global': {}},
    'roleParams': {},
    'commandParams': {
      'source_stack_version': 'HDP-1.2.1',
      'target_stack_version': 'HDP-1.3.0'
    }
  }

  # Minimal INSTALL commands (no configurations / commandParams).
  namenode_install_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'NAMENODE',
    'roleCommand': u'INSTALL',
    'commandId': '1-1',
    'taskId': 4,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'hostLevelParams': {}
  }

  snamenode_install_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'SECONDARY_NAMENODE',
    'roleCommand': u'INSTALL',
    'commandId': '1-1',
    'taskId': 5,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'hostLevelParams': {}
  }

  # HBASE INSTALL with command retry switched on.
  hbase_install_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'HBASE',
    'roleCommand': u'INSTALL',
    'commandId': '1-1',
    'taskId': 7,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'hostLevelParams': {},
    'commandParams': {
      'command_retry_enabled': 'true'
    }
  }
  # STATUS_COMMAND for the HDFS DATANODE component.
  status_command = {
    "serviceName": 'HDFS',
    "commandType": "STATUS_COMMAND",
    "clusterName": "",
    "componentName": "DATANODE",
    'configurations': {},
    'hostLevelParams': {}
  }

  # CUSTOM_COMMAND RESTART of DATANODE with an empty clientsToUpdateConfigs list.
  datanode_restart_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'DATANODE',
    'roleCommand': u'CUSTOM_COMMAND',
    'commandId': '1-1',
    'taskId': 9,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'configurations': {'global': {}},
    'configurationTags': {'global': {'tag': 'v123'}},
    'hostLevelParams': {'custom_command': 'RESTART', 'clientsToUpdateConfigs': []}
  }

  # Same RESTART command, additionally carrying log_output = 'false'.
  datanode_restart_command_no_logging = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'DATANODE',
    'roleCommand': u'CUSTOM_COMMAND',
    'commandId': '1-1',
    'taskId': 9,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'configurations': {'global': {}},
    'configurationTags': {'global': {'tag': 'v123'}},
    'commandParams': {
      'log_output': 'false'
    },
    'hostLevelParams': {'custom_command': 'RESTART', 'clientsToUpdateConfigs': []}
  }

  # RESTART command without the clientsToUpdateConfigs host-level parameter.
  datanode_restart_command_no_clients_update = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'DATANODE',
    'roleCommand': u'CUSTOM_COMMAND',
    'commandId': '1-1',
    'taskId': 9,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'configurations': {'global': {}},
    'configurationTags': {'global': {'tag': 'v123'}},
    'hostLevelParams': {'custom_command': 'RESTART'}
  }

  # CUSTOM_COMMAND whose custom_command is START.
  datanode_start_custom_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'DATANODE',
    'roleCommand': u'CUSTOM_COMMAND',
    'commandId': '1-1',
    'taskId': 9,
    'clusterName': u'cc',
    'serviceName': u'HDFS',
    'configurations': {'global': {}},
    'configurationTags': {'global': {'tag': 'v123'}},
    'hostLevelParams': {'custom_command': 'START'}
  }

  # YARN REFRESHQUEUES custom command with forceRefreshConfigTags set.
  yarn_refresh_queues_custom_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': u'RESOURCEMANAGER',
    'roleCommand': u'CUSTOM_COMMAND',
    'commandId': '1-1',
    'taskId': 9,
    'clusterName': u'cc',
    'serviceName': u'YARN',
    'commandParams': {'forceRefreshConfigTags': 'capacity-scheduler'},
    'configurations': {'global': {}},
    'configurationTags': {'global': {'tag': 'v123'}, 'capacity-scheduler': {'tag': 'v123'}},
    'hostLevelParams': {'custom_command': 'REFRESHQUEUES'}
  }

  # STATUS_COMMAND for the FLUME_HANDLER component.
  status_command_for_alerts = {
    "serviceName": 'FLUME',
    "commandType": "STATUS_COMMAND",
    "clusterName": "",
    "componentName": "FLUME_HANDLER",
    'configurations': {},
    'hostLevelParams': {}
  }
  # NAMENODE INSTALL with retry enabled and max_duration_for_retries of '5'.
  retryable_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': 'NAMENODE',
    'roleCommand': 'INSTALL',
    'commandId': '1-1',
    'taskId': 19,
    'clusterName': 'c1',
    'serviceName': 'HDFS',
    'configurations': {'global': {}},
    'configurationTags': {'global': {'tag': 'v123'}},
    'commandParams': {
      'script_type': 'PYTHON',
      'script': 'script.py',
      'command_timeout': '600',
      'jdk_location': '.',
      'service_package_folder': '.',
      'command_retry_enabled': 'true',
      'max_duration_for_retries': '5'
    },
    'hostLevelParams': {}
  }

  # BACKGROUND_EXECUTION_COMMAND running the REBALANCE_HDFS custom command.
  background_command = {
    'commandType': 'BACKGROUND_EXECUTION_COMMAND',
    'role': 'NAMENODE',
    'roleCommand': 'CUSTOM_COMMAND',
    'commandId': '1-1',
    'taskId': 19,
    'clusterName': 'c1',
    'serviceName': 'HDFS',
    'configurations': {'global': {}},
    'configurationTags': {'global': {'tag': 'v123'}},
    'hostLevelParams': {'custom_command': 'REBALANCE_HDFS'},
    'commandParams': {
      'script_type': 'PYTHON',
      'script': 'script.py',
      'command_timeout': '600',
      'jdk_location': '.',
      'service_package_folder': '.'
    }
  }

  # ACTIONEXECUTE command whose cancel_task_id ("19") equals the taskId of
  # background_command above.
  cancel_background_command = {
    'commandType': 'EXECUTION_COMMAND',
    'role': 'NAMENODE',
    'roleCommand': 'ACTIONEXECUTE',
    'commandId': '1-1',
    'taskId': 20,
    'clusterName': 'c1',
    'serviceName': 'HDFS',
    'configurations': {'global': {}},
    'configurationTags': {'global': {}},
    'hostLevelParams': {},
    'commandParams': {
      'script_type': 'PYTHON',
      'script': 'cancel_background_task.py',
      'before_system_hook_function': 'fetch_bg_pid_by_taskid',
      'jdk_location': '.',
      'command_timeout': '600',
      'service_package_folder': '.',
      'cancel_policy': 'SIGKILL',
      'cancel_task_id': "19",
    }
  }
  @patch.object(AmbariConfig, "get_parallel_exec_option")
  @patch.object(ActionQueue, "process_command")
  @patch.object(Queue, "get")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_ActionQueueStartStop(self, CustomServiceOrchestrator_mock,
                                get_mock, process_command_mock, get_parallel_exec_option_mock):
    """Start the queue thread, stop it, and check it processed commands meanwhile."""
    # A patched __init__ must return None, otherwise instantiation fails.
    CustomServiceOrchestrator_mock.return_value = None
    dummy_controller = MagicMock()
    config = MagicMock()
    get_parallel_exec_option_mock.return_value = 0
    config.get_parallel_exec_option = get_parallel_exec_option_mock

    actionQueue = ActionQueue(config, dummy_controller)
    actionQueue.start()
    # Give the queue thread a moment to spin; Queue.get is mocked, so it
    # returns immediately instead of blocking.
    time.sleep(0.1)
    actionQueue.stop()
    actionQueue.join()

    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
    self.assertTrue(process_command_mock.call_count > 1)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch("logging.RootLogger.exception")
  @patch.object(ActionQueue, "execute_command")
  def test_process_command(self, execute_command_mock, log_exc_mock):
    """Check process_command() dispatching and its exception logging.

    execute_command() and RootLogger.exception() are both mocked, so only
    the routing done by process_command() is exercised.
    """
    dummy_controller = MagicMock()
    config = AmbariConfig()
    config.set('agent', 'tolerate_download_failures', "true")
    actionQueue = ActionQueue(config, dummy_controller)
    execution_command = {
      'commandType' : ActionQueue.EXECUTION_COMMAND,
    }
    wrong_command = {
      'commandType' : "SOME_WRONG_COMMAND",
    }

    # Try wrong command: nothing is executed and nothing is logged as an exception
    actionQueue.process_command(wrong_command)
    self.assertFalse(execute_command_mock.called)
    self.assertFalse(log_exc_mock.called)
    execute_command_mock.reset_mock()
    log_exc_mock.reset_mock()

    # Try normal execution
    actionQueue.process_command(execution_command)
    self.assertTrue(execute_command_mock.called)
    self.assertFalse(log_exc_mock.called)
    execute_command_mock.reset_mock()
    log_exc_mock.reset_mock()

    # Try exception to check proper logging
    def side_effect(command):
      raise Exception("TerribleException")
    execute_command_mock.side_effect = side_effect

    actionQueue.process_command(execution_command)
    self.assertTrue(log_exc_mock.called)
    log_exc_mock.reset_mock()

    # A second failure must be logged as well
    actionQueue.process_command(execution_command)
    self.assertTrue(log_exc_mock.called)
  @patch.object(ActionQueue, "log_command_output")
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(CustomServiceOrchestrator, "runCommand")
  @patch("CommandStatusDict.CommandStatusDict")
  @patch.object(ActionQueue, "status_update_callback")
  def test_log_execution_commands(self, status_update_callback_mock,
                                  command_status_dict_mock,
                                  cso_runCommand_mock, mock_log_command_output):
    """With log_command_executes set, command stdout and stderr are passed to log_command_output()."""
    custom_service_orchestrator_execution_result_dict = {
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut' : '',
      'exitcode' : 0
    }
    cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict

    config = AmbariConfig()
    tempdir = tempfile.gettempdir()
    config.set('agent', 'prefix', tempdir)
    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
    config.set('agent', 'tolerate_download_failures', "true")
    config.set('logging', 'log_command_executes', 1)
    dummy_controller = MagicMock()
    actionQueue = ActionQueue(config, dummy_controller)

    actionQueue.execute_command(self.datanode_restart_command)
    report = actionQueue.result()
    expected = {'status': 'COMPLETED',
                'configurationTags': {'global': {'tag': 'v123'}},
                'stderr': 'stderr',
                'stdout': 'out\n\nCommand completed successfully!\n',
                'clusterName': u'cc',
                'structuredOut': '""',
                'roleCommand': u'CUSTOM_COMMAND',
                'serviceName': u'HDFS',
                'role': u'DATANODE',
                'actionId': '1-1',
                'taskId': 9,
                'customCommand': 'RESTART',
                'exitCode': 0}

    # Both output streams must have been logged, tagged with the task id
    mock_log_command_output.assert_has_calls(
      [call("out\n\nCommand completed successfully!\n", "9"), call("stderr", "9")],
      any_order=True)
    self.assertEqual(len(report['reports']), 1)
    self.assertEqual(expected, report['reports'][0])
  @patch.object(ActionQueue, "log_command_output")
  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
  @patch.object(CustomServiceOrchestrator, "runCommand")
  @patch("CommandStatusDict.CommandStatusDict")
  @patch.object(ActionQueue, "status_update_callback")
  def test_do_not_log_execution_commands(self, status_update_callback_mock,
                                         command_status_dict_mock,
                                         cso_runCommand_mock, mock_log_command_output):
    """A command carrying log_output = 'false' must not reach log_command_output()."""
    custom_service_orchestrator_execution_result_dict = {
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut': '',
      'exitcode': 0
    }
    cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict

    config = AmbariConfig()
    tempdir = tempfile.gettempdir()
    config.set('agent', 'prefix', tempdir)
    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
    config.set('agent', 'tolerate_download_failures', "true")
    # Logging stays enabled globally; the command itself opts out.
    config.set('logging', 'log_command_executes', 1)
    dummy_controller = MagicMock()
    actionQueue = ActionQueue(config, dummy_controller)

    actionQueue.execute_command(self.datanode_restart_command_no_logging)
    report = actionQueue.result()
    expected = {'status': 'COMPLETED',
                'configurationTags': {'global': {'tag': 'v123'}},
                'stderr': 'stderr',
                'stdout': 'out\n\nCommand completed successfully!\n',
                'clusterName': u'cc',
                'structuredOut': '""',
                'roleCommand': u'CUSTOM_COMMAND',
                'serviceName': u'HDFS',
                'role': u'DATANODE',
                'actionId': '1-1',
                'taskId': 9,
                'customCommand': 'RESTART',
                'exitCode': 0}

    # assert_not_called() accepts no arguments: on mock releases that lack it
    # the call is a silent no-op, on newer ones passing arguments raises
    # TypeError. Checking `called` verifies the intent on every version.
    self.assertFalse(mock_log_command_output.called,
                     'log_command_output must not be called when log_output is false')
    self.assertEqual(len(report['reports']), 1)
    self.assertEqual(expected, report['reports'][0])
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch("__builtin__.open")
  @patch.object(ActionQueue, "status_update_callback")
  def test_auto_execute_command(self, status_update_callback_mock, open_mock):
    """Run an AUTO_EXECUTION_COMMAND through process_command() and expect no reports.

    The command is processed on a worker thread whose runCommand() mock
    blocks on an Event, so the in-progress state can be observed first.
    Both the successful and the failed run must leave result() empty.
    """
    # Make file read calls visible
    def open_side_effect(path, mode):
      if mode == 'r':
        file_mock = MagicMock()
        file_mock.read.return_value = "Read from " + str(path)
        return file_mock
      else:
        return self.original_open(path, mode)
    open_mock.side_effect = open_side_effect

    config = AmbariConfig()
    tempdir = tempfile.gettempdir()
    config.set('agent', 'prefix', tempdir)
    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
    config.set('agent', 'tolerate_download_failures', "true")
    dummy_controller = MagicMock()
    # NOTE(review): tempfile.mktemp() only generates a name and is deprecated
    # as race-prone; kept because it is unclear from here whether
    # RecoveryManager expects the path to exist - confirm before switching
    # to mkdtemp().
    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
    dummy_controller.recovery_manager.update_config(5, 5, 1, 11, True, False, False, "", -1)

    actionQueue = ActionQueue(config, dummy_controller)
    unfreeze_flag = threading.Event()
    python_execution_result_dict = {
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut' : ''
    }

    def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
      # Hold the command "in progress" until the test releases it
      unfreeze_flag.wait()
      return python_execution_result_dict

    def patched_aq_execute_command(command):
      # We have to perform patching for separate thread in the same thread
      with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
        runCommand_mock.side_effect = side_effect
        actionQueue.process_command(command)

    ## Test successful execution
    python_execution_result_dict['status'] = 'COMPLETE'
    python_execution_result_dict['exitcode'] = 0
    self.assertFalse(actionQueue.tasks_in_progress_or_pending())
    # We call method in a separate thread
    execution_thread = Thread(target = patched_aq_execute_command ,
                              args = (self.datanode_auto_start_command, ))
    execution_thread.start()
    # wait until the command shows up as in progress
    while True:
      time.sleep(0.1)
      if actionQueue.tasks_in_progress_or_pending():
        break
    # Continue command execution
    unfreeze_flag.set()
    # wait until ready
    while True:
      report = actionQueue.result()
      if not actionQueue.tasks_in_progress_or_pending():
        break
      time.sleep(0.1)

    self.assertEqual(len(report['reports']), 0)

    ## Test failed execution
    python_execution_result_dict['status'] = 'FAILED'
    python_execution_result_dict['exitcode'] = 13
    # We call method in a separate thread
    execution_thread = Thread(target = patched_aq_execute_command ,
                              args = (self.datanode_auto_start_command, ))
    execution_thread.start()
    # unfreeze_flag was set above and never cleared, so this run does not block.
    # NOTE(review): the worker thread is never joined, so the loop below may
    # observe "nothing in progress" before the thread has started - confirm
    # whether a join() is wanted here.
    while True:
      report = actionQueue.result()
      if not actionQueue.tasks_in_progress_or_pending():
        break
      time.sleep(0.1)

    self.assertEqual(len(report['reports']), 0)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch("__builtin__.open")
  @patch.object(ActionQueue, "status_update_callback")
  def test_execute_command(self, status_update_callback_mock, open_mock):
    """Check the reports produced by execute_command().

    Covers a successful install (IN_PROGRESS report, then COMPLETED), a
    failed install and an upgrade command. Each command runs on a worker
    thread with CustomServiceOrchestrator.runCommand() mocked.
    """
    # Make file read calls visible
    def open_side_effect(path, mode):
      if mode == 'r':
        file_mock = MagicMock()
        file_mock.read.return_value = "Read from " + str(path)
        return file_mock
      else:
        return self.original_open(path, mode)
    open_mock.side_effect = open_side_effect

    config = AmbariConfig()
    tempdir = tempfile.gettempdir()
    config.set('agent', 'prefix', tempdir)
    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
    config.set('agent', 'tolerate_download_failures', "true")
    dummy_controller = MagicMock()
    actionQueue = ActionQueue(config, dummy_controller)
    unfreeze_flag = threading.Event()
    # One dict instance is returned by every mocked runCommand() call.
    # NOTE(review): the growing 'stdout' expectations below suggest that
    # ActionQueue appends its status messages to this shared dict - confirm.
    python_execution_result_dict = {
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut' : ''
    }

    def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
      # Hold the command "in progress" until the test releases it
      unfreeze_flag.wait()
      return python_execution_result_dict

    def patched_aq_execute_command(command):
      # We have to perform patching for separate thread in the same thread
      with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
        runCommand_mock.side_effect = side_effect
        actionQueue.execute_command(command)

    def wait_for_final_report():
      # Poll result() until it carries a report that is no longer IN_PROGRESS
      current = actionQueue.result()
      while len(current['reports']) == 0 or \
          current['reports'][0]['status'] == 'IN_PROGRESS':
        time.sleep(0.1)
        current = actionQueue.result()
      return current

    ### Test install/start/stop command ###
    ## Test successful execution with configuration tags
    python_execution_result_dict['status'] = 'COMPLETE'
    python_execution_result_dict['exitcode'] = 0
    # We call method in a separate thread
    execution_thread = Thread(target = patched_aq_execute_command ,
                              args = (self.datanode_install_command, ))
    execution_thread.start()
    # check in progress report
    # wait until ready
    while True:
      time.sleep(0.1)
      report = actionQueue.result()
      if len(report['reports']) != 0:
        break
    # While in progress, the report fields come from the (mocked) output files
    expected = {'status': 'IN_PROGRESS',
                'stderr': 'Read from {0}'.format(os.path.join(tempdir, "errors-3.txt")),
                'stdout': 'Read from {0}'.format(os.path.join(tempdir, "output-3.txt")),
                'structuredOut' : 'Read from {0}'.format(os.path.join(tempdir, "structured-out-3.json")),
                'clusterName': u'cc',
                'roleCommand': u'INSTALL',
                'serviceName': u'HDFS',
                'role': u'DATANODE',
                'actionId': '1-1',
                'taskId': 3,
                'exitCode': 777}
    self.assertEqual(report['reports'][0], expected)
    self.assertTrue(actionQueue.tasks_in_progress_or_pending())

    # Continue command execution
    unfreeze_flag.set()
    # wait until ready
    while report['reports'][0]['status'] == 'IN_PROGRESS':
      time.sleep(0.1)
      report = actionQueue.result()
    # check report
    configname = os.path.join(tempdir, 'config.json')
    expected = {'status': 'COMPLETED',
                'stderr': 'stderr',
                'stdout': 'out\n\nCommand completed successfully!\n',
                'clusterName': u'cc',
                'structuredOut': '""',
                'roleCommand': u'INSTALL',
                'serviceName': u'HDFS',
                'role': u'DATANODE',
                'actionId': '1-1',
                'taskId': 3,
                'configurationTags': {'global': {'tag': 'v1'}},
                'exitCode': 0}
    self.assertEqual(len(report['reports']), 1)
    self.assertEqual(report['reports'][0], expected)
    self.assertTrue(os.path.isfile(configname))

    # Check that we had 2 status update calls ( IN_PROGRESS and COMPLETE)
    self.assertEqual(status_update_callback_mock.call_count, 2)
    os.remove(configname)

    # now should not have reports (read complete/failed reports are deleted)
    report = actionQueue.result()
    self.assertEqual(len(report['reports']), 0)

    ## Test failed execution
    python_execution_result_dict['status'] = 'FAILED'
    python_execution_result_dict['exitcode'] = 13
    # We call method in a separate thread
    execution_thread = Thread(target = patched_aq_execute_command ,
                              args = (self.datanode_install_command, ))
    execution_thread.start()
    unfreeze_flag.set()
    # wait until ready
    report = wait_for_final_report()
    # check report
    expected = {'status': 'FAILED',
                'stderr': 'stderr',
                'stdout': 'out\n\nCommand completed successfully!\n\n\nCommand failed after 1 tries\n',
                'clusterName': u'cc',
                'structuredOut': '""',
                'roleCommand': u'INSTALL',
                'serviceName': u'HDFS',
                'role': u'DATANODE',
                'actionId': '1-1',
                'taskId': 3,
                'exitCode': 13}
    self.assertEqual(len(report['reports']), 1)
    self.assertEqual(report['reports'][0], expected)

    # now should not have reports (read complete/failed reports are deleted)
    report = actionQueue.result()
    self.assertEqual(len(report['reports']), 0)

    ### Test upgrade command ###
    python_execution_result_dict['status'] = 'COMPLETE'
    python_execution_result_dict['exitcode'] = 0
    execution_thread = Thread(target = patched_aq_execute_command ,
                              args = (self.datanode_upgrade_command, ))
    execution_thread.start()
    unfreeze_flag.set()
    # wait until ready
    report = wait_for_final_report()
    # check report
    expected = {'status': 'COMPLETED',
                'stderr': 'stderr',
                'stdout': 'out\n\nCommand completed successfully!\n\n\nCommand failed after 1 tries\n\n\nCommand completed successfully!\n',
                'clusterName': 'clusterName',
                'structuredOut': '""',
                'roleCommand': 'UPGRADE',
                'serviceName': 'serviceName',
                'role': 'role',
                'actionId': 17,
                'taskId': 'taskId',
                'exitCode': 0}
    self.assertEqual(len(report['reports']), 1)
    self.assertEqual(report['reports'][0], expected)

    # now should not have reports (read complete/failed reports are deleted)
    report = actionQueue.result()
    self.assertEqual(len(report['reports']), 0)
  def test_cancel_with_reschedule_command(self):
    """A command whose exit code is -SIGTERM must end up with no report left in result()."""
    config = AmbariConfig()
    tempdir = tempfile.gettempdir()
    config.set('agent', 'prefix', tempdir)
    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
    config.set('agent', 'tolerate_download_failures', "true")
    dummy_controller = MagicMock()
    actionQueue = ActionQueue(config, dummy_controller)
    unfreeze_flag = threading.Event()
    # Negative signal number as exit code, i.e. what a process terminated by
    # SIGTERM reports.
    python_execution_result_dict = {
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut' : '',
      'status' : '',
      'exitcode' : -signal.SIGTERM
    }

    def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
      # Hold the command "in progress" until the test releases it
      unfreeze_flag.wait()
      return python_execution_result_dict

    def patched_aq_execute_command(command):
      # We have to perform patching for separate thread in the same thread
      with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
        runCommand_mock.side_effect = side_effect
        actionQueue.execute_command(command)

    # We call method in a separate thread
    execution_thread = Thread(target = patched_aq_execute_command ,
                              args = (self.datanode_install_command, ))
    execution_thread.start()
    # check in progress report
    # wait until ready
    while True:
      time.sleep(0.1)
      report = actionQueue.result()
      if len(report['reports']) != 0:
        break

    unfreeze_flag.set()
    # wait until the report disappears from result()
    while len(report['reports']) != 0:
      time.sleep(0.1)
      report = actionQueue.result()

    # check report
    self.assertEqual(len(report['reports']), 0)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(CustomServiceOrchestrator, "runCommand")
  @patch("CommandStatusDict.CommandStatusDict")
  @patch.object(ActionQueue, "status_update_callback")
  def test_store_configuration_tags(self, status_update_callback_mock,
                                    command_status_dict_mock,
                                    cso_runCommand_mock):
    # Executes self.datanode_restart_command with runCommand mocked to succeed
    # and checks the single resulting report, configurationTags included.
    cso_runCommand_mock.return_value = {
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut' : '',
      'exitcode' : 0
    }

    agent_config = AmbariConfig()
    agent_config.set('agent', 'prefix', tempfile.gettempdir())
    agent_config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
    agent_config.set('agent', 'tolerate_download_failures', "true")

    queue = ActionQueue(agent_config, MagicMock())
    queue.execute_command(self.datanode_restart_command)
    reports = queue.result()['reports']

    expected = {
      'status': 'COMPLETED',
      'exitCode': 0,
      'stdout': 'out\n\nCommand completed successfully!\n',
      'stderr': 'stderr',
      'structuredOut': '""',
      'configurationTags': {'global': {'tag': 'v123'}},
      'clusterName': u'cc',
      'serviceName': u'HDFS',
      'role': u'DATANODE',
      'roleCommand': u'CUSTOM_COMMAND',
      'customCommand': 'RESTART',
      'actionId': '1-1',
      'taskId': 9,
    }
    # Agent caches configurationTags if custom_command RESTART completed
    self.assertEqual(len(reports), 1)
    self.assertEqual(expected, reports[0])
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(ActualConfigHandler, "write_client_components")
  @patch.object(CustomServiceOrchestrator, "runCommand")
  @patch("CommandStatusDict.CommandStatusDict")
  @patch.object(ActionQueue, "status_update_callback")
  def test_store_configuration_tags_no_clients(self, status_update_callback_mock,
                                               command_status_dict_mock,
                                               cso_runCommand_mock, write_client_components_mock):
    # Executes self.datanode_restart_command_no_clients_update with a
    # successful mocked runCommand, checks the single report, and verifies
    # that ActualConfigHandler.write_client_components was not called.
    cso_runCommand_mock.return_value = {
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut' : '',
      'exitcode' : 0
    }

    agent_config = AmbariConfig()
    for option, value in (('prefix', tempfile.gettempdir()),
                          ('cache_dir', "/var/lib/ambari-agent/cache"),
                          ('tolerate_download_failures', "true")):
      agent_config.set('agent', option, value)

    queue = ActionQueue(agent_config, MagicMock())
    queue.execute_command(self.datanode_restart_command_no_clients_update)
    reports = queue.result()['reports']

    expected = {
      'status': 'COMPLETED',
      'exitCode': 0,
      'stdout': 'out\n\nCommand completed successfully!\n',
      'stderr': 'stderr',
      'structuredOut': '""',
      'configurationTags': {'global': {'tag': 'v123'}},
      'clusterName': u'cc',
      'serviceName': u'HDFS',
      'role': u'DATANODE',
      'roleCommand': u'CUSTOM_COMMAND',
      'customCommand': 'RESTART',
      'actionId': '1-1',
      'taskId': 9,
    }
    # Agent caches configurationTags if custom_command RESTART completed
    self.assertEqual(len(reports), 1)
    self.assertEqual(expected, reports[0])
    self.assertFalse(write_client_components_mock.called)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(ActualConfigHandler, "write_client_components")
  @patch.object(ActualConfigHandler, "write_actual_component")
  @patch.object(ActualConfigHandler, "update_component_tag")
  @patch.object(CustomServiceOrchestrator, "runCommand")
  @patch("CommandStatusDict.CommandStatusDict")
  @patch.object(ActionQueue, "status_update_callback")
  def test_refresh_queues_custom_command(self, status_update_callback_mock,
                                         command_status_dict_mock,
                                         cso_runCommand_mock, update_component_tag, write_actual_component_mock, write_client_components_mock):
    # Executes self.yarn_refresh_queues_custom_command with a successful
    # mocked runCommand, checks the single report (configurationTags is None)
    # and verifies that ActualConfigHandler.update_component_tag was called.
    cso_runCommand_mock.return_value = {
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut' : '',
      'exitcode' : 0
    }

    agent_config = AmbariConfig()
    agent_config.set('agent', 'prefix', tempfile.gettempdir())
    agent_config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
    agent_config.set('agent', 'tolerate_download_failures', "true")

    queue = ActionQueue(agent_config, MagicMock())
    queue.execute_command(self.yarn_refresh_queues_custom_command)
    reports = queue.result()['reports']

    expected = {
      'status': 'COMPLETED',
      'exitCode': 0,
      'stdout': 'out\n\nCommand completed successfully!\n',
      'stderr': 'stderr',
      'structuredOut': '""',
      'configurationTags': None,
      'clusterName': u'cc',
      'serviceName': u'YARN',
      'role': u'RESOURCEMANAGER',
      'roleCommand': u'CUSTOM_COMMAND',
      'customCommand': 'RESTART',
      'actionId': '1-1',
      'taskId': 9,
    }
    self.assertEqual(len(reports), 1)
    self.assertEqual(expected, reports[0])

    # Configuration tags should be updated
    self.assertTrue(update_component_tag.called)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(ActualConfigHandler, "write_client_components")
  @patch.object(ActualConfigHandler, "write_actual_component")
  @patch.object(CustomServiceOrchestrator, "runCommand")
  @patch("CommandStatusDict.CommandStatusDict")
  @patch.object(ActionQueue, "status_update_callback")
  def test_store_configuration_tags_on_custom_start_command(self, status_update_callback_mock,
                                                            command_status_dict_mock,
                                                            cso_runCommand_mock, write_actual_component_mock, write_client_components_mock):
    # Executes self.datanode_start_custom_command with a successful mocked
    # runCommand, checks the single report and verifies that
    # ActualConfigHandler.write_actual_component was called.
    cso_runCommand_mock.return_value = {
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut' : '',
      'exitcode' : 0
    }

    agent_config = AmbariConfig()
    for option, value in (('prefix', tempfile.gettempdir()),
                          ('cache_dir', "/var/lib/ambari-agent/cache"),
                          ('tolerate_download_failures', "true")):
      agent_config.set('agent', option, value)

    queue = ActionQueue(agent_config, MagicMock())
    queue.execute_command(self.datanode_start_custom_command)
    reports = queue.result()['reports']

    expected = {
      'status': 'COMPLETED',
      'exitCode': 0,
      'stdout': 'out\n\nCommand completed successfully!\n',
      'stderr': 'stderr',
      'structuredOut': '""',
      'configurationTags': {'global': {'tag': 'v123'}},
      'clusterName': u'cc',
      'serviceName': u'HDFS',
      'role': u'DATANODE',
      'roleCommand': u'CUSTOM_COMMAND',
      'customCommand': 'START',
      'actionId': '1-1',
      'taskId': 9,
    }
    self.assertEqual(len(reports), 1)
    self.assertEqual(expected, reports[0])

    # Configuration tags should be updated on custom start command
    self.assertTrue(write_actual_component_mock.called)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(ActualConfigHandler, "write_actual_component")
  @patch.object(CustomServiceOrchestrator, "runCommand")
  @patch("CommandStatusDict.CommandStatusDict")
  @patch.object(ActionQueue, "status_update_callback")
  def test_store_config_tags_on_install_client_command(self, status_update_callback_mock,
                                                       command_status_dict_mock,
                                                       cso_runCommand_mock, write_actual_component_mock):
    # Executes an INSTALL command for TEZ_CLIENT (registered as a client
    # component on LiveStatus) with a successful mocked runCommand, and
    # verifies that ActualConfigHandler.write_actual_component was called.
    custom_service_orchestrator_execution_result_dict = {
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut' : '',
      'exitcode' : 0
    }
    cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict

    tez_client_install_command = {
      'commandType': 'EXECUTION_COMMAND',
      'role': u'TEZ_CLIENT',
      'roleCommand': u'INSTALL',
      'commandId': '1-1',
      'taskId': 9,
      'clusterName': u'cc',
      'serviceName': u'TEZ',
      'configurations': {'global' : {}},
      'configurationTags': {'global' : { 'tag': 'v123' }},
      'hostLevelParams': {}
    }
    client_components = ({'serviceName': 'TEZ', 'componentName': 'TEZ_CLIENT'}, )

    config = AmbariConfig()
    tempdir = tempfile.gettempdir()
    config.set('agent', 'prefix', tempdir)
    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
    config.set('agent', 'tolerate_download_failures', "true")
    dummy_controller = MagicMock()
    actionQueue = ActionQueue(config, dummy_controller)

    # LiveStatus.CLIENT_COMPONENTS is class-level state shared by every test
    # in the process. Override it only for the duration of the command so the
    # previous value (or its absence) is restored afterwards instead of
    # leaking into tests that run later.
    with patch.object(LiveStatus, "CLIENT_COMPONENTS", new = client_components, create = True):
      actionQueue.execute_command(tez_client_install_command)

    # Configuration tags should be updated on install client command
    self.assertTrue(write_actual_component_mock.called)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(ActionQueue, "status_update_callback")
  @patch.object(ActionQueue, "execute_command")
  @patch.object(LiveStatus, "build")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_execute_status_command(self, CustomServiceOrchestrator_mock,
                                  build_mock, execute_command_mock,
                                  status_update_callback):
    # Feeds one status-command result into the queue and checks that the
    # component status reported is LiveStatus.build's (mocked) output plus
    # the security state passed in.
    CustomServiceOrchestrator_mock.return_value = None
    dummy_controller = MagicMock()
    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
    build_mock.return_value = {'dummy report': '' }
    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())

    actionQueue.process_status_command_result(
      (self.status_command, {'exitcode': 0 }, 'UNKNOWN'))
    component_statuses = actionQueue.result()['componentStatus']

    self.assertEqual(len(component_statuses), 1)
    self.assertEqual(component_statuses[0],
                     {'dummy report': '', 'securityState' : 'UNKNOWN'})
  @patch.object(RecoveryManager, "command_exists")
  @patch.object(RecoveryManager, "requires_recovery")
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(ActionQueue, "status_update_callback")
  @patch.object(ActionQueue, "execute_command")
  @patch.object(LiveStatus, "build")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_process_status_command_result_recovery(self, CustomServiceOrchestrator_mock,
                                                  build_mock, execute_command_mock,
                                                  status_update_callback, requires_recovery_mock,
                                                  command_exists_mock):
    # With requires_recovery mocked to True, processes the same status result
    # twice and checks 'sendExecCmdDet' in the reported component status:
    # 'True' when command_exists is False, 'False' when it is True.
    CustomServiceOrchestrator_mock.return_value = None
    dummy_controller = MagicMock()
    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
    build_mock.return_value = {'dummy report': '' }
    requires_recovery_mock.return_value = True
    command_exists_mock.return_value = False
    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp(), True, False)

    for command_exists, send_details in ((False, 'True'), (True, 'False')):
      requires_recovery_mock.return_value = True
      command_exists_mock.return_value = command_exists

      actionQueue.process_status_command_result(
        (self.status_command, {'exitcode': 0 }, 'UNKNOWN'))
      component_statuses = actionQueue.result()['componentStatus']

      self.assertEqual(len(component_statuses), 1)
      self.assertEqual(component_statuses[0],
                       {'dummy report': '',
                        'securityState' : 'UNKNOWN',
                        'sendExecCmdDet': send_details})
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(ActionQueue, "status_update_callback")
  @patch.object(ActionQueue, "execute_command")
  @patch.object(LiveStatus, "build")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_process_status_command_result_with_alerts(self, CustomServiceOrchestrator_mock,
                                                     build_mock, execute_command_mock,
                                                     status_update_callback):
    # Processes a status-command result whose structuredOut carries an
    # 'alerts' list and checks that the reported component status has an
    # 'alerts' key.
    CustomServiceOrchestrator_mock.return_value = None
    dummy_controller = MagicMock()
    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
    command_return_value = {
      'exitcode': 0,
      'stdout': 'out',
      'stderr': 'err',
      'structuredOut': {'alerts': [ {'name': 'flume_alert'} ] }
    }
    # The same dict is passed as both the second and third tuple elements.
    result = (self.status_command_for_alerts, command_return_value, command_return_value)
    build_mock.return_value = {'somestatusresult': 'aresult'}

    actionQueue.process_status_command_result(result)
    report = actionQueue.result()

    self.assertEqual(len(report['componentStatus']), 1)
    # 'in' works on both Python 2 and 3; dict.has_key() exists only on 2.
    self.assertTrue('alerts' in report['componentStatus'][0])
  @patch.object(AmbariConfig, "get_parallel_exec_option")
  @patch.object(ActionQueue, "process_command")
  @patch.object(Queue, "get")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_reset_queue(self, CustomServiceOrchestrator_mock,
                       get_mock, process_command_mock, gpeo_mock):
    # Queues two install commands on a started ActionQueue, resets it, and
    # checks that the queue is empty with nothing in progress or pending
    # before stopping and joining the queue thread.
    CustomServiceOrchestrator_mock.return_value = None
    dummy_controller = MagicMock()
    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
    config = MagicMock()
    gpeo_mock.return_value = 0
    config.get_parallel_exec_option = gpeo_mock

    queue = ActionQueue(config, dummy_controller)
    queue.start()
    queued_commands = [self.datanode_install_command, self.hbase_install_command]
    queue.put(queued_commands)
    self.assertEqual(2, queue.commandQueue.qsize())
    self.assertTrue(queue.tasks_in_progress_or_pending())

    queue.reset()
    self.assertTrue(queue.commandQueue.empty())
    self.assertFalse(queue.tasks_in_progress_or_pending())

    time.sleep(0.1)
    queue.stop()
    queue.join()
    self.assertEqual(queue.stopped(), True, 'Action queue is not stopped.')
  @patch.object(AmbariConfig, "get_parallel_exec_option")
  @patch.object(ActionQueue, "process_command")
  @patch.object(Queue, "get")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_cancel(self, CustomServiceOrchestrator_mock,
                  get_mock, process_command_mock, gpeo_mock):
    # Queues two install commands on a started ActionQueue, resets it, and
    # checks that the command queue is empty before stopping and joining
    # the queue thread.
    CustomServiceOrchestrator_mock.return_value = None
    dummy_controller = MagicMock()
    config = MagicMock()
    gpeo_mock.return_value = 0
    config.get_parallel_exec_option = gpeo_mock

    queue = ActionQueue(config, dummy_controller)
    queue.start()
    queued_commands = [self.datanode_install_command, self.hbase_install_command]
    queue.put(queued_commands)
    self.assertEqual(2, queue.commandQueue.qsize())

    queue.reset()
    self.assertTrue(queue.commandQueue.empty())

    time.sleep(0.1)
    queue.stop()
    queue.join()
    self.assertEqual(queue.stopped(), True, 'Action queue is not stopped.')
  @patch.object(AmbariConfig, "get_parallel_exec_option")
  @patch.object(ActionQueue, "process_command")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_parallel_exec(self, CustomServiceOrchestrator_mock,
                         process_command_mock, gpeo_mock):
    # With the parallel-exec option mocked to 1, queues two install commands,
    # runs the queue thread for a second and checks that process_command was
    # invoked once for each of them.
    CustomServiceOrchestrator_mock.return_value = None
    dummy_controller = MagicMock()
    config = MagicMock()
    gpeo_mock.return_value = 1
    config.get_parallel_exec_option = gpeo_mock
    actionQueue = ActionQueue(config, dummy_controller)
    actionQueue.put([self.datanode_install_command, self.hbase_install_command])
    self.assertEqual(2, actionQueue.commandQueue.qsize())

    actionQueue.start()
    time.sleep(1)
    actionQueue.stop()
    actionQueue.join()

    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
    self.assertEqual(2, process_command_mock.call_count)
    # assert_any_calls is not a Mock assertion: on a MagicMock it is silently
    # auto-created and checks nothing. assert_has_calls performs the real
    # check; any_order=True so the order of the two calls is not enforced.
    process_command_mock.assert_has_calls(
      [call(self.datanode_install_command), call(self.hbase_install_command)],
      any_order=True)
  @patch("threading.Thread")
  @patch.object(AmbariConfig, "get_parallel_exec_option")
  @patch.object(ActionQueue, "process_command")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_parallel_exec_no_retry(self, CustomServiceOrchestrator_mock,
                                  process_command_mock, gpeo_mock, threading_mock):
    # With the parallel-exec option mocked to 1, queues two commands, runs
    # the queue thread for a second and checks that process_command was
    # invoked for both while threading.Thread was never instantiated.
    CustomServiceOrchestrator_mock.return_value = None
    dummy_controller = MagicMock()
    config = MagicMock()
    gpeo_mock.return_value = 1
    config.get_parallel_exec_option = gpeo_mock
    actionQueue = ActionQueue(config, dummy_controller)
    queued_commands = [self.datanode_install_no_retry_command, self.snamenode_install_command]
    actionQueue.put(queued_commands)
    self.assertEqual(2, actionQueue.commandQueue.qsize())

    actionQueue.start()
    time.sleep(1)
    actionQueue.stop()
    actionQueue.join()

    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
    self.assertEqual(2, process_command_mock.call_count)
    self.assertEqual(0, threading_mock.call_count)
    # assert_any_calls is not a Mock assertion (it is auto-created and checks
    # nothing), which hid that the expected calls named commands this test
    # never queued. Verify the commands that were actually put on the queue.
    process_command_mock.assert_has_calls(
      [call(command) for command in queued_commands],
      any_order=True)
  @not_for_platform(PLATFORM_LINUX)
  @patch("time.sleep")
  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_execute_retryable_command(self, CustomServiceOrchestrator_mock,
                                     sleep_mock
                                     ):
    # Executes a copy of self.retryable_command with runCommand always
    # returning a FAILED result, and expects three runCommand attempts with
    # sleeps of 2 and 3 in between.
    CustomServiceOrchestrator_mock.return_value = None
    dummy_controller = MagicMock()
    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
    python_execution_result_dict = {
      'exitcode': 1,
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut': '',
      'status': 'FAILED'
    }

    def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
      return python_execution_result_dict

    command = copy.deepcopy(self.retryable_command)
    with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
      runCommand_mock.side_effect = side_effect
      actionQueue.execute_command(command)

    #assert that python executor start
    self.assertTrue(runCommand_mock.called)
    self.assertEqual(3, runCommand_mock.call_count)
    self.assertEqual(2, sleep_mock.call_count)
    sleep_mock.assert_has_calls([call(2), call(3)], False)

    # Output/error file paths expected in every runCommand call.
    out_file = os.sep.join(['', 'tmp', 'ambari-agent', 'output-19.txt'])
    err_file = os.sep.join(['', 'tmp', 'ambari-agent', 'errors-19.txt'])
    # Only the first attempt overrides the output files; the rest are retries.
    runCommand_mock.assert_has_calls([
      call(command, out_file, err_file, override_output_files=True, retry=False),
      call(command, out_file, err_file, override_output_files=False, retry=True),
      call(command, out_file, err_file, override_output_files=False, retry=True)])
  @patch("time.time")
  @patch("time.sleep")
  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_execute_retryable_command_with_time_lapse(self, CustomServiceOrchestrator_mock,
                                                     sleep_mock, time_mock
                                                     ):
    # Executes a copy of self.retryable_command with runCommand always
    # FAILED while time.time() returns a scripted sequence, and expects two
    # runCommand attempts separated by a single sleep(1).
    CustomServiceOrchestrator_mock.return_value = None
    dummy_controller = MagicMock()
    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
    python_execution_result_dict = {
      'exitcode': 1,
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut': '',
      'status': 'FAILED'
    }

    # Scripted time.time() values; one extra leading value is supplied when
    # INFO logging is enabled on self.logger.
    times_arr = [8, 10, 14, 18, 22, 26, 30, 34]
    if self.logger.isEnabledFor(logging.INFO):
      times_arr = [4] + times_arr
    time_mock.side_effect = times_arr

    def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
      return python_execution_result_dict

    command = copy.deepcopy(self.retryable_command)
    with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
      runCommand_mock.side_effect = side_effect
      actionQueue.execute_command(command)

    #assert that python executor start
    self.assertTrue(runCommand_mock.called)
    self.assertEqual(2, runCommand_mock.call_count)
    self.assertEqual(1, sleep_mock.call_count)
    sleep_mock.assert_has_calls([call(1)], False)

    # Output/error file paths expected in every runCommand call.
    out_file = os.sep.join(['', 'tmp', 'ambari-agent', 'output-19.txt'])
    err_file = os.sep.join(['', 'tmp', 'ambari-agent', 'errors-19.txt'])
    runCommand_mock.assert_has_calls([
      call(command, out_file, err_file, override_output_files=True, retry=False),
      call(command, out_file, err_file, override_output_files=False, retry=True)])
  1132. #retryable_command
  @not_for_platform(PLATFORM_LINUX)
  @patch("time.sleep")
  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_execute_retryable_command_fail_and_succeed(self, CustomServiceOrchestrator_mock,
                                                      sleep_mock
                                                      ):
    # Executes a copy of self.retryable_command where runCommand fails once
    # and then succeeds; expects two attempts with one sleep(2) in between.
    CustomServiceOrchestrator_mock.return_value = None
    dummy_controller = MagicMock()
    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)

    # Results handed out by the mocked runCommand, in order.
    scripted_results = [
      {
        'exitcode': 1,
        'stdout': 'out',
        'stderr': 'stderr',
        'structuredOut': '',
        'status': 'FAILED'
      },
      {
        'exitcode': 0,
        'stdout': 'out',
        'stderr': 'stderr',
        'structuredOut': '',
        'status': 'COMPLETED'
      },
    ]

    command = copy.deepcopy(self.retryable_command)
    with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
      runCommand_mock.side_effect = scripted_results
      actionQueue.execute_command(command)

    #assert that python executor start
    self.assertTrue(runCommand_mock.called)
    self.assertEqual(2, runCommand_mock.call_count)
    self.assertEqual(1, sleep_mock.call_count)
    sleep_mock.assert_any_call(2)
  @not_for_platform(PLATFORM_LINUX)
  @patch("time.sleep")
  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_execute_retryable_command_succeed(self, CustomServiceOrchestrator_mock,
                                             sleep_mock
                                             ):
    # Executes a copy of self.retryable_command where runCommand succeeds on
    # the first attempt; expects exactly one attempt and no sleep.
    CustomServiceOrchestrator_mock.return_value = None
    dummy_controller = MagicMock()
    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)

    # Single result handed out by the mocked runCommand.
    scripted_results = [{
      'exitcode': 0,
      'stdout': 'out',
      'stderr': 'stderr',
      'structuredOut': '',
      'status': 'COMPLETED'
    }]

    command = copy.deepcopy(self.retryable_command)
    with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
      runCommand_mock.side_effect = scripted_results
      actionQueue.execute_command(command)

    #assert that python executor start
    self.assertTrue(runCommand_mock.called)
    self.assertFalse(sleep_mock.called)
    self.assertEqual(1, runCommand_mock.call_count)
  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
  @patch.object(CustomServiceOrchestrator, "runCommand")
  @patch.object(CustomServiceOrchestrator, "__init__")
  def test_execute_background_command(self, CustomServiceOrchestrator_mock,
                                      runCommand_mock,
                                      ):
    # Queues a copy of self.background_command, drains the background queue
    # and checks that runCommand was invoked, that the task is tracked as
    # IN_PROGRESS, and that exactly one report is produced.
    CustomServiceOrchestrator_mock.return_value = None
    # runCommand_mock is the object installed as
    # CustomServiceOrchestrator.runCommand by the decorator above.
    runCommand_mock.return_value = {'exitcode' : 0,
                                    'stdout': 'out-11',
                                    'stderr' : 'err-13'}

    dummy_controller = MagicMock()
    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)

    execute_command = copy.deepcopy(self.background_command)
    actionQueue.put([execute_command])
    actionQueue.processBackgroundQueueSafeEmpty()
    actionQueue.controller.statusCommandExecutor.process_results()

    #assert that python executor start
    self.assertTrue(runCommand_mock.called)
    task_id = execute_command['taskId']
    runningCommand = actionQueue.commandStatuses.current_state.get(task_id)
    self.assertTrue(runningCommand is not None)
    self.assertEqual(runningCommand[1]['status'], ActionQueue.IN_PROGRESS_STATUS)

    reports = actionQueue.result()['reports']
    self.assertEqual(len(reports),1)
  @patch.object(CustomServiceOrchestrator, "get_py_executor")
  @patch.object(CustomServiceOrchestrator, "resolve_script_path")
  def test_execute_python_executor(self, resolve_script_path_mock,
                                   get_py_executor_mock):
    # Runs self.background_command through a PythonExecutor whose subprocess
    # and file handling are stubbed by patch_output_file, captures the command
    # status seen right after the completion callback, and checks both that
    # status and the final report.
    dummy_controller = MagicMock()
    cfg = AmbariConfig()
    for option, value in (('tolerate_download_failures', 'true'),
                          ('prefix', '.'),
                          ('cache_dir', 'background_tasks')):
      cfg.set('agent', option, value)

    actionQueue = ActionQueue(cfg, dummy_controller)
    orchestrator = actionQueue.customServiceOrchestrator
    pyex = PythonExecutor(orchestrator.tmp_dir, orchestrator.config)
    patch_output_file(pyex)
    get_py_executor_mock.return_value = pyex
    actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock()

    result = {}
    lock = threading.RLock()
    complete_done = threading.Condition(lock)

    def command_complete_w(process_condensed_result, handle):
      # Runs after the real completion callback: snapshot its arguments and
      # the command status, then wake up the waiting test thread.
      with lock:
        task_id = handle.command['taskId']
        result['command_complete'] = {'condensed_result' : copy.copy(process_condensed_result),
                                      'handle' : copy.copy(handle),
                                      'command_status' : actionQueue.commandStatuses.get_command_status(task_id)
                                      }
        complete_done.notifyAll()

    original_callback = actionQueue.on_background_command_complete_callback
    actionQueue.on_background_command_complete_callback = wraped(original_callback,
                                                                 None, command_complete_w)

    actionQueue.put([self.background_command])
    actionQueue.processBackgroundQueueSafeEmpty()
    actionQueue.controller.statusCommandExecutor.process_results()

    with lock:
      # Give the completion callback up to 0.1s to fire.
      complete_done.wait(0.1)

      finished_status = result['command_complete']['command_status']
      self.assertEqual(finished_status['status'], ActionQueue.COMPLETED_STATUS)
      self.assertEqual(finished_status['stdout'], 'process_out')
      self.assertEqual(finished_status['stderr'], 'process_err')
      self.assertEqual(finished_status['exitCode'], 0)

    runningCommand = actionQueue.commandStatuses.current_state.get(self.background_command['taskId'])
    self.assertTrue(runningCommand is not None)

    reports = actionQueue.result()['reports']
    self.assertEqual(len(reports),1)
    self.assertEqual(reports[0]['stdout'],'process_out')
    # self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}')
  # CANCEL_COMMAND fixture: an ABORT request whose roleParams list task ids
  # 13 and 14 as cancellation targets.
  cancel_background_command = {
    "commandType": "CANCEL_COMMAND",
    "role": "AMBARI_SERVER_ACTION",
    "roleCommand": "ABORT",
    "commandId": "2--1",
    "taskId": 20,
    "clusterName": "c1",
    "serviceName": "",
    "hostname": "c6401",
    "roleParams": {"cancelTaskIdTargets": "13,14"},
  }
  def patch_output_file(pythonExecutor):
    """Replace subprocess and file handling on *pythonExecutor* with stubs.

    Three attributes are overwritten on the given object:
    ``launch_python_subprocess`` (writes fixed text to the supplied output
    and error objects and returns a mock process with pid 33 and return
    code 0), ``open_subprocess_files`` (returns two mocks) and
    ``read_result_from_files`` (returns fixed stdout, stderr and
    structured-output strings).
    """
    def fake_launch(command, tmpout, tmperr):
      # Each stream is used as a context manager, exactly once.
      with tmpout:
        tmpout.write('process_out')
      with tmperr:
        tmperr.write('process_err')
      process = MagicMock()
      process.pid = 33
      process.returncode = 0
      return process

    def fake_open_files(fout, ferr, f):
      return MagicMock(), MagicMock()

    def fake_read_result(out_path, err_path, structured_out_path):
      return 'process_out', 'process_err', '{"a": "b."}'

    pythonExecutor.launch_python_subprocess = fake_launch
    pythonExecutor.open_subprocess_files = fake_open_files
    pythonExecutor.read_result_from_files = fake_read_result
  def wraped(func, before = None, after = None):
    """Return a callable that runs *func* between optional hooks.

    The returned wrapper calls ``before`` (if given), then ``func``, then
    ``after`` (if given), each with the same positional and keyword
    arguments, and returns the value produced by ``func``. Return values of
    the hooks are ignored.
    """
    def no_op(*args, **kwargs):
      pass

    # Substitute a do-nothing hook for any that was not supplied.
    run_before = no_op if before is None else before
    run_after = no_op if after is None else after

    def wrapper(*args, **kwargs):
      run_before(*args, **kwargs)
      outcome = func(*args, **kwargs)
      run_after(*args, **kwargs)
      return outcome

    return wrapper