12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439 |
- #!/usr/bin/env python
- '''
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- '''
- from Queue import Queue
- from unittest import TestCase
- from ambari_agent.LiveStatus import LiveStatus
- from ambari_agent.ActionQueue import ActionQueue
- from ambari_agent.AmbariConfig import AmbariConfig
- import os, errno, time, pprint, tempfile, threading
- import sys
- from threading import Thread
- import copy
- import signal
- from mock.mock import patch, MagicMock, call
- from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
- from ambari_agent.PythonExecutor import PythonExecutor
- from ambari_agent.ActualConfigHandler import ActualConfigHandler
- from ambari_agent.RecoveryManager import RecoveryManager
- from ambari_commons import OSCheck
- from only_for_platform import not_for_platform, os_distro_value, PLATFORM_WINDOWS, PLATFORM_LINUX
- import logging
- class TestActionQueue(TestCase):
- def setUp(self):
- # save original open() method for later use
- self.original_open = open
- def tearDown(self):
- sys.stdout = sys.__stdout__
- logger = logging.getLogger()
- datanode_install_command = {
- 'commandType': 'EXECUTION_COMMAND',
- 'role': u'DATANODE',
- 'roleCommand': u'INSTALL',
- 'commandId': '1-1',
- 'taskId': 3,
- 'clusterName': u'cc',
- 'serviceName': u'HDFS',
- 'hostLevelParams': {},
- 'configurations':{'global' : {}},
- 'configurationTags':{'global' : { 'tag': 'v1' }},
- 'commandParams': {
- 'command_retry_enabled': 'true'
- }
- }
- datanode_install_no_retry_command = {
- 'commandType': 'EXECUTION_COMMAND',
- 'role': u'DATANODE',
- 'roleCommand': u'INSTALL',
- 'commandId': '1-1',
- 'taskId': 3,
- 'clusterName': u'cc',
- 'serviceName': u'HDFS',
- 'hostLevelParams': {},
- 'configurations':{'global' : {}},
- 'configurationTags':{'global' : { 'tag': 'v1' }},
- 'commandParams': {
- 'command_retry_enabled': 'false'
- }
- }
- datanode_auto_start_command = {
- 'commandType': 'AUTO_EXECUTION_COMMAND',
- 'role': u'DATANODE',
- 'roleCommand': u'START',
- 'commandId': '1-1',
- 'taskId': 3,
- 'clusterName': u'cc',
- 'serviceName': u'HDFS',
- 'hostLevelParams': {},
- 'configurations':{'global' : {}},
- 'configurationTags':{'global' : { 'tag': 'v1' }}
- }
- datanode_upgrade_command = {
- 'commandId': 17,
- 'role' : "role",
- 'taskId' : "taskId",
- 'clusterName' : "clusterName",
- 'serviceName' : "serviceName",
- 'roleCommand' : 'UPGRADE',
- 'hostname' : "localhost.localdomain",
- 'hostLevelParams': {},
- 'clusterHostInfo': "clusterHostInfo",
- 'commandType': "EXECUTION_COMMAND",
- 'configurations':{'global' : {}},
- 'roleParams': {},
- 'commandParams' : {
- 'source_stack_version' : 'HDP-1.2.1',
- 'target_stack_version' : 'HDP-1.3.0'
- }
- }
- namenode_install_command = {
- 'commandType': 'EXECUTION_COMMAND',
- 'role': u'NAMENODE',
- 'roleCommand': u'INSTALL',
- 'commandId': '1-1',
- 'taskId': 4,
- 'clusterName': u'cc',
- 'serviceName': u'HDFS',
- 'hostLevelParams': {}
- }
- snamenode_install_command = {
- 'commandType': 'EXECUTION_COMMAND',
- 'role': u'SECONDARY_NAMENODE',
- 'roleCommand': u'INSTALL',
- 'commandId': '1-1',
- 'taskId': 5,
- 'clusterName': u'cc',
- 'serviceName': u'HDFS',
- 'hostLevelParams': {}
- }
- hbase_install_command = {
- 'commandType': 'EXECUTION_COMMAND',
- 'role': u'HBASE',
- 'roleCommand': u'INSTALL',
- 'commandId': '1-1',
- 'taskId': 7,
- 'clusterName': u'cc',
- 'serviceName': u'HDFS',
- 'hostLevelParams': {},
- 'commandParams': {
- 'command_retry_enabled': 'true'
- }
- }
- status_command = {
- "serviceName" : 'HDFS',
- "commandType" : "STATUS_COMMAND",
- "clusterName" : "",
- "componentName" : "DATANODE",
- 'configurations':{},
- 'hostLevelParams': {}
- }
- datanode_restart_command = {
- 'commandType': 'EXECUTION_COMMAND',
- 'role': u'DATANODE',
- 'roleCommand': u'CUSTOM_COMMAND',
- 'commandId': '1-1',
- 'taskId': 9,
- 'clusterName': u'cc',
- 'serviceName': u'HDFS',
- 'configurations':{'global' : {}},
- 'configurationTags':{'global' : { 'tag': 'v123' }},
- 'hostLevelParams':{'custom_command': 'RESTART', 'clientsToUpdateConfigs': []}
- }
- datanode_restart_command_no_logging = {
- 'commandType': 'EXECUTION_COMMAND',
- 'role': u'DATANODE',
- 'roleCommand': u'CUSTOM_COMMAND',
- 'commandId': '1-1',
- 'taskId': 9,
- 'clusterName': u'cc',
- 'serviceName': u'HDFS',
- 'configurations': {'global': {}},
- 'configurationTags': {'global': {'tag': 'v123'}},
- 'commandParams': {
- 'log_output': 'false'
- },
- 'hostLevelParams': {'custom_command': 'RESTART', 'clientsToUpdateConfigs': []}
- }
- datanode_restart_command_no_clients_update = {
- 'commandType': 'EXECUTION_COMMAND',
- 'role': u'DATANODE',
- 'roleCommand': u'CUSTOM_COMMAND',
- 'commandId': '1-1',
- 'taskId': 9,
- 'clusterName': u'cc',
- 'serviceName': u'HDFS',
- 'configurations':{'global' : {}},
- 'configurationTags':{'global' : { 'tag': 'v123' }},
- 'hostLevelParams':{'custom_command': 'RESTART'}
- }
- datanode_start_custom_command = {
- 'commandType': 'EXECUTION_COMMAND',
- 'role': u'DATANODE',
- 'roleCommand': u'CUSTOM_COMMAND',
- 'commandId': '1-1',
- 'taskId': 9,
- 'clusterName': u'cc',
- 'serviceName': u'HDFS',
- 'configurations':{'global' : {}},
- 'configurationTags':{'global' : { 'tag': 'v123' }},
- 'hostLevelParams':{'custom_command': 'START'}
- }
- yarn_refresh_queues_custom_command = {
- 'commandType': 'EXECUTION_COMMAND',
- 'role': u'RESOURCEMANAGER',
- 'roleCommand': u'CUSTOM_COMMAND',
- 'commandId': '1-1',
- 'taskId': 9,
- 'clusterName': u'cc',
- 'serviceName': u'YARN',
- 'commandParams' : {'forceRefreshConfigTags' : 'capacity-scheduler'},
- 'configurations':{'global' : {}},
- 'configurationTags':{'global' : { 'tag': 'v123' }, 'capacity-scheduler' : {'tag': 'v123'}},
- 'hostLevelParams':{'custom_command': 'REFRESHQUEUES'}
- }
- status_command_for_alerts = {
- "serviceName" : 'FLUME',
- "commandType" : "STATUS_COMMAND",
- "clusterName" : "",
- "componentName" : "FLUME_HANDLER",
- 'configurations':{},
- 'hostLevelParams': {}
- }
- retryable_command = {
- 'commandType': 'EXECUTION_COMMAND',
- 'role': 'NAMENODE',
- 'roleCommand': 'INSTALL',
- 'commandId': '1-1',
- 'taskId': 19,
- 'clusterName': 'c1',
- 'serviceName': 'HDFS',
- 'configurations':{'global' : {}},
- 'configurationTags':{'global' : { 'tag': 'v123' }},
- 'commandParams' : {
- 'script_type' : 'PYTHON',
- 'script' : 'script.py',
- 'command_timeout' : '600',
- 'jdk_location' : '.',
- 'service_package_folder' : '.',
- 'command_retry_enabled' : 'true',
- 'max_duration_for_retries' : '5'
- },
- 'hostLevelParams' : {}
- }
- background_command = {
- 'commandType': 'BACKGROUND_EXECUTION_COMMAND',
- 'role': 'NAMENODE',
- 'roleCommand': 'CUSTOM_COMMAND',
- 'commandId': '1-1',
- 'taskId': 19,
- 'clusterName': 'c1',
- 'serviceName': 'HDFS',
- 'configurations':{'global' : {}},
- 'configurationTags':{'global' : { 'tag': 'v123' }},
- 'hostLevelParams':{'custom_command': 'REBALANCE_HDFS'},
- 'commandParams' : {
- 'script_type' : 'PYTHON',
- 'script' : 'script.py',
- 'command_timeout' : '600',
- 'jdk_location' : '.',
- 'service_package_folder' : '.'
- }
- }
- cancel_background_command = {
- 'commandType': 'EXECUTION_COMMAND',
- 'role': 'NAMENODE',
- 'roleCommand': 'ACTIONEXECUTE',
- 'commandId': '1-1',
- 'taskId': 20,
- 'clusterName': 'c1',
- 'serviceName': 'HDFS',
- 'configurations':{'global' : {}},
- 'configurationTags':{'global' : {}},
- 'hostLevelParams':{},
- 'commandParams' : {
- 'script_type' : 'PYTHON',
- 'script' : 'cancel_background_task.py',
- 'before_system_hook_function' : 'fetch_bg_pid_by_taskid',
- 'jdk_location' : '.',
- 'command_timeout' : '600',
- 'service_package_folder' : '.',
- 'cancel_policy': 'SIGKILL',
- 'cancel_task_id': "19",
- }
- }
- @patch.object(AmbariConfig, "get_parallel_exec_option")
- @patch.object(ActionQueue, "process_command")
- @patch.object(Queue, "get")
- @patch.object(CustomServiceOrchestrator, "__init__")
- def test_ActionQueueStartStop(self, CustomServiceOrchestrator_mock,
- get_mock, process_command_mock, get_parallel_exec_option_mock):
- CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
- config = MagicMock()
- get_parallel_exec_option_mock.return_value = 0
- config.get_parallel_exec_option = get_parallel_exec_option_mock
- actionQueue = ActionQueue(config, dummy_controller)
- actionQueue.start()
- time.sleep(0.1)
- actionQueue.stop()
- actionQueue.join()
- self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
- self.assertTrue(process_command_mock.call_count > 1)
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch("logging.RootLogger.exception")
- @patch.object(ActionQueue, "execute_command")
- def test_process_command(self, execute_command_mock, log_exc_mock):
- dummy_controller = MagicMock()
- config = AmbariConfig()
- config.set('agent', 'tolerate_download_failures', "true")
- actionQueue = ActionQueue(config, dummy_controller)
- execution_command = {
- 'commandType' : ActionQueue.EXECUTION_COMMAND,
- }
- status_command = {
- 'commandType' : ActionQueue.STATUS_COMMAND,
- }
- wrong_command = {
- 'commandType' : "SOME_WRONG_COMMAND",
- }
- # Try wrong command
- actionQueue.process_command(wrong_command)
- self.assertFalse(execute_command_mock.called)
- self.assertFalse(log_exc_mock.called)
- execute_command_mock.reset_mock()
- log_exc_mock.reset_mock()
- # Try normal execution
- actionQueue.process_command(execution_command)
- self.assertTrue(execute_command_mock.called)
- self.assertFalse(log_exc_mock.called)
- execute_command_mock.reset_mock()
- log_exc_mock.reset_mock()
- execute_command_mock.reset_mock()
- log_exc_mock.reset_mock()
- # Try exception to check proper logging
- def side_effect(self):
- raise Exception("TerribleException")
- execute_command_mock.side_effect = side_effect
- actionQueue.process_command(execution_command)
- self.assertTrue(log_exc_mock.called)
- log_exc_mock.reset_mock()
- actionQueue.process_command(execution_command)
- self.assertTrue(log_exc_mock.called)
- @patch.object(ActionQueue, "log_command_output")
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch.object(CustomServiceOrchestrator, "runCommand")
- @patch("CommandStatusDict.CommandStatusDict")
- @patch.object(ActionQueue, "status_update_callback")
- def test_log_execution_commands(self, status_update_callback_mock,
- command_status_dict_mock,
- cso_runCommand_mock, mock_log_command_output):
- custom_service_orchestrator_execution_result_dict = {
- 'stdout': 'out',
- 'stderr': 'stderr',
- 'structuredOut' : '',
- 'exitcode' : 0
- }
- cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
- config = AmbariConfig()
- tempdir = tempfile.gettempdir()
- config.set('agent', 'prefix', tempdir)
- config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
- config.set('agent', 'tolerate_download_failures', "true")
- config.set('logging', 'log_command_executes', 1)
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(config, dummy_controller)
- actionQueue.execute_command(self.datanode_restart_command)
- report = actionQueue.result()
- expected = {'status': 'COMPLETED',
- 'configurationTags': {'global': {'tag': 'v123'}},
- 'stderr': 'stderr',
- 'stdout': 'out\n\nCommand completed successfully!\n',
- 'clusterName': u'cc',
- 'structuredOut': '""',
- 'roleCommand': u'CUSTOM_COMMAND',
- 'serviceName': u'HDFS',
- 'role': u'DATANODE',
- 'actionId': '1-1',
- 'taskId': 9,
- 'customCommand': 'RESTART',
- 'exitCode': 0}
- # Agent caches configurationTags if custom_command RESTART completed
- mock_log_command_output.assert_has_calls([call("out\n\nCommand completed successfully!\n", "9"), call("stderr", "9")], any_order=True)
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(expected, report['reports'][0])
- @patch.object(ActionQueue, "log_command_output")
- @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
- @patch.object(CustomServiceOrchestrator, "runCommand")
- @patch("CommandStatusDict.CommandStatusDict")
- @patch.object(ActionQueue, "status_update_callback")
- def test_do_not_log_execution_commands(self, status_update_callback_mock,
- command_status_dict_mock,
- cso_runCommand_mock, mock_log_command_output):
- custom_service_orchestrator_execution_result_dict = {
- 'stdout': 'out',
- 'stderr': 'stderr',
- 'structuredOut': '',
- 'exitcode': 0
- }
- cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
- config = AmbariConfig()
- tempdir = tempfile.gettempdir()
- config.set('agent', 'prefix', tempdir)
- config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
- config.set('agent', 'tolerate_download_failures', "true")
- config.set('logging', 'log_command_executes', 1)
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(config, dummy_controller)
- actionQueue.execute_command(self.datanode_restart_command_no_logging)
- report = actionQueue.result()
- expected = {'status': 'COMPLETED',
- 'configurationTags': {'global': {'tag': 'v123'}},
- 'stderr': 'stderr',
- 'stdout': 'out\n\nCommand completed successfully!\n',
- 'clusterName': u'cc',
- 'structuredOut': '""',
- 'roleCommand': u'CUSTOM_COMMAND',
- 'serviceName': u'HDFS',
- 'role': u'DATANODE',
- 'actionId': '1-1',
- 'taskId': 9,
- 'customCommand': 'RESTART',
- 'exitCode': 0}
- # Agent caches configurationTags if custom_command RESTART completed
- mock_log_command_output.assert_not_called(
- [call("out\n\nCommand completed successfully!\n", "9"), call("stderr", "9")], any_order=True)
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(expected, report['reports'][0])
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch("__builtin__.open")
- @patch.object(ActionQueue, "status_update_callback")
- def test_auto_execute_command(self, status_update_callback_mock, open_mock):
- # Make file read calls visible
- def open_side_effect(file, mode):
- if mode == 'r':
- file_mock = MagicMock()
- file_mock.read.return_value = "Read from " + str(file)
- return file_mock
- else:
- return self.original_open(file, mode)
- open_mock.side_effect = open_side_effect
- config = AmbariConfig()
- tempdir = tempfile.gettempdir()
- config.set('agent', 'prefix', tempdir)
- config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
- config.set('agent', 'tolerate_download_failures', "true")
- dummy_controller = MagicMock()
- dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
- dummy_controller.recovery_manager.update_config(5, 5, 1, 11, True, False, False, "", -1)
- actionQueue = ActionQueue(config, dummy_controller)
- unfreeze_flag = threading.Event()
- python_execution_result_dict = {
- 'stdout': 'out',
- 'stderr': 'stderr',
- 'structuredOut' : ''
- }
- def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
- unfreeze_flag.wait()
- return python_execution_result_dict
- def patched_aq_execute_command(command):
- # We have to perform patching for separate thread in the same thread
- with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
- runCommand_mock.side_effect = side_effect
- actionQueue.process_command(command)
- python_execution_result_dict['status'] = 'COMPLETE'
- python_execution_result_dict['exitcode'] = 0
- self.assertFalse(actionQueue.tasks_in_progress_or_pending())
- # We call method in a separate thread
- execution_thread = Thread(target = patched_aq_execute_command ,
- args = (self.datanode_auto_start_command, ))
- execution_thread.start()
- # check in progress report
- # wait until ready
- while True:
- time.sleep(0.1)
- if actionQueue.tasks_in_progress_or_pending():
- break
- # Continue command execution
- unfreeze_flag.set()
- # wait until ready
- check_queue = True
- while check_queue:
- report = actionQueue.result()
- if not actionQueue.tasks_in_progress_or_pending():
- break
- time.sleep(0.1)
- self.assertEqual(len(report['reports']), 0)
- ## Test failed execution
- python_execution_result_dict['status'] = 'FAILED'
- python_execution_result_dict['exitcode'] = 13
- # We call method in a separate thread
- execution_thread = Thread(target = patched_aq_execute_command ,
- args = (self.datanode_auto_start_command, ))
- execution_thread.start()
- unfreeze_flag.set()
- # check in progress report
- # wait until ready
- while check_queue:
- report = actionQueue.result()
- if not actionQueue.tasks_in_progress_or_pending():
- break
- time.sleep(0.1)
- self.assertEqual(len(report['reports']), 0)
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch("__builtin__.open")
- @patch.object(ActionQueue, "status_update_callback")
- def test_execute_command(self, status_update_callback_mock, open_mock):
- # Make file read calls visible
- def open_side_effect(file, mode):
- if mode == 'r':
- file_mock = MagicMock()
- file_mock.read.return_value = "Read from " + str(file)
- return file_mock
- else:
- return self.original_open(file, mode)
- open_mock.side_effect = open_side_effect
- config = AmbariConfig()
- tempdir = tempfile.gettempdir()
- config.set('agent', 'prefix', tempdir)
- config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
- config.set('agent', 'tolerate_download_failures', "true")
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(config, dummy_controller)
- unfreeze_flag = threading.Event()
- python_execution_result_dict = {
- 'stdout': 'out',
- 'stderr': 'stderr',
- 'structuredOut' : ''
- }
- def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
- unfreeze_flag.wait()
- return python_execution_result_dict
- def patched_aq_execute_command(command):
- # We have to perform patching for separate thread in the same thread
- with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
- runCommand_mock.side_effect = side_effect
- actionQueue.execute_command(command)
- ### Test install/start/stop command ###
- ## Test successful execution with configuration tags
- python_execution_result_dict['status'] = 'COMPLETE'
- python_execution_result_dict['exitcode'] = 0
- # We call method in a separate thread
- execution_thread = Thread(target = patched_aq_execute_command ,
- args = (self.datanode_install_command, ))
- execution_thread.start()
- # check in progress report
- # wait until ready
- while True:
- time.sleep(0.1)
- report = actionQueue.result()
- if len(report['reports']) != 0:
- break
- expected = {'status': 'IN_PROGRESS',
- 'stderr': 'Read from {0}'.format(os.path.join(tempdir, "errors-3.txt")),
- 'stdout': 'Read from {0}'.format(os.path.join(tempdir, "output-3.txt")),
- 'structuredOut' : 'Read from {0}'.format(os.path.join(tempdir, "structured-out-3.json")),
- 'clusterName': u'cc',
- 'roleCommand': u'INSTALL',
- 'serviceName': u'HDFS',
- 'role': u'DATANODE',
- 'actionId': '1-1',
- 'taskId': 3,
- 'exitCode': 777}
- self.assertEqual(report['reports'][0], expected)
- self.assertTrue(actionQueue.tasks_in_progress_or_pending())
- # Continue command execution
- unfreeze_flag.set()
- # wait until ready
- while report['reports'][0]['status'] == 'IN_PROGRESS':
- time.sleep(0.1)
- report = actionQueue.result()
- # check report
- configname = os.path.join(tempdir, 'config.json')
- expected = {'status': 'COMPLETED',
- 'stderr': 'stderr',
- 'stdout': 'out\n\nCommand completed successfully!\n',
- 'clusterName': u'cc',
- 'structuredOut': '""',
- 'roleCommand': u'INSTALL',
- 'serviceName': u'HDFS',
- 'role': u'DATANODE',
- 'actionId': '1-1',
- 'taskId': 3,
- 'configurationTags': {'global': {'tag': 'v1'}},
- 'exitCode': 0}
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(report['reports'][0], expected)
- self.assertTrue(os.path.isfile(configname))
- # Check that we had 2 status update calls ( IN_PROGRESS and COMPLETE)
- self.assertEqual(status_update_callback_mock.call_count, 2)
- os.remove(configname)
- # now should not have reports (read complete/failed reports are deleted)
- report = actionQueue.result()
- self.assertEqual(len(report['reports']), 0)
- ## Test failed execution
- python_execution_result_dict['status'] = 'FAILED'
- python_execution_result_dict['exitcode'] = 13
- # We call method in a separate thread
- execution_thread = Thread(target = patched_aq_execute_command ,
- args = (self.datanode_install_command, ))
- execution_thread.start()
- unfreeze_flag.set()
- # check in progress report
- # wait until ready
- report = actionQueue.result()
- while len(report['reports']) == 0 or \
- report['reports'][0]['status'] == 'IN_PROGRESS':
- time.sleep(0.1)
- report = actionQueue.result()
- # check report
- expected = {'status': 'FAILED',
- 'stderr': 'stderr',
- 'stdout': 'out\n\nCommand completed successfully!\n\n\nCommand failed after 1 tries\n',
- 'clusterName': u'cc',
- 'structuredOut': '""',
- 'roleCommand': u'INSTALL',
- 'serviceName': u'HDFS',
- 'role': u'DATANODE',
- 'actionId': '1-1',
- 'taskId': 3,
- 'exitCode': 13}
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(report['reports'][0], expected)
- # now should not have reports (read complete/failed reports are deleted)
- report = actionQueue.result()
- self.assertEqual(len(report['reports']), 0)
- ### Test upgrade command ###
- python_execution_result_dict['status'] = 'COMPLETE'
- python_execution_result_dict['exitcode'] = 0
- execution_thread = Thread(target = patched_aq_execute_command ,
- args = (self.datanode_upgrade_command, ))
- execution_thread.start()
- unfreeze_flag.set()
- # wait until ready
- report = actionQueue.result()
- while len(report['reports']) == 0 or \
- report['reports'][0]['status'] == 'IN_PROGRESS':
- time.sleep(0.1)
- report = actionQueue.result()
- # check report
- expected = {'status': 'COMPLETED',
- 'stderr': 'stderr',
- 'stdout': 'out\n\nCommand completed successfully!\n\n\nCommand failed after 1 tries\n\n\nCommand completed successfully!\n',
- 'clusterName': 'clusterName',
- 'structuredOut': '""',
- 'roleCommand': 'UPGRADE',
- 'serviceName': 'serviceName',
- 'role': 'role',
- 'actionId': 17,
- 'taskId': 'taskId',
- 'exitCode': 0}
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(report['reports'][0], expected)
- # now should not have reports (read complete/failed reports are deleted)
- report = actionQueue.result()
- self.assertEqual(len(report['reports']), 0)
- def test_cancel_with_reschedule_command(self):
- config = AmbariConfig()
- tempdir = tempfile.gettempdir()
- config.set('agent', 'prefix', tempdir)
- config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
- config.set('agent', 'tolerate_download_failures', "true")
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(config, dummy_controller)
- unfreeze_flag = threading.Event()
- python_execution_result_dict = {
- 'stdout': 'out',
- 'stderr': 'stderr',
- 'structuredOut' : '',
- 'status' : '',
- 'exitcode' : -signal.SIGTERM
- }
- def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
- unfreeze_flag.wait()
- return python_execution_result_dict
- def patched_aq_execute_command(command):
- # We have to perform patching for separate thread in the same thread
- with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
- runCommand_mock.side_effect = side_effect
- actionQueue.execute_command(command)
- # We call method in a separate thread
- execution_thread = Thread(target = patched_aq_execute_command ,
- args = (self.datanode_install_command, ))
- execution_thread.start()
- # check in progress report
- # wait until ready
- while True:
- time.sleep(0.1)
- report = actionQueue.result()
- if len(report['reports']) != 0:
- break
- unfreeze_flag.set()
- # wait until ready
- while len(report['reports']) != 0:
- time.sleep(0.1)
- report = actionQueue.result()
- # check report
- self.assertEqual(len(report['reports']), 0)
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch.object(CustomServiceOrchestrator, "runCommand")
- @patch("CommandStatusDict.CommandStatusDict")
- @patch.object(ActionQueue, "status_update_callback")
- def test_store_configuration_tags(self, status_update_callback_mock,
- command_status_dict_mock,
- cso_runCommand_mock):
- custom_service_orchestrator_execution_result_dict = {
- 'stdout': 'out',
- 'stderr': 'stderr',
- 'structuredOut' : '',
- 'exitcode' : 0
- }
- cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
- config = AmbariConfig()
- tempdir = tempfile.gettempdir()
- config.set('agent', 'prefix', tempdir)
- config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
- config.set('agent', 'tolerate_download_failures', "true")
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(config, dummy_controller)
- actionQueue.execute_command(self.datanode_restart_command)
- report = actionQueue.result()
- expected = {'status': 'COMPLETED',
- 'configurationTags': {'global': {'tag': 'v123'}},
- 'stderr': 'stderr',
- 'stdout': 'out\n\nCommand completed successfully!\n',
- 'clusterName': u'cc',
- 'structuredOut': '""',
- 'roleCommand': u'CUSTOM_COMMAND',
- 'serviceName': u'HDFS',
- 'role': u'DATANODE',
- 'actionId': '1-1',
- 'taskId': 9,
- 'customCommand': 'RESTART',
- 'exitCode': 0}
- # Agent caches configurationTags if custom_command RESTART completed
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(expected, report['reports'][0])
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch.object(ActualConfigHandler, "write_client_components")
- @patch.object(CustomServiceOrchestrator, "runCommand")
- @patch("CommandStatusDict.CommandStatusDict")
- @patch.object(ActionQueue, "status_update_callback")
- def test_store_configuration_tags_no_clients(self, status_update_callback_mock,
- command_status_dict_mock,
- cso_runCommand_mock, write_client_components_mock):
- custom_service_orchestrator_execution_result_dict = {
- 'stdout': 'out',
- 'stderr': 'stderr',
- 'structuredOut' : '',
- 'exitcode' : 0
- }
- cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
- config = AmbariConfig()
- tempdir = tempfile.gettempdir()
- config.set('agent', 'prefix', tempdir)
- config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
- config.set('agent', 'tolerate_download_failures', "true")
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(config, dummy_controller)
- actionQueue.execute_command(self.datanode_restart_command_no_clients_update)
- report = actionQueue.result()
- expected = {'status': 'COMPLETED',
- 'configurationTags': {'global': {'tag': 'v123'}},
- 'stderr': 'stderr',
- 'stdout': 'out\n\nCommand completed successfully!\n',
- 'clusterName': u'cc',
- 'structuredOut': '""',
- 'roleCommand': u'CUSTOM_COMMAND',
- 'serviceName': u'HDFS',
- 'role': u'DATANODE',
- 'actionId': '1-1',
- 'taskId': 9,
- 'customCommand': 'RESTART',
- 'exitCode': 0}
- # Agent caches configurationTags if custom_command RESTART completed
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(expected, report['reports'][0])
- self.assertFalse(write_client_components_mock.called)
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch.object(ActualConfigHandler, "write_client_components")
- @patch.object(ActualConfigHandler, "write_actual_component")
- @patch.object(ActualConfigHandler, "update_component_tag")
- @patch.object(CustomServiceOrchestrator, "runCommand")
- @patch("CommandStatusDict.CommandStatusDict")
- @patch.object(ActionQueue, "status_update_callback")
- def test_refresh_queues_custom_command(self, status_update_callback_mock,
- command_status_dict_mock,
- cso_runCommand_mock, update_component_tag, write_actual_component_mock, write_client_components_mock):
- custom_service_orchestrator_execution_result_dict = {
- 'stdout': 'out',
- 'stderr': 'stderr',
- 'structuredOut' : '',
- 'exitcode' : 0
- }
- cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
- config = AmbariConfig()
- tempdir = tempfile.gettempdir()
- config.set('agent', 'prefix', tempdir)
- config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
- config.set('agent', 'tolerate_download_failures', "true")
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(config, dummy_controller)
- actionQueue.execute_command(self.yarn_refresh_queues_custom_command)
- report = actionQueue.result()
- expected = {'status': 'COMPLETED',
- 'configurationTags': None,
- 'stderr': 'stderr',
- 'stdout': 'out\n\nCommand completed successfully!\n',
- 'clusterName': u'cc',
- 'structuredOut': '""',
- 'roleCommand': u'CUSTOM_COMMAND',
- 'serviceName': u'YARN',
- 'role': u'RESOURCEMANAGER',
- 'actionId': '1-1',
- 'taskId': 9,
- 'customCommand': 'RESTART',
- 'exitCode': 0}
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(expected, report['reports'][0])
- # Configuration tags should be updated
- self.assertTrue(update_component_tag.called)
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch.object(ActualConfigHandler, "write_client_components")
- @patch.object(ActualConfigHandler, "write_actual_component")
- @patch.object(CustomServiceOrchestrator, "runCommand")
- @patch("CommandStatusDict.CommandStatusDict")
- @patch.object(ActionQueue, "status_update_callback")
- def test_store_configuration_tags_on_custom_start_command(self, status_update_callback_mock,
- command_status_dict_mock,
- cso_runCommand_mock, write_actual_component_mock, write_client_components_mock):
- custom_service_orchestrator_execution_result_dict = {
- 'stdout': 'out',
- 'stderr': 'stderr',
- 'structuredOut' : '',
- 'exitcode' : 0
- }
- cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
- config = AmbariConfig()
- tempdir = tempfile.gettempdir()
- config.set('agent', 'prefix', tempdir)
- config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
- config.set('agent', 'tolerate_download_failures', "true")
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(config, dummy_controller)
- actionQueue.execute_command(self.datanode_start_custom_command)
- report = actionQueue.result()
- expected = {'status': 'COMPLETED',
- 'configurationTags': {'global': {'tag': 'v123'}},
- 'stderr': 'stderr',
- 'stdout': 'out\n\nCommand completed successfully!\n',
- 'clusterName': u'cc',
- 'structuredOut': '""',
- 'roleCommand': u'CUSTOM_COMMAND',
- 'serviceName': u'HDFS',
- 'role': u'DATANODE',
- 'actionId': '1-1',
- 'taskId': 9,
- 'customCommand': 'START',
- 'exitCode': 0}
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(expected, report['reports'][0])
- # Configuration tags should be updated on custom start command
- self.assertTrue(write_actual_component_mock.called)
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch.object(ActualConfigHandler, "write_actual_component")
- @patch.object(CustomServiceOrchestrator, "runCommand")
- @patch("CommandStatusDict.CommandStatusDict")
- @patch.object(ActionQueue, "status_update_callback")
- def test_store_config_tags_on_install_client_command(self, status_update_callback_mock,
- command_status_dict_mock,
- cso_runCommand_mock, write_actual_component_mock):
- custom_service_orchestrator_execution_result_dict = {
- 'stdout': 'out',
- 'stderr': 'stderr',
- 'structuredOut' : '',
- 'exitcode' : 0
- }
- cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
- tez_client_install_command = {
- 'commandType': 'EXECUTION_COMMAND',
- 'role': u'TEZ_CLIENT',
- 'roleCommand': u'INSTALL',
- 'commandId': '1-1',
- 'taskId': 9,
- 'clusterName': u'cc',
- 'serviceName': u'TEZ',
- 'configurations': {'global' : {}},
- 'configurationTags': {'global' : { 'tag': 'v123' }},
- 'hostLevelParams': {}
- }
- LiveStatus.CLIENT_COMPONENTS = ({'serviceName': 'TEZ', 'componentName': 'TEZ_CLIENT'}, )
- config = AmbariConfig()
- tempdir = tempfile.gettempdir()
- config.set('agent', 'prefix', tempdir)
- config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
- config.set('agent', 'tolerate_download_failures', "true")
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(config, dummy_controller)
- actionQueue.execute_command(tez_client_install_command)
- # Configuration tags should be updated on install client command
- self.assertTrue(write_actual_component_mock.called)
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch.object(ActionQueue, "status_update_callback")
- @patch.object(ActionQueue, "execute_command")
- @patch.object(LiveStatus, "build")
- @patch.object(CustomServiceOrchestrator, "__init__")
- def test_execute_status_command(self, CustomServiceOrchestrator_mock,
- build_mock, execute_command_mock,
- status_update_callback):
- CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
- build_mock.return_value = {'dummy report': '' }
- dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
- result = (self.status_command, {'exitcode': 0 }, 'UNKNOWN')
- actionQueue.process_status_command_result(result)
- report = actionQueue.result()
- expected = {'dummy report': '',
- 'securityState' : 'UNKNOWN'}
- self.assertEqual(len(report['componentStatus']), 1)
- self.assertEqual(report['componentStatus'][0], expected)
- @patch.object(RecoveryManager, "command_exists")
- @patch.object(RecoveryManager, "requires_recovery")
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch.object(ActionQueue, "status_update_callback")
- @patch.object(ActionQueue, "execute_command")
- @patch.object(LiveStatus, "build")
- @patch.object(CustomServiceOrchestrator, "__init__")
- def test_process_status_command_result_recovery(self, CustomServiceOrchestrator_mock,
- build_mock, execute_command_mock,
- status_update_callback, requires_recovery_mock,
- command_exists_mock):
- CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
- build_mock.return_value = {'dummy report': '' }
- requires_recovery_mock.return_value = True
- command_exists_mock.return_value = False
- dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp(), True, False)
- result = (self.status_command, {'exitcode': 0 }, 'UNKNOWN')
- actionQueue.process_status_command_result(result)
- report = actionQueue.result()
- expected = {'dummy report': '',
- 'securityState' : 'UNKNOWN',
- 'sendExecCmdDet': 'True'}
- self.assertEqual(len(report['componentStatus']), 1)
- self.assertEqual(report['componentStatus'][0], expected)
- requires_recovery_mock.return_value = True
- command_exists_mock.return_value = True
-
- result = (self.status_command, {'exitcode': 0 }, 'UNKNOWN')
- actionQueue.process_status_command_result(result)
- report = actionQueue.result()
- expected = {'dummy report': '',
- 'securityState' : 'UNKNOWN',
- 'sendExecCmdDet': 'False'}
- self.assertEqual(len(report['componentStatus']), 1)
- self.assertEqual(report['componentStatus'][0], expected)
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch.object(ActionQueue, "status_update_callback")
- @patch.object(ActionQueue, "execute_command")
- @patch.object(LiveStatus, "build")
- @patch.object(CustomServiceOrchestrator, "__init__")
- def test_process_status_command_result_with_alerts(self, CustomServiceOrchestrator_mock,
- build_mock, execute_command_mock,
- status_update_callback):
- CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
- command_return_value = {
- 'exitcode': 0,
- 'stdout': 'out',
- 'stderr': 'err',
- 'structuredOut': {'alerts': [ {'name': 'flume_alert'} ] }
- }
-
- result = (self.status_command_for_alerts, command_return_value, command_return_value)
-
- build_mock.return_value = {'somestatusresult': 'aresult'}
- actionQueue.process_status_command_result(result)
- report = actionQueue.result()
- self.assertEqual(len(report['componentStatus']), 1)
- self.assertTrue(report['componentStatus'][0].has_key('alerts'))
- @patch.object(AmbariConfig, "get_parallel_exec_option")
- @patch.object(ActionQueue, "process_command")
- @patch.object(Queue, "get")
- @patch.object(CustomServiceOrchestrator, "__init__")
- def test_reset_queue(self, CustomServiceOrchestrator_mock,
- get_mock, process_command_mock, gpeo_mock):
- CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
- dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
- config = MagicMock()
- gpeo_mock.return_value = 0
- config.get_parallel_exec_option = gpeo_mock
- actionQueue = ActionQueue(config, dummy_controller)
- actionQueue.start()
- actionQueue.put([self.datanode_install_command, self.hbase_install_command])
- self.assertEqual(2, actionQueue.commandQueue.qsize())
- self.assertTrue(actionQueue.tasks_in_progress_or_pending())
- actionQueue.reset()
- self.assertTrue(actionQueue.commandQueue.empty())
- self.assertFalse(actionQueue.tasks_in_progress_or_pending())
- time.sleep(0.1)
- actionQueue.stop()
- actionQueue.join()
- self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
- @patch.object(AmbariConfig, "get_parallel_exec_option")
- @patch.object(ActionQueue, "process_command")
- @patch.object(Queue, "get")
- @patch.object(CustomServiceOrchestrator, "__init__")
- def test_cancel(self, CustomServiceOrchestrator_mock,
- get_mock, process_command_mock, gpeo_mock):
- CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
- config = MagicMock()
- gpeo_mock.return_value = 0
- config.get_parallel_exec_option = gpeo_mock
- actionQueue = ActionQueue(config, dummy_controller)
- actionQueue.start()
- actionQueue.put([self.datanode_install_command, self.hbase_install_command])
- self.assertEqual(2, actionQueue.commandQueue.qsize())
- actionQueue.reset()
- self.assertTrue(actionQueue.commandQueue.empty())
- time.sleep(0.1)
- actionQueue.stop()
- actionQueue.join()
- self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
- @patch.object(AmbariConfig, "get_parallel_exec_option")
- @patch.object(ActionQueue, "process_command")
- @patch.object(CustomServiceOrchestrator, "__init__")
- def test_parallel_exec(self, CustomServiceOrchestrator_mock,
- process_command_mock, gpeo_mock):
- CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
- config = MagicMock()
- gpeo_mock.return_value = 1
- config.get_parallel_exec_option = gpeo_mock
- actionQueue = ActionQueue(config, dummy_controller)
- actionQueue.put([self.datanode_install_command, self.hbase_install_command])
- self.assertEqual(2, actionQueue.commandQueue.qsize())
- actionQueue.start()
- time.sleep(1)
- actionQueue.stop()
- actionQueue.join()
- self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
- self.assertEqual(2, process_command_mock.call_count)
- process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)])
- @patch("threading.Thread")
- @patch.object(AmbariConfig, "get_parallel_exec_option")
- @patch.object(ActionQueue, "process_command")
- @patch.object(CustomServiceOrchestrator, "__init__")
- def test_parallel_exec_no_retry(self, CustomServiceOrchestrator_mock,
- process_command_mock, gpeo_mock, threading_mock):
- CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
- config = MagicMock()
- gpeo_mock.return_value = 1
- config.get_parallel_exec_option = gpeo_mock
- actionQueue = ActionQueue(config, dummy_controller)
- actionQueue.put([self.datanode_install_no_retry_command, self.snamenode_install_command])
- self.assertEqual(2, actionQueue.commandQueue.qsize())
- actionQueue.start()
- time.sleep(1)
- actionQueue.stop()
- actionQueue.join()
- self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
- self.assertEqual(2, process_command_mock.call_count)
- self.assertEqual(0, threading_mock.call_count)
- process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)])
- @not_for_platform(PLATFORM_LINUX)
- @patch("time.sleep")
- @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
- @patch.object(CustomServiceOrchestrator, "__init__")
- def test_execute_retryable_command(self, CustomServiceOrchestrator_mock,
- sleep_mock
- ):
- CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
- python_execution_result_dict = {
- 'exitcode': 1,
- 'stdout': 'out',
- 'stderr': 'stderr',
- 'structuredOut': '',
- 'status': 'FAILED'
- }
- def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
- return python_execution_result_dict
- command = copy.deepcopy(self.retryable_command)
- with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
- runCommand_mock.side_effect = side_effect
- actionQueue.execute_command(command)
- #assert that python executor start
- self.assertTrue(runCommand_mock.called)
- self.assertEqual(3, runCommand_mock.call_count)
- self.assertEqual(2, sleep_mock.call_count)
- sleep_mock.assert_has_calls([call(2), call(3)], False)
- runCommand_mock.assert_has_calls([
- call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
- os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False),
- call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
- os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True),
- call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
- os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True)])
- @patch("time.time")
- @patch("time.sleep")
- @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
- @patch.object(CustomServiceOrchestrator, "__init__")
- def test_execute_retryable_command_with_time_lapse(self, CustomServiceOrchestrator_mock,
- sleep_mock, time_mock
- ):
- CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
- dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
- actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
- python_execution_result_dict = {
- 'exitcode': 1,
- 'stdout': 'out',
- 'stderr': 'stderr',
- 'structuredOut': '',
- 'status': 'FAILED'
- }
- times_arr = [8, 10, 14, 18, 22, 26, 30, 34]
- if self.logger.isEnabledFor(logging.INFO):
- times_arr.insert(0, 4)
- time_mock.side_effect = times_arr
- def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
- return python_execution_result_dict
- command = copy.deepcopy(self.retryable_command)
- with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
- runCommand_mock.side_effect = side_effect
- actionQueue.execute_command(command)
- #assert that python executor start
- self.assertTrue(runCommand_mock.called)
- self.assertEqual(2, runCommand_mock.call_count)
- self.assertEqual(1, sleep_mock.call_count)
- sleep_mock.assert_has_calls([call(1)], False)
- runCommand_mock.assert_has_calls([
- call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
- os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False),
- call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
- os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True)])
- #retryable_command
- @not_for_platform(PLATFORM_LINUX)
- @patch("time.sleep")
- @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
- @patch.object(CustomServiceOrchestrator, "__init__")
- def test_execute_retryable_command_fail_and_succeed(self, CustomServiceOrchestrator_mock,
- sleep_mock
- ):
- CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
- execution_result_fail_dict = {
- 'exitcode': 1,
- 'stdout': 'out',
- 'stderr': 'stderr',
- 'structuredOut': '',
- 'status': 'FAILED'
- }
- execution_result_succ_dict = {
- 'exitcode': 0,
- 'stdout': 'out',
- 'stderr': 'stderr',
- 'structuredOut': '',
- 'status': 'COMPLETED'
- }
- command = copy.deepcopy(self.retryable_command)
- with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
- runCommand_mock.side_effect = [execution_result_fail_dict, execution_result_succ_dict]
- actionQueue.execute_command(command)
- #assert that python executor start
- self.assertTrue(runCommand_mock.called)
- self.assertEqual(2, runCommand_mock.call_count)
- self.assertEqual(1, sleep_mock.call_count)
- sleep_mock.assert_any_call(2)
- @not_for_platform(PLATFORM_LINUX)
- @patch("time.sleep")
- @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
- @patch.object(CustomServiceOrchestrator, "__init__")
- def test_execute_retryable_command_succeed(self, CustomServiceOrchestrator_mock,
- sleep_mock
- ):
- CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
- execution_result_succ_dict = {
- 'exitcode': 0,
- 'stdout': 'out',
- 'stderr': 'stderr',
- 'structuredOut': '',
- 'status': 'COMPLETED'
- }
- command = copy.deepcopy(self.retryable_command)
- with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
- runCommand_mock.side_effect = [execution_result_succ_dict]
- actionQueue.execute_command(command)
- #assert that python executor start
- self.assertTrue(runCommand_mock.called)
- self.assertFalse(sleep_mock.called)
- self.assertEqual(1, runCommand_mock.call_count)
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch.object(CustomServiceOrchestrator, "runCommand")
- @patch.object(CustomServiceOrchestrator, "__init__")
- def test_execute_background_command(self, CustomServiceOrchestrator_mock,
- runCommand_mock,
- ):
- CustomServiceOrchestrator_mock.return_value = None
- CustomServiceOrchestrator.runCommand.return_value = {'exitcode' : 0,
- 'stdout': 'out-11',
- 'stderr' : 'err-13'}
-
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
- execute_command = copy.deepcopy(self.background_command)
- actionQueue.put([execute_command])
- actionQueue.processBackgroundQueueSafeEmpty();
- actionQueue.controller.statusCommandExecutor.process_results();
-
- #assert that python execturor start
- self.assertTrue(runCommand_mock.called)
- runningCommand = actionQueue.commandStatuses.current_state.get(execute_command['taskId'])
- self.assertTrue(runningCommand is not None)
- self.assertEqual(runningCommand[1]['status'], ActionQueue.IN_PROGRESS_STATUS)
-
- report = actionQueue.result()
- self.assertEqual(len(report['reports']),1)
- @patch.object(CustomServiceOrchestrator, "get_py_executor")
- @patch.object(CustomServiceOrchestrator, "resolve_script_path")
- def test_execute_python_executor(self, resolve_script_path_mock,
- get_py_executor_mock):
-
- dummy_controller = MagicMock()
- cfg = AmbariConfig()
- cfg.set('agent', 'tolerate_download_failures', 'true')
- cfg.set('agent', 'prefix', '.')
- cfg.set('agent', 'cache_dir', 'background_tasks')
-
- actionQueue = ActionQueue(cfg, dummy_controller)
- pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config)
- patch_output_file(pyex)
- get_py_executor_mock.return_value = pyex
- actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock()
-
- result = {}
- lock = threading.RLock()
- complete_done = threading.Condition(lock)
-
- def command_complete_w(process_condensed_result, handle):
- with lock:
- result['command_complete'] = {'condensed_result' : copy.copy(process_condensed_result),
- 'handle' : copy.copy(handle),
- 'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId'])
- }
- complete_done.notifyAll()
- actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,
- None, command_complete_w)
- actionQueue.put([self.background_command])
- actionQueue.processBackgroundQueueSafeEmpty();
- actionQueue.controller.statusCommandExecutor.process_results();
-
- with lock:
- complete_done.wait(0.1)
-
- finished_status = result['command_complete']['command_status']
- self.assertEqual(finished_status['status'], ActionQueue.COMPLETED_STATUS)
- self.assertEqual(finished_status['stdout'], 'process_out')
- self.assertEqual(finished_status['stderr'], 'process_err')
- self.assertEqual(finished_status['exitCode'], 0)
-
-
- runningCommand = actionQueue.commandStatuses.current_state.get(self.background_command['taskId'])
- self.assertTrue(runningCommand is not None)
-
- report = actionQueue.result()
- self.assertEqual(len(report['reports']),1)
- self.assertEqual(report['reports'][0]['stdout'],'process_out')
- # self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}')
-
-
-
- cancel_background_command = {
- "commandType":"CANCEL_COMMAND",
- "role":"AMBARI_SERVER_ACTION",
- "roleCommand":"ABORT",
- "commandId":"2--1",
- "taskId":20,
- "clusterName":"c1",
- "serviceName":"",
- "hostname":"c6401",
- "roleParams":{
- "cancelTaskIdTargets":"13,14"
- },
- }
- def patch_output_file(pythonExecutor):
- def windows_py(command, tmpout, tmperr):
- proc = MagicMock()
- proc.pid = 33
- proc.returncode = 0
- with tmpout:
- tmpout.write('process_out')
- with tmperr:
- tmperr.write('process_err')
- return proc
- def open_subprocess_files_win(fout, ferr, f):
- return MagicMock(), MagicMock()
- def read_result_from_files(out_path, err_path, structured_out_path):
- return 'process_out', 'process_err', '{"a": "b."}'
- pythonExecutor.launch_python_subprocess = windows_py
- pythonExecutor.open_subprocess_files = open_subprocess_files_win
- pythonExecutor.read_result_from_files = read_result_from_files
- def wraped(func, before = None, after = None):
- def wrapper(*args, **kwargs):
- if(before is not None):
- before(*args, **kwargs)
- ret = func(*args, **kwargs)
- if(after is not None):
- after(*args, **kwargs)
- return ret
- return wrapper
-
|