#!/usr/bin/env python ''' Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ''' from Queue import Queue from unittest import TestCase from ambari_agent.LiveStatus import LiveStatus from ambari_agent.ActionQueue import ActionQueue from ambari_agent.AmbariConfig import AmbariConfig import os, errno, time, pprint, tempfile, threading import sys from threading import Thread import copy import signal from mock.mock import patch, MagicMock, call from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator from ambari_agent.PythonExecutor import PythonExecutor from ambari_agent.ActualConfigHandler import ActualConfigHandler from ambari_agent.RecoveryManager import RecoveryManager from ambari_commons import OSCheck from only_for_platform import not_for_platform, os_distro_value, PLATFORM_WINDOWS, PLATFORM_LINUX import logging class TestActionQueue(TestCase): def setUp(self): # save original open() method for later use self.original_open = open def tearDown(self): sys.stdout = sys.__stdout__ logger = logging.getLogger() datanode_install_command = { 'commandType': 'EXECUTION_COMMAND', 'role': u'DATANODE', 'roleCommand': u'INSTALL', 'commandId': '1-1', 'taskId': 3, 'clusterName': u'cc', 'serviceName': u'HDFS', 'hostLevelParams': {}, 'configurations':{'global' : {}}, 'configurationTags':{'global' : { 'tag': 'v1' }}, 'commandParams': { 'command_retry_enabled': 'true' } } datanode_install_no_retry_command = { 'commandType': 'EXECUTION_COMMAND', 'role': u'DATANODE', 'roleCommand': u'INSTALL', 'commandId': '1-1', 'taskId': 3, 'clusterName': u'cc', 'serviceName': u'HDFS', 'hostLevelParams': {}, 'configurations':{'global' : {}}, 'configurationTags':{'global' : { 'tag': 'v1' }}, 'commandParams': { 'command_retry_enabled': 'false' } } datanode_auto_start_command = { 'commandType': 'AUTO_EXECUTION_COMMAND', 'role': u'DATANODE', 'roleCommand': u'START', 'commandId': '1-1', 'taskId': 3, 'clusterName': u'cc', 'serviceName': u'HDFS', 'hostLevelParams': {}, 'configurations':{'global' : {}}, 'configurationTags':{'global' : { 'tag': 'v1' }} } datanode_upgrade_command = { 'commandId': 17, 'role' : "role", 'taskId' : "taskId", 'clusterName' : "clusterName", 'serviceName' : "serviceName", 'roleCommand' : 'UPGRADE', 'hostname' : "localhost.localdomain", 'hostLevelParams': {}, 'clusterHostInfo': "clusterHostInfo", 'commandType': "EXECUTION_COMMAND", 'configurations':{'global' : {}}, 'roleParams': {}, 'commandParams' : { 'source_stack_version' : 'HDP-1.2.1', 'target_stack_version' : 'HDP-1.3.0' } } namenode_install_command = { 'commandType': 'EXECUTION_COMMAND', 'role': u'NAMENODE', 'roleCommand': u'INSTALL', 'commandId': '1-1', 'taskId': 4, 'clusterName': u'cc', 'serviceName': u'HDFS', 'hostLevelParams': {} } snamenode_install_command = { 'commandType': 'EXECUTION_COMMAND', 'role': u'SECONDARY_NAMENODE', 'roleCommand': u'INSTALL', 'commandId': '1-1', 'taskId': 5, 'clusterName': u'cc', 'serviceName': u'HDFS', 'hostLevelParams': {} } hbase_install_command = { 'commandType': 'EXECUTION_COMMAND', 'role': u'HBASE', 'roleCommand': u'INSTALL', 'commandId': '1-1', 'taskId': 7, 'clusterName': u'cc', 'serviceName': u'HDFS', 'hostLevelParams': {}, 'commandParams': { 'command_retry_enabled': 'true' } } status_command = { "serviceName" : 'HDFS', "commandType" : "STATUS_COMMAND", "clusterName" : "", "componentName" : "DATANODE", 'configurations':{}, 'hostLevelParams': {} } datanode_restart_command = { 'commandType': 'EXECUTION_COMMAND', 'role': u'DATANODE', 'roleCommand': u'CUSTOM_COMMAND', 'commandId': '1-1', 'taskId': 9, 'clusterName': u'cc', 'serviceName': u'HDFS', 'configurations':{'global' : {}}, 'configurationTags':{'global' : { 'tag': 'v123' }}, 'hostLevelParams':{'custom_command': 'RESTART', 'clientsToUpdateConfigs': []} } datanode_restart_command_no_logging = { 'commandType': 'EXECUTION_COMMAND', 'role': u'DATANODE', 'roleCommand': u'CUSTOM_COMMAND', 'commandId': '1-1', 'taskId': 9, 'clusterName': u'cc', 'serviceName': u'HDFS', 'configurations': {'global': {}}, 'configurationTags': {'global': {'tag': 'v123'}}, 'commandParams': { 'log_output': 'false' }, 'hostLevelParams': {'custom_command': 'RESTART', 'clientsToUpdateConfigs': []} } datanode_restart_command_no_clients_update = { 'commandType': 'EXECUTION_COMMAND', 'role': u'DATANODE', 'roleCommand': u'CUSTOM_COMMAND', 'commandId': '1-1', 'taskId': 9, 'clusterName': u'cc', 'serviceName': u'HDFS', 'configurations':{'global' : {}}, 'configurationTags':{'global' : { 'tag': 'v123' }}, 'hostLevelParams':{'custom_command': 'RESTART'} } datanode_start_custom_command = { 'commandType': 'EXECUTION_COMMAND', 'role': u'DATANODE', 'roleCommand': u'CUSTOM_COMMAND', 'commandId': '1-1', 'taskId': 9, 'clusterName': u'cc', 'serviceName': u'HDFS', 'configurations':{'global' : {}}, 'configurationTags':{'global' : { 'tag': 'v123' }}, 'hostLevelParams':{'custom_command': 'START'} } yarn_refresh_queues_custom_command = { 'commandType': 'EXECUTION_COMMAND', 'role': u'RESOURCEMANAGER', 'roleCommand': u'CUSTOM_COMMAND', 'commandId': '1-1', 'taskId': 9, 'clusterName': u'cc', 'serviceName': u'YARN', 'commandParams' : {'forceRefreshConfigTags' : 'capacity-scheduler'}, 'configurations':{'global' : {}}, 'configurationTags':{'global' : { 'tag': 'v123' }, 'capacity-scheduler' : {'tag': 'v123'}}, 'hostLevelParams':{'custom_command': 'REFRESHQUEUES'} } status_command_for_alerts = { "serviceName" : 'FLUME', "commandType" : "STATUS_COMMAND", "clusterName" : "", "componentName" : "FLUME_HANDLER", 'configurations':{}, 'hostLevelParams': {} } retryable_command = { 'commandType': 'EXECUTION_COMMAND', 'role': 'NAMENODE', 'roleCommand': 'INSTALL', 'commandId': '1-1', 'taskId': 19, 'clusterName': 'c1', 'serviceName': 'HDFS', 'configurations':{'global' : {}}, 'configurationTags':{'global' : { 'tag': 'v123' }}, 'commandParams' : { 'script_type' : 'PYTHON', 'script' : 'script.py', 'command_timeout' : '600', 'jdk_location' : '.', 'service_package_folder' : '.', 'command_retry_enabled' : 'true', 'max_duration_for_retries' : '5' }, 'hostLevelParams' : {} } background_command = { 'commandType': 'BACKGROUND_EXECUTION_COMMAND', 'role': 'NAMENODE', 'roleCommand': 'CUSTOM_COMMAND', 'commandId': '1-1', 'taskId': 19, 'clusterName': 'c1', 'serviceName': 'HDFS', 'configurations':{'global' : {}}, 'configurationTags':{'global' : { 'tag': 'v123' }}, 'hostLevelParams':{'custom_command': 'REBALANCE_HDFS'}, 'commandParams' : { 'script_type' : 'PYTHON', 'script' : 'script.py', 'command_timeout' : '600', 'jdk_location' : '.', 'service_package_folder' : '.' } } cancel_background_command = { 'commandType': 'EXECUTION_COMMAND', 'role': 'NAMENODE', 'roleCommand': 'ACTIONEXECUTE', 'commandId': '1-1', 'taskId': 20, 'clusterName': 'c1', 'serviceName': 'HDFS', 'configurations':{'global' : {}}, 'configurationTags':{'global' : {}}, 'hostLevelParams':{}, 'commandParams' : { 'script_type' : 'PYTHON', 'script' : 'cancel_background_task.py', 'before_system_hook_function' : 'fetch_bg_pid_by_taskid', 'jdk_location' : '.', 'command_timeout' : '600', 'service_package_folder' : '.', 'cancel_policy': 'SIGKILL', 'cancel_task_id': "19", } } @patch.object(AmbariConfig, "get_parallel_exec_option") @patch.object(ActionQueue, "process_command") @patch.object(Queue, "get") @patch.object(CustomServiceOrchestrator, "__init__") def test_ActionQueueStartStop(self, CustomServiceOrchestrator_mock, get_mock, process_command_mock, get_parallel_exec_option_mock): CustomServiceOrchestrator_mock.return_value = None dummy_controller = MagicMock() config = MagicMock() get_parallel_exec_option_mock.return_value = 0 config.get_parallel_exec_option = get_parallel_exec_option_mock actionQueue = ActionQueue(config, dummy_controller) actionQueue.start() time.sleep(0.1) actionQueue.stop() actionQueue.join() self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.') self.assertTrue(process_command_mock.call_count > 1) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch("logging.RootLogger.exception") @patch.object(ActionQueue, "execute_command") def test_process_command(self, execute_command_mock, log_exc_mock): dummy_controller = MagicMock() config = AmbariConfig() config.set('agent', 'tolerate_download_failures', "true") actionQueue = ActionQueue(config, dummy_controller) execution_command = { 'commandType' : ActionQueue.EXECUTION_COMMAND, } status_command = { 'commandType' : ActionQueue.STATUS_COMMAND, } wrong_command = { 'commandType' : "SOME_WRONG_COMMAND", } # Try wrong command actionQueue.process_command(wrong_command) self.assertFalse(execute_command_mock.called) self.assertFalse(log_exc_mock.called) execute_command_mock.reset_mock() log_exc_mock.reset_mock() # Try normal execution actionQueue.process_command(execution_command) self.assertTrue(execute_command_mock.called) self.assertFalse(log_exc_mock.called) execute_command_mock.reset_mock() log_exc_mock.reset_mock() execute_command_mock.reset_mock() log_exc_mock.reset_mock() # Try exception to check proper logging def side_effect(self): raise Exception("TerribleException") execute_command_mock.side_effect = side_effect actionQueue.process_command(execution_command) self.assertTrue(log_exc_mock.called) log_exc_mock.reset_mock() actionQueue.process_command(execution_command) self.assertTrue(log_exc_mock.called) @patch.object(ActionQueue, "log_command_output") @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(CustomServiceOrchestrator, "runCommand") @patch("CommandStatusDict.CommandStatusDict") @patch.object(ActionQueue, "status_update_callback") def test_log_execution_commands(self, status_update_callback_mock, command_status_dict_mock, cso_runCommand_mock, mock_log_command_output): custom_service_orchestrator_execution_result_dict = { 'stdout': 'out', 'stderr': 'stderr', 'structuredOut' : '', 'exitcode' : 0 } cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict config = AmbariConfig() tempdir = tempfile.gettempdir() config.set('agent', 'prefix', tempdir) config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") config.set('agent', 'tolerate_download_failures', "true") config.set('logging', 'log_command_executes', 1) dummy_controller = MagicMock() actionQueue = ActionQueue(config, dummy_controller) actionQueue.execute_command(self.datanode_restart_command) report = actionQueue.result() expected = {'status': 'COMPLETED', 'configurationTags': {'global': {'tag': 'v123'}}, 'stderr': 'stderr', 'stdout': 'out\n\nCommand completed successfully!\n', 'clusterName': u'cc', 'structuredOut': '""', 'roleCommand': u'CUSTOM_COMMAND', 'serviceName': u'HDFS', 'role': u'DATANODE', 'actionId': '1-1', 'taskId': 9, 'customCommand': 'RESTART', 'exitCode': 0} # Agent caches configurationTags if custom_command RESTART completed mock_log_command_output.assert_has_calls([call("out\n\nCommand completed successfully!\n", "9"), call("stderr", "9")], any_order=True) self.assertEqual(len(report['reports']), 1) self.assertEqual(expected, report['reports'][0]) @patch.object(ActionQueue, "log_command_output") @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value)) @patch.object(CustomServiceOrchestrator, "runCommand") @patch("CommandStatusDict.CommandStatusDict") @patch.object(ActionQueue, "status_update_callback") def test_do_not_log_execution_commands(self, status_update_callback_mock, command_status_dict_mock, cso_runCommand_mock, mock_log_command_output): custom_service_orchestrator_execution_result_dict = { 'stdout': 'out', 'stderr': 'stderr', 'structuredOut': '', 'exitcode': 0 } cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict config = AmbariConfig() tempdir = tempfile.gettempdir() config.set('agent', 'prefix', tempdir) config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") config.set('agent', 'tolerate_download_failures', "true") config.set('logging', 'log_command_executes', 1) dummy_controller = MagicMock() actionQueue = ActionQueue(config, dummy_controller) actionQueue.execute_command(self.datanode_restart_command_no_logging) report = actionQueue.result() expected = {'status': 'COMPLETED', 'configurationTags': {'global': {'tag': 'v123'}}, 'stderr': 'stderr', 'stdout': 'out\n\nCommand completed successfully!\n', 'clusterName': u'cc', 'structuredOut': '""', 'roleCommand': u'CUSTOM_COMMAND', 'serviceName': u'HDFS', 'role': u'DATANODE', 'actionId': '1-1', 'taskId': 9, 'customCommand': 'RESTART', 'exitCode': 0} # Agent caches configurationTags if custom_command RESTART completed mock_log_command_output.assert_not_called( [call("out\n\nCommand completed successfully!\n", "9"), call("stderr", "9")], any_order=True) self.assertEqual(len(report['reports']), 1) self.assertEqual(expected, report['reports'][0]) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch("__builtin__.open") @patch.object(ActionQueue, "status_update_callback") def test_auto_execute_command(self, status_update_callback_mock, open_mock): # Make file read calls visible def open_side_effect(file, mode): if mode == 'r': file_mock = MagicMock() file_mock.read.return_value = "Read from " + str(file) return file_mock else: return self.original_open(file, mode) open_mock.side_effect = open_side_effect config = AmbariConfig() tempdir = tempfile.gettempdir() config.set('agent', 'prefix', tempdir) config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") config.set('agent', 'tolerate_download_failures', "true") dummy_controller = MagicMock() dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp()) dummy_controller.recovery_manager.update_config(5, 5, 1, 11, True, False, False, "", -1) actionQueue = ActionQueue(config, dummy_controller) unfreeze_flag = threading.Event() python_execution_result_dict = { 'stdout': 'out', 'stderr': 'stderr', 'structuredOut' : '' } def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False): unfreeze_flag.wait() return python_execution_result_dict def patched_aq_execute_command(command): # We have to perform patching for separate thread in the same thread with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock: runCommand_mock.side_effect = side_effect actionQueue.process_command(command) python_execution_result_dict['status'] = 'COMPLETE' python_execution_result_dict['exitcode'] = 0 self.assertFalse(actionQueue.tasks_in_progress_or_pending()) # We call method in a separate thread execution_thread = Thread(target = patched_aq_execute_command , args = (self.datanode_auto_start_command, )) execution_thread.start() # check in progress report # wait until ready while True: time.sleep(0.1) if actionQueue.tasks_in_progress_or_pending(): break # Continue command execution unfreeze_flag.set() # wait until ready check_queue = True while check_queue: report = actionQueue.result() if not actionQueue.tasks_in_progress_or_pending(): break time.sleep(0.1) self.assertEqual(len(report['reports']), 0) ## Test failed execution python_execution_result_dict['status'] = 'FAILED' python_execution_result_dict['exitcode'] = 13 # We call method in a separate thread execution_thread = Thread(target = patched_aq_execute_command , args = (self.datanode_auto_start_command, )) execution_thread.start() unfreeze_flag.set() # check in progress report # wait until ready while check_queue: report = actionQueue.result() if not actionQueue.tasks_in_progress_or_pending(): break time.sleep(0.1) self.assertEqual(len(report['reports']), 0) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch("__builtin__.open") @patch.object(ActionQueue, "status_update_callback") def test_execute_command(self, status_update_callback_mock, open_mock): # Make file read calls visible def open_side_effect(file, mode): if mode == 'r': file_mock = MagicMock() file_mock.read.return_value = "Read from " + str(file) return file_mock else: return self.original_open(file, mode) open_mock.side_effect = open_side_effect config = AmbariConfig() tempdir = tempfile.gettempdir() config.set('agent', 'prefix', tempdir) config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") config.set('agent', 'tolerate_download_failures', "true") dummy_controller = MagicMock() actionQueue = ActionQueue(config, dummy_controller) unfreeze_flag = threading.Event() python_execution_result_dict = { 'stdout': 'out', 'stderr': 'stderr', 'structuredOut' : '' } def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False): unfreeze_flag.wait() return python_execution_result_dict def patched_aq_execute_command(command): # We have to perform patching for separate thread in the same thread with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock: runCommand_mock.side_effect = side_effect actionQueue.execute_command(command) ### Test install/start/stop command ### ## Test successful execution with configuration tags python_execution_result_dict['status'] = 'COMPLETE' python_execution_result_dict['exitcode'] = 0 # We call method in a separate thread execution_thread = Thread(target = patched_aq_execute_command , args = (self.datanode_install_command, )) execution_thread.start() # check in progress report # wait until ready while True: time.sleep(0.1) report = actionQueue.result() if len(report['reports']) != 0: break expected = {'status': 'IN_PROGRESS', 'stderr': 'Read from {0}'.format(os.path.join(tempdir, "errors-3.txt")), 'stdout': 'Read from {0}'.format(os.path.join(tempdir, "output-3.txt")), 'structuredOut' : 'Read from {0}'.format(os.path.join(tempdir, "structured-out-3.json")), 'clusterName': u'cc', 'roleCommand': u'INSTALL', 'serviceName': u'HDFS', 'role': u'DATANODE', 'actionId': '1-1', 'taskId': 3, 'exitCode': 777} self.assertEqual(report['reports'][0], expected) self.assertTrue(actionQueue.tasks_in_progress_or_pending()) # Continue command execution unfreeze_flag.set() # wait until ready while report['reports'][0]['status'] == 'IN_PROGRESS': time.sleep(0.1) report = actionQueue.result() # check report configname = os.path.join(tempdir, 'config.json') expected = {'status': 'COMPLETED', 'stderr': 'stderr', 'stdout': 'out\n\nCommand completed successfully!\n', 'clusterName': u'cc', 'structuredOut': '""', 'roleCommand': u'INSTALL', 'serviceName': u'HDFS', 'role': u'DATANODE', 'actionId': '1-1', 'taskId': 3, 'configurationTags': {'global': {'tag': 'v1'}}, 'exitCode': 0} self.assertEqual(len(report['reports']), 1) self.assertEqual(report['reports'][0], expected) self.assertTrue(os.path.isfile(configname)) # Check that we had 2 status update calls ( IN_PROGRESS and COMPLETE) self.assertEqual(status_update_callback_mock.call_count, 2) os.remove(configname) # now should not have reports (read complete/failed reports are deleted) report = actionQueue.result() self.assertEqual(len(report['reports']), 0) ## Test failed execution python_execution_result_dict['status'] = 'FAILED' python_execution_result_dict['exitcode'] = 13 # We call method in a separate thread execution_thread = Thread(target = patched_aq_execute_command , args = (self.datanode_install_command, )) execution_thread.start() unfreeze_flag.set() # check in progress report # wait until ready report = actionQueue.result() while len(report['reports']) == 0 or \ report['reports'][0]['status'] == 'IN_PROGRESS': time.sleep(0.1) report = actionQueue.result() # check report expected = {'status': 'FAILED', 'stderr': 'stderr', 'stdout': 'out\n\nCommand completed successfully!\n\n\nCommand failed after 1 tries\n', 'clusterName': u'cc', 'structuredOut': '""', 'roleCommand': u'INSTALL', 'serviceName': u'HDFS', 'role': u'DATANODE', 'actionId': '1-1', 'taskId': 3, 'exitCode': 13} self.assertEqual(len(report['reports']), 1) self.assertEqual(report['reports'][0], expected) # now should not have reports (read complete/failed reports are deleted) report = actionQueue.result() self.assertEqual(len(report['reports']), 0) ### Test upgrade command ### python_execution_result_dict['status'] = 'COMPLETE' python_execution_result_dict['exitcode'] = 0 execution_thread = Thread(target = patched_aq_execute_command , args = (self.datanode_upgrade_command, )) execution_thread.start() unfreeze_flag.set() # wait until ready report = actionQueue.result() while len(report['reports']) == 0 or \ report['reports'][0]['status'] == 'IN_PROGRESS': time.sleep(0.1) report = actionQueue.result() # check report expected = {'status': 'COMPLETED', 'stderr': 'stderr', 'stdout': 'out\n\nCommand completed successfully!\n\n\nCommand failed after 1 tries\n\n\nCommand completed successfully!\n', 'clusterName': 'clusterName', 'structuredOut': '""', 'roleCommand': 'UPGRADE', 'serviceName': 'serviceName', 'role': 'role', 'actionId': 17, 'taskId': 'taskId', 'exitCode': 0} self.assertEqual(len(report['reports']), 1) self.assertEqual(report['reports'][0], expected) # now should not have reports (read complete/failed reports are deleted) report = actionQueue.result() self.assertEqual(len(report['reports']), 0) def test_cancel_with_reschedule_command(self): config = AmbariConfig() tempdir = tempfile.gettempdir() config.set('agent', 'prefix', tempdir) config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") config.set('agent', 'tolerate_download_failures', "true") dummy_controller = MagicMock() actionQueue = ActionQueue(config, dummy_controller) unfreeze_flag = threading.Event() python_execution_result_dict = { 'stdout': 'out', 'stderr': 'stderr', 'structuredOut' : '', 'status' : '', 'exitcode' : -signal.SIGTERM } def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False): unfreeze_flag.wait() return python_execution_result_dict def patched_aq_execute_command(command): # We have to perform patching for separate thread in the same thread with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock: runCommand_mock.side_effect = side_effect actionQueue.execute_command(command) # We call method in a separate thread execution_thread = Thread(target = patched_aq_execute_command , args = (self.datanode_install_command, )) execution_thread.start() # check in progress report # wait until ready while True: time.sleep(0.1) report = actionQueue.result() if len(report['reports']) != 0: break unfreeze_flag.set() # wait until ready while len(report['reports']) != 0: time.sleep(0.1) report = actionQueue.result() # check report self.assertEqual(len(report['reports']), 0) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(CustomServiceOrchestrator, "runCommand") @patch("CommandStatusDict.CommandStatusDict") @patch.object(ActionQueue, "status_update_callback") def test_store_configuration_tags(self, status_update_callback_mock, command_status_dict_mock, cso_runCommand_mock): custom_service_orchestrator_execution_result_dict = { 'stdout': 'out', 'stderr': 'stderr', 'structuredOut' : '', 'exitcode' : 0 } cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict config = AmbariConfig() tempdir = tempfile.gettempdir() config.set('agent', 'prefix', tempdir) config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") config.set('agent', 'tolerate_download_failures', "true") dummy_controller = MagicMock() actionQueue = ActionQueue(config, dummy_controller) actionQueue.execute_command(self.datanode_restart_command) report = actionQueue.result() expected = {'status': 'COMPLETED', 'configurationTags': {'global': {'tag': 'v123'}}, 'stderr': 'stderr', 'stdout': 'out\n\nCommand completed successfully!\n', 'clusterName': u'cc', 'structuredOut': '""', 'roleCommand': u'CUSTOM_COMMAND', 'serviceName': u'HDFS', 'role': u'DATANODE', 'actionId': '1-1', 'taskId': 9, 'customCommand': 'RESTART', 'exitCode': 0} # Agent caches configurationTags if custom_command RESTART completed self.assertEqual(len(report['reports']), 1) self.assertEqual(expected, report['reports'][0]) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(ActualConfigHandler, "write_client_components") @patch.object(CustomServiceOrchestrator, "runCommand") @patch("CommandStatusDict.CommandStatusDict") @patch.object(ActionQueue, "status_update_callback") def test_store_configuration_tags_no_clients(self, status_update_callback_mock, command_status_dict_mock, cso_runCommand_mock, write_client_components_mock): custom_service_orchestrator_execution_result_dict = { 'stdout': 'out', 'stderr': 'stderr', 'structuredOut' : '', 'exitcode' : 0 } cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict config = AmbariConfig() tempdir = tempfile.gettempdir() config.set('agent', 'prefix', tempdir) config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") config.set('agent', 'tolerate_download_failures', "true") dummy_controller = MagicMock() actionQueue = ActionQueue(config, dummy_controller) actionQueue.execute_command(self.datanode_restart_command_no_clients_update) report = actionQueue.result() expected = {'status': 'COMPLETED', 'configurationTags': {'global': {'tag': 'v123'}}, 'stderr': 'stderr', 'stdout': 'out\n\nCommand completed successfully!\n', 'clusterName': u'cc', 'structuredOut': '""', 'roleCommand': u'CUSTOM_COMMAND', 'serviceName': u'HDFS', 'role': u'DATANODE', 'actionId': '1-1', 'taskId': 9, 'customCommand': 'RESTART', 'exitCode': 0} # Agent caches configurationTags if custom_command RESTART completed self.assertEqual(len(report['reports']), 1) self.assertEqual(expected, report['reports'][0]) self.assertFalse(write_client_components_mock.called) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(ActualConfigHandler, "write_client_components") @patch.object(ActualConfigHandler, "write_actual_component") @patch.object(ActualConfigHandler, "update_component_tag") @patch.object(CustomServiceOrchestrator, "runCommand") @patch("CommandStatusDict.CommandStatusDict") @patch.object(ActionQueue, "status_update_callback") def test_refresh_queues_custom_command(self, status_update_callback_mock, command_status_dict_mock, cso_runCommand_mock, update_component_tag, write_actual_component_mock, write_client_components_mock): custom_service_orchestrator_execution_result_dict = { 'stdout': 'out', 'stderr': 'stderr', 'structuredOut' : '', 'exitcode' : 0 } cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict config = AmbariConfig() tempdir = tempfile.gettempdir() config.set('agent', 'prefix', tempdir) config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") config.set('agent', 'tolerate_download_failures', "true") dummy_controller = MagicMock() actionQueue = ActionQueue(config, dummy_controller) actionQueue.execute_command(self.yarn_refresh_queues_custom_command) report = actionQueue.result() expected = {'status': 'COMPLETED', 'configurationTags': None, 'stderr': 'stderr', 'stdout': 'out\n\nCommand completed successfully!\n', 'clusterName': u'cc', 'structuredOut': '""', 'roleCommand': u'CUSTOM_COMMAND', 'serviceName': u'YARN', 'role': u'RESOURCEMANAGER', 'actionId': '1-1', 'taskId': 9, 'customCommand': 'RESTART', 'exitCode': 0} self.assertEqual(len(report['reports']), 1) self.assertEqual(expected, report['reports'][0]) # Configuration tags should be updated self.assertTrue(update_component_tag.called) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(ActualConfigHandler, "write_client_components") @patch.object(ActualConfigHandler, "write_actual_component") @patch.object(CustomServiceOrchestrator, "runCommand") @patch("CommandStatusDict.CommandStatusDict") @patch.object(ActionQueue, "status_update_callback") def test_store_configuration_tags_on_custom_start_command(self, status_update_callback_mock, command_status_dict_mock, cso_runCommand_mock, write_actual_component_mock, write_client_components_mock): custom_service_orchestrator_execution_result_dict = { 'stdout': 'out', 'stderr': 'stderr', 'structuredOut' : '', 'exitcode' : 0 } cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict config = AmbariConfig() tempdir = tempfile.gettempdir() config.set('agent', 'prefix', tempdir) config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") config.set('agent', 'tolerate_download_failures', "true") dummy_controller = MagicMock() actionQueue = ActionQueue(config, dummy_controller) actionQueue.execute_command(self.datanode_start_custom_command) report = actionQueue.result() expected = {'status': 'COMPLETED', 'configurationTags': {'global': {'tag': 'v123'}}, 'stderr': 'stderr', 'stdout': 'out\n\nCommand completed successfully!\n', 'clusterName': u'cc', 'structuredOut': '""', 'roleCommand': u'CUSTOM_COMMAND', 'serviceName': u'HDFS', 'role': u'DATANODE', 'actionId': '1-1', 'taskId': 9, 'customCommand': 'START', 'exitCode': 0} self.assertEqual(len(report['reports']), 1) self.assertEqual(expected, report['reports'][0]) # Configuration tags should be updated on custom start command self.assertTrue(write_actual_component_mock.called) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(ActualConfigHandler, "write_actual_component") @patch.object(CustomServiceOrchestrator, "runCommand") @patch("CommandStatusDict.CommandStatusDict") @patch.object(ActionQueue, "status_update_callback") def test_store_config_tags_on_install_client_command(self, status_update_callback_mock, command_status_dict_mock, cso_runCommand_mock, write_actual_component_mock): custom_service_orchestrator_execution_result_dict = { 'stdout': 'out', 'stderr': 'stderr', 'structuredOut' : '', 'exitcode' : 0 } cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict tez_client_install_command = { 'commandType': 'EXECUTION_COMMAND', 'role': u'TEZ_CLIENT', 'roleCommand': u'INSTALL', 'commandId': '1-1', 'taskId': 9, 'clusterName': u'cc', 'serviceName': u'TEZ', 'configurations': {'global' : {}}, 'configurationTags': {'global' : { 'tag': 'v123' }}, 'hostLevelParams': {} } LiveStatus.CLIENT_COMPONENTS = ({'serviceName': 'TEZ', 'componentName': 'TEZ_CLIENT'}, ) config = AmbariConfig() tempdir = tempfile.gettempdir() config.set('agent', 'prefix', tempdir) config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") config.set('agent', 'tolerate_download_failures', "true") dummy_controller = MagicMock() actionQueue = ActionQueue(config, dummy_controller) actionQueue.execute_command(tez_client_install_command) # Configuration tags should be updated on install client command self.assertTrue(write_actual_component_mock.called) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(ActionQueue, "status_update_callback") @patch.object(ActionQueue, "execute_command") @patch.object(LiveStatus, "build") @patch.object(CustomServiceOrchestrator, "__init__") def test_execute_status_command(self, CustomServiceOrchestrator_mock, build_mock, execute_command_mock, status_update_callback): CustomServiceOrchestrator_mock.return_value = None dummy_controller = MagicMock() actionQueue = ActionQueue(AmbariConfig(), dummy_controller) build_mock.return_value = {'dummy report': '' } dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp()) result = (self.status_command, {'exitcode': 0 }, 'UNKNOWN') actionQueue.process_status_command_result(result) report = actionQueue.result() expected = {'dummy report': '', 'securityState' : 'UNKNOWN'} self.assertEqual(len(report['componentStatus']), 1) self.assertEqual(report['componentStatus'][0], expected) @patch.object(RecoveryManager, "command_exists") @patch.object(RecoveryManager, "requires_recovery") @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(ActionQueue, "status_update_callback") @patch.object(ActionQueue, "execute_command") @patch.object(LiveStatus, "build") @patch.object(CustomServiceOrchestrator, "__init__") def test_process_status_command_result_recovery(self, CustomServiceOrchestrator_mock, build_mock, execute_command_mock, status_update_callback, requires_recovery_mock, command_exists_mock): CustomServiceOrchestrator_mock.return_value = None dummy_controller = MagicMock() actionQueue = ActionQueue(AmbariConfig(), dummy_controller) build_mock.return_value = {'dummy report': '' } requires_recovery_mock.return_value = True command_exists_mock.return_value = False dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp(), True, False) result = (self.status_command, {'exitcode': 0 }, 'UNKNOWN') actionQueue.process_status_command_result(result) report = actionQueue.result() expected = {'dummy report': '', 'securityState' : 'UNKNOWN', 'sendExecCmdDet': 'True'} self.assertEqual(len(report['componentStatus']), 1) self.assertEqual(report['componentStatus'][0], expected) requires_recovery_mock.return_value = True command_exists_mock.return_value = True result = (self.status_command, {'exitcode': 0 }, 'UNKNOWN') actionQueue.process_status_command_result(result) report = actionQueue.result() expected = {'dummy report': '', 'securityState' : 'UNKNOWN', 'sendExecCmdDet': 'False'} self.assertEqual(len(report['componentStatus']), 1) self.assertEqual(report['componentStatus'][0], expected) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(ActionQueue, "status_update_callback") @patch.object(ActionQueue, "execute_command") @patch.object(LiveStatus, "build") @patch.object(CustomServiceOrchestrator, "__init__") def test_process_status_command_result_with_alerts(self, CustomServiceOrchestrator_mock, build_mock, execute_command_mock, status_update_callback): CustomServiceOrchestrator_mock.return_value = None dummy_controller = MagicMock() actionQueue = ActionQueue(AmbariConfig(), dummy_controller) command_return_value = { 'exitcode': 0, 'stdout': 'out', 'stderr': 'err', 'structuredOut': {'alerts': [ {'name': 'flume_alert'} ] } } result = (self.status_command_for_alerts, command_return_value, command_return_value) build_mock.return_value = {'somestatusresult': 'aresult'} actionQueue.process_status_command_result(result) report = actionQueue.result() self.assertEqual(len(report['componentStatus']), 1) self.assertTrue(report['componentStatus'][0].has_key('alerts')) @patch.object(AmbariConfig, "get_parallel_exec_option") @patch.object(ActionQueue, "process_command") @patch.object(Queue, "get") @patch.object(CustomServiceOrchestrator, "__init__") def test_reset_queue(self, CustomServiceOrchestrator_mock, get_mock, process_command_mock, gpeo_mock): CustomServiceOrchestrator_mock.return_value = None dummy_controller = MagicMock() dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp()) config = MagicMock() gpeo_mock.return_value = 0 config.get_parallel_exec_option = gpeo_mock actionQueue = ActionQueue(config, dummy_controller) actionQueue.start() actionQueue.put([self.datanode_install_command, self.hbase_install_command]) self.assertEqual(2, actionQueue.commandQueue.qsize()) self.assertTrue(actionQueue.tasks_in_progress_or_pending()) actionQueue.reset() self.assertTrue(actionQueue.commandQueue.empty()) self.assertFalse(actionQueue.tasks_in_progress_or_pending()) time.sleep(0.1) actionQueue.stop() actionQueue.join() self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.') @patch.object(AmbariConfig, "get_parallel_exec_option") @patch.object(ActionQueue, "process_command") @patch.object(Queue, "get") @patch.object(CustomServiceOrchestrator, "__init__") def test_cancel(self, CustomServiceOrchestrator_mock, get_mock, process_command_mock, gpeo_mock): CustomServiceOrchestrator_mock.return_value = None dummy_controller = MagicMock() config = MagicMock() gpeo_mock.return_value = 0 config.get_parallel_exec_option = gpeo_mock actionQueue = ActionQueue(config, dummy_controller) actionQueue.start() actionQueue.put([self.datanode_install_command, self.hbase_install_command]) self.assertEqual(2, actionQueue.commandQueue.qsize()) actionQueue.reset() self.assertTrue(actionQueue.commandQueue.empty()) time.sleep(0.1) actionQueue.stop() actionQueue.join() self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.') @patch.object(AmbariConfig, "get_parallel_exec_option") @patch.object(ActionQueue, "process_command") @patch.object(CustomServiceOrchestrator, "__init__") def test_parallel_exec(self, CustomServiceOrchestrator_mock, process_command_mock, gpeo_mock): CustomServiceOrchestrator_mock.return_value = None dummy_controller = MagicMock() config = MagicMock() gpeo_mock.return_value = 1 config.get_parallel_exec_option = gpeo_mock actionQueue = ActionQueue(config, dummy_controller) actionQueue.put([self.datanode_install_command, self.hbase_install_command]) self.assertEqual(2, actionQueue.commandQueue.qsize()) actionQueue.start() time.sleep(1) actionQueue.stop() actionQueue.join() self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.') self.assertEqual(2, process_command_mock.call_count) process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)]) @patch("threading.Thread") @patch.object(AmbariConfig, "get_parallel_exec_option") @patch.object(ActionQueue, "process_command") @patch.object(CustomServiceOrchestrator, "__init__") def test_parallel_exec_no_retry(self, CustomServiceOrchestrator_mock, process_command_mock, gpeo_mock, threading_mock): CustomServiceOrchestrator_mock.return_value = None dummy_controller = MagicMock() config = MagicMock() gpeo_mock.return_value = 1 config.get_parallel_exec_option = gpeo_mock actionQueue = ActionQueue(config, dummy_controller) actionQueue.put([self.datanode_install_no_retry_command, self.snamenode_install_command]) self.assertEqual(2, actionQueue.commandQueue.qsize()) actionQueue.start() time.sleep(1) actionQueue.stop() actionQueue.join() self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.') self.assertEqual(2, process_command_mock.call_count) self.assertEqual(0, threading_mock.call_count) process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)]) @not_for_platform(PLATFORM_LINUX) @patch("time.sleep") @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value)) @patch.object(CustomServiceOrchestrator, "__init__") def test_execute_retryable_command(self, CustomServiceOrchestrator_mock, sleep_mock ): CustomServiceOrchestrator_mock.return_value = None dummy_controller = MagicMock() actionQueue = ActionQueue(AmbariConfig(), dummy_controller) python_execution_result_dict = { 'exitcode': 1, 'stdout': 'out', 'stderr': 'stderr', 'structuredOut': '', 'status': 'FAILED' } def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False): return python_execution_result_dict command = copy.deepcopy(self.retryable_command) with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock: runCommand_mock.side_effect = side_effect actionQueue.execute_command(command) #assert that python executor start self.assertTrue(runCommand_mock.called) self.assertEqual(3, runCommand_mock.call_count) self.assertEqual(2, sleep_mock.call_count) sleep_mock.assert_has_calls([call(2), call(3)], False) runCommand_mock.assert_has_calls([ call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt', os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False), call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt', os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True), call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt', os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True)]) @patch("time.time") @patch("time.sleep") @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value)) @patch.object(CustomServiceOrchestrator, "__init__") def test_execute_retryable_command_with_time_lapse(self, CustomServiceOrchestrator_mock, sleep_mock, time_mock ): CustomServiceOrchestrator_mock.return_value = None dummy_controller = MagicMock() dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp()) actionQueue = ActionQueue(AmbariConfig(), dummy_controller) python_execution_result_dict = { 'exitcode': 1, 'stdout': 'out', 'stderr': 'stderr', 'structuredOut': '', 'status': 'FAILED' } times_arr = [8, 10, 14, 18, 22, 26, 30, 34] if self.logger.isEnabledFor(logging.INFO): times_arr.insert(0, 4) time_mock.side_effect = times_arr def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False): return python_execution_result_dict command = copy.deepcopy(self.retryable_command) with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock: runCommand_mock.side_effect = side_effect actionQueue.execute_command(command) #assert that python executor start self.assertTrue(runCommand_mock.called) self.assertEqual(2, runCommand_mock.call_count) self.assertEqual(1, sleep_mock.call_count) sleep_mock.assert_has_calls([call(1)], False) runCommand_mock.assert_has_calls([ call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt', os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False), call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt', os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True)]) #retryable_command @not_for_platform(PLATFORM_LINUX) @patch("time.sleep") @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value)) @patch.object(CustomServiceOrchestrator, "__init__") def test_execute_retryable_command_fail_and_succeed(self, CustomServiceOrchestrator_mock, sleep_mock ): CustomServiceOrchestrator_mock.return_value = None dummy_controller = MagicMock() actionQueue = ActionQueue(AmbariConfig(), dummy_controller) execution_result_fail_dict = { 'exitcode': 1, 'stdout': 'out', 'stderr': 'stderr', 'structuredOut': '', 'status': 'FAILED' } execution_result_succ_dict = { 'exitcode': 0, 'stdout': 'out', 'stderr': 'stderr', 'structuredOut': '', 'status': 'COMPLETED' } command = copy.deepcopy(self.retryable_command) with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock: runCommand_mock.side_effect = [execution_result_fail_dict, execution_result_succ_dict] actionQueue.execute_command(command) #assert that python executor start self.assertTrue(runCommand_mock.called) self.assertEqual(2, runCommand_mock.call_count) self.assertEqual(1, sleep_mock.call_count) sleep_mock.assert_any_call(2) @not_for_platform(PLATFORM_LINUX) @patch("time.sleep") @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value)) @patch.object(CustomServiceOrchestrator, "__init__") def test_execute_retryable_command_succeed(self, CustomServiceOrchestrator_mock, sleep_mock ): CustomServiceOrchestrator_mock.return_value = None dummy_controller = MagicMock() actionQueue = ActionQueue(AmbariConfig(), dummy_controller) execution_result_succ_dict = { 'exitcode': 0, 'stdout': 'out', 'stderr': 'stderr', 'structuredOut': '', 'status': 'COMPLETED' } command = copy.deepcopy(self.retryable_command) with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock: runCommand_mock.side_effect = [execution_result_succ_dict] actionQueue.execute_command(command) #assert that python executor start self.assertTrue(runCommand_mock.called) self.assertFalse(sleep_mock.called) self.assertEqual(1, runCommand_mock.call_count) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(CustomServiceOrchestrator, "runCommand") @patch.object(CustomServiceOrchestrator, "__init__") def test_execute_background_command(self, CustomServiceOrchestrator_mock, runCommand_mock, ): CustomServiceOrchestrator_mock.return_value = None CustomServiceOrchestrator.runCommand.return_value = {'exitcode' : 0, 'stdout': 'out-11', 'stderr' : 'err-13'} dummy_controller = MagicMock() actionQueue = ActionQueue(AmbariConfig(), dummy_controller) execute_command = copy.deepcopy(self.background_command) actionQueue.put([execute_command]) actionQueue.processBackgroundQueueSafeEmpty(); actionQueue.controller.statusCommandExecutor.process_results(); #assert that python execturor start self.assertTrue(runCommand_mock.called) runningCommand = actionQueue.commandStatuses.current_state.get(execute_command['taskId']) self.assertTrue(runningCommand is not None) self.assertEqual(runningCommand[1]['status'], ActionQueue.IN_PROGRESS_STATUS) report = actionQueue.result() self.assertEqual(len(report['reports']),1) @patch.object(CustomServiceOrchestrator, "get_py_executor") @patch.object(CustomServiceOrchestrator, "resolve_script_path") def test_execute_python_executor(self, resolve_script_path_mock, get_py_executor_mock): dummy_controller = MagicMock() cfg = AmbariConfig() cfg.set('agent', 'tolerate_download_failures', 'true') cfg.set('agent', 'prefix', '.') cfg.set('agent', 'cache_dir', 'background_tasks') actionQueue = ActionQueue(cfg, dummy_controller) pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config) patch_output_file(pyex) get_py_executor_mock.return_value = pyex actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock() result = {} lock = threading.RLock() complete_done = threading.Condition(lock) def command_complete_w(process_condensed_result, handle): with lock: result['command_complete'] = {'condensed_result' : copy.copy(process_condensed_result), 'handle' : copy.copy(handle), 'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId']) } complete_done.notifyAll() actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback, None, command_complete_w) actionQueue.put([self.background_command]) actionQueue.processBackgroundQueueSafeEmpty(); actionQueue.controller.statusCommandExecutor.process_results(); with lock: complete_done.wait(0.1) finished_status = result['command_complete']['command_status'] self.assertEqual(finished_status['status'], ActionQueue.COMPLETED_STATUS) self.assertEqual(finished_status['stdout'], 'process_out') self.assertEqual(finished_status['stderr'], 'process_err') self.assertEqual(finished_status['exitCode'], 0) runningCommand = actionQueue.commandStatuses.current_state.get(self.background_command['taskId']) self.assertTrue(runningCommand is not None) report = actionQueue.result() self.assertEqual(len(report['reports']),1) self.assertEqual(report['reports'][0]['stdout'],'process_out') # self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}') cancel_background_command = { "commandType":"CANCEL_COMMAND", "role":"AMBARI_SERVER_ACTION", "roleCommand":"ABORT", "commandId":"2--1", "taskId":20, "clusterName":"c1", "serviceName":"", "hostname":"c6401", "roleParams":{ "cancelTaskIdTargets":"13,14" }, } def patch_output_file(pythonExecutor): def windows_py(command, tmpout, tmperr): proc = MagicMock() proc.pid = 33 proc.returncode = 0 with tmpout: tmpout.write('process_out') with tmperr: tmperr.write('process_err') return proc def open_subprocess_files_win(fout, ferr, f): return MagicMock(), MagicMock() def read_result_from_files(out_path, err_path, structured_out_path): return 'process_out', 'process_err', '{"a": "b."}' pythonExecutor.launch_python_subprocess = windows_py pythonExecutor.open_subprocess_files = open_subprocess_files_win pythonExecutor.read_result_from_files = read_result_from_files def wraped(func, before = None, after = None): def wrapper(*args, **kwargs): if(before is not None): before(*args, **kwargs) ret = func(*args, **kwargs) if(after is not None): after(*args, **kwargs) return ret return wrapper