#!/usr/bin/env python ''' Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ''' import ConfigParser from multiprocessing.pool import ThreadPool import os import pprint from ambari_commons import shell from unittest import TestCase import threading import tempfile import time from threading import Thread from mock.mock import MagicMock, patch import StringIO import sys from ambari_agent.ActionQueue import ActionQueue from ambari_agent.AgentException import AgentException from ambari_agent.AmbariConfig import AmbariConfig from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator from ambari_agent.FileCache import FileCache from ambari_agent.PythonExecutor import PythonExecutor from ambari_commons import OSCheck from only_for_platform import get_platform, os_distro_value, PLATFORM_WINDOWS class TestCustomServiceOrchestrator(TestCase): def setUp(self): # disable stdout out = StringIO.StringIO() sys.stdout = out # generate sample config tmpdir = tempfile.gettempdir() exec_tmp_dir = os.path.join(tmpdir, 'tmp') self.config = ConfigParser.RawConfigParser() self.config.get = AmbariConfig().get self.config.add_section('agent') self.config.set('agent', 'prefix', tmpdir) self.config.set('agent', 'cache_dir', "/cachedir") self.config.add_section('python') self.config.set('python', 'custom_actions_dir', tmpdir) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(FileCache, "__init__") def test_add_reg_listener_to_controller(self, FileCache_mock): FileCache_mock.return_value = None dummy_controller = MagicMock() config = AmbariConfig().getConfig() tempdir = tempfile.gettempdir() config.set('agent', 'prefix', tempdir) CustomServiceOrchestrator(config, dummy_controller) self.assertTrue(dummy_controller.registration_listeners.append.called) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(CustomServiceOrchestrator, 'decompressClusterHostInfo') @patch("ambari_agent.hostname.public_hostname") @patch("os.path.isfile") @patch("os.unlink") @patch.object(FileCache, "__init__") def test_dump_command_to_json(self, FileCache_mock, unlink_mock, isfile_mock, hostname_mock, decompress_cluster_host_info_mock): FileCache_mock.return_value = None hostname_mock.return_value = "test.hst" command = { 'commandType': 'EXECUTION_COMMAND', 'role': u'DATANODE', 'roleCommand': u'INSTALL', 'commandId': '1-1', 'taskId': 3, 'clusterName': u'cc', 'serviceName': u'HDFS', 'configurations':{'global' : {}}, 'configurationTags':{'global' : { 'tag': 'v1' }}, 'clusterHostInfo':{'namenode_host' : ['1'], 'slave_hosts' : ['0', '1'], 'all_hosts' : ['h1.hortonworks.com', 'h2.hortonworks.com'], 'all_ping_ports': ['8670:0,1']}, 'hostLevelParams':{} } decompress_cluster_host_info_mock.return_value = {'namenode_host' : ['h2.hortonworks.com'], 'slave_hosts' : ['h1.hortonworks.com', 'h2.hortonworks.com'], 'all_hosts' : ['h1.hortonworks.com', 'h2.hortonworks.com'], 'all_ping_ports': ['8670', '8670']} config = AmbariConfig() tempdir = tempfile.gettempdir() config.set('agent', 'prefix', tempdir) dummy_controller = MagicMock() orchestrator = CustomServiceOrchestrator(config, dummy_controller) isfile_mock.return_value = True # Test dumping EXECUTION_COMMAND json_file = orchestrator.dump_command_to_json(command) self.assertTrue(os.path.exists(json_file)) self.assertTrue(os.path.getsize(json_file) > 0) if get_platform() != PLATFORM_WINDOWS: self.assertEqual(oct(os.stat(json_file).st_mode & 0777), '0600') self.assertTrue(json_file.endswith("command-3.json")) self.assertTrue(decompress_cluster_host_info_mock.called) os.unlink(json_file) # Test dumping STATUS_COMMAND command['commandType']='STATUS_COMMAND' decompress_cluster_host_info_mock.reset_mock() json_file = orchestrator.dump_command_to_json(command) self.assertTrue(os.path.exists(json_file)) self.assertTrue(os.path.getsize(json_file) > 0) if get_platform() != PLATFORM_WINDOWS: self.assertEqual(oct(os.stat(json_file).st_mode & 0777), '0600') self.assertTrue(json_file.endswith("status_command.json")) self.assertFalse(decompress_cluster_host_info_mock.called) os.unlink(json_file) # Testing side effect of dump_command_to_json self.assertEquals(command['public_hostname'], "test.hst") self.assertEquals(command['agentConfigParams']['agent']['parallel_execution'], 0) self.assertTrue(unlink_mock.called) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch("ambari_agent.hostname.public_hostname") @patch("os.path.isfile") @patch("os.unlink") @patch.object(FileCache, "__init__") def test_dump_command_to_json_with_retry(self, FileCache_mock, unlink_mock, isfile_mock, hostname_mock): FileCache_mock.return_value = None hostname_mock.return_value = "test.hst" command = { 'commandType': 'EXECUTION_COMMAND', 'role': u'DATANODE', 'roleCommand': u'INSTALL', 'commandId': '1-1', 'taskId': 3, 'clusterName': u'cc', 'serviceName': u'HDFS', 'configurations':{'global' : {}}, 'configurationTags':{'global' : { 'tag': 'v1' }}, 'clusterHostInfo':{'namenode_host' : ['1'], 'slave_hosts' : ['0', '1'], 'all_racks' : [u'/default-rack:0'], 'ambari_server_host' : 'a.b.c', 'ambari_server_port' : '123', 'ambari_server_use_ssl' : 'false', 'all_ipv4_ips' : [u'192.168.12.101:0'], 'all_hosts' : ['h1.hortonworks.com', 'h2.hortonworks.com'], 'all_ping_ports': ['8670:0,1']}, 'hostLevelParams':{} } config = AmbariConfig() tempdir = tempfile.gettempdir() config.set('agent', 'prefix', tempdir) dummy_controller = MagicMock() orchestrator = CustomServiceOrchestrator(config, dummy_controller) isfile_mock.return_value = True # Test dumping EXECUTION_COMMAND json_file = orchestrator.dump_command_to_json(command) self.assertTrue(os.path.exists(json_file)) self.assertTrue(os.path.getsize(json_file) > 0) if get_platform() != PLATFORM_WINDOWS: self.assertEqual(oct(os.stat(json_file).st_mode & 0777), '0600') self.assertTrue(json_file.endswith("command-3.json")) os.unlink(json_file) # Test dumping STATUS_COMMAND json_file = orchestrator.dump_command_to_json(command, True) self.assertTrue(os.path.exists(json_file)) self.assertTrue(os.path.getsize(json_file) > 0) if get_platform() != PLATFORM_WINDOWS: self.assertEqual(oct(os.stat(json_file).st_mode & 0777), '0600') self.assertTrue(json_file.endswith("command-3.json")) os.unlink(json_file) # Testing side effect of dump_command_to_json self.assertEquals(command['public_hostname'], "test.hst") self.assertEquals(command['agentConfigParams']['agent']['parallel_execution'], 0) self.assertTrue(unlink_mock.called) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch("os.path.exists") @patch.object(FileCache, "__init__") def test_resolve_script_path(self, FileCache_mock, exists_mock): FileCache_mock.return_value = None dummy_controller = MagicMock() config = AmbariConfig().getConfig() orchestrator = CustomServiceOrchestrator(config, dummy_controller) # Testing existing path exists_mock.return_value = True path = orchestrator.\ resolve_script_path(os.path.join("HBASE", "package"), os.path.join("scripts", "hbase_master.py")) self.assertEqual(os.path.join("HBASE", "package", "scripts", "hbase_master.py"), path) # Testing not existing path exists_mock.return_value = False try: orchestrator.resolve_script_path("/HBASE", os.path.join("scripts", "hbase_master.py")) self.fail('ExpectedException not thrown') except AgentException: pass # Expected @patch.object(FileCache, "get_dashboard_base_dir") @patch.object(CustomServiceOrchestrator, "resolve_script_path") @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path") @patch.object(FileCache, "get_host_scripts_base_dir") @patch.object(FileCache, "get_service_base_dir") @patch.object(FileCache, "get_hook_base_dir") @patch.object(CustomServiceOrchestrator, "dump_command_to_json") @patch.object(PythonExecutor, "run_file") @patch.object(FileCache, "__init__") def test_runCommand(self, FileCache_mock, run_file_mock, dump_command_to_json_mock, get_hook_base_dir_mock, get_service_base_dir_mock, get_host_scripts_base_dir_mock, resolve_hook_script_path_mock, resolve_script_path_mock, get_dashboard_base_dir_mock): FileCache_mock.return_value = None command = { 'commandType' : 'EXECUTION_COMMAND', 'role' : 'REGION_SERVER', 'hostLevelParams' : { 'stack_name' : 'HDP', 'stack_version' : '2.0.7', 'jdk_location' : 'some_location' }, 'commandParams': { 'script_type': 'PYTHON', 'script': 'scripts/hbase_regionserver.py', 'command_timeout': '600', 'service_package_folder' : 'HBASE' }, 'taskId' : '3', 'roleCommand': 'INSTALL' } get_host_scripts_base_dir_mock.return_value = "/host_scripts" get_service_base_dir_mock.return_value = "/basedir/" resolve_script_path_mock.return_value = "/basedir/scriptpath" resolve_hook_script_path_mock.return_value = \ ('/hooks_dir/prefix-command/scripts/hook.py', '/hooks_dir/prefix-command') dummy_controller = MagicMock() orchestrator = CustomServiceOrchestrator(self.config, dummy_controller) unix_process_id = 111 orchestrator.commands_in_progress = {command['taskId']: unix_process_id} get_hook_base_dir_mock.return_value = "/hooks/" get_dashboard_base_dir_mock.return_value = "/dashboards/" # normal run case run_file_mock.return_value = { 'stdout' : 'sss', 'stderr' : 'eee', 'exitcode': 0, } ret = orchestrator.runCommand(command, "out.txt", "err.txt") self.assertEqual(ret['exitcode'], 0) self.assertTrue(run_file_mock.called) self.assertEqual(run_file_mock.call_count, 3) self.assertTrue(get_dashboard_base_dir_mock.called) run_file_mock.reset_mock() # Case when we force another command run_file_mock.return_value = { 'stdout' : 'sss', 'stderr' : 'eee', 'exitcode': 0, } ret = orchestrator.runCommand(command, "out.txt", "err.txt", forced_command_name=CustomServiceOrchestrator.SCRIPT_TYPE_PYTHON) ## Check that override_output_files was true only during first call print run_file_mock self.assertEquals(run_file_mock.call_args_list[0][0][8], True) self.assertEquals(run_file_mock.call_args_list[1][0][8], False) self.assertEquals(run_file_mock.call_args_list[2][0][8], False) ## Check that forced_command_name was taken into account self.assertEqual(run_file_mock.call_args_list[0][0][1][0], CustomServiceOrchestrator.SCRIPT_TYPE_PYTHON) run_file_mock.reset_mock() # For role=METRICS_GRAFANA, dashboards should be sync'd command['role'] = 'METRICS_GRAFANA' get_dashboard_base_dir_mock.reset_mock() get_dashboard_base_dir_mock.return_value = "/dashboards/" run_file_mock.return_value = { 'stdout' : 'sss', 'stderr' : 'eee', 'exitcode': 0, } ret = orchestrator.runCommand(command, "out.txt", "err.txt") self.assertEqual(ret['exitcode'], 0) self.assertTrue(run_file_mock.called) self.assertEqual(run_file_mock.call_count, 3) self.assertTrue(get_dashboard_base_dir_mock.called) command['role'] = 'REGION_SERVER' run_file_mock.reset_mock() # unknown script type case command['commandParams']['script_type'] = "SOME_TYPE" ret = orchestrator.runCommand(command, "out.txt", "err.txt") self.assertEqual(ret['exitcode'], 1) self.assertFalse(run_file_mock.called) self.assertTrue("Unknown script type" in ret['stdout']) #By default returns empty dictionary self.assertEqual(ret['structuredOut'], '{}') pass @patch.object(FileCache, "get_dashboard_base_dir") @patch("ambari_commons.shell.kill_process_with_children") @patch.object(CustomServiceOrchestrator, "resolve_script_path") @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path") @patch.object(FileCache, "get_host_scripts_base_dir") @patch.object(FileCache, "get_service_base_dir") @patch.object(FileCache, "get_hook_base_dir") @patch.object(CustomServiceOrchestrator, "dump_command_to_json") @patch.object(PythonExecutor, "run_file") @patch.object(FileCache, "__init__") def test_cancel_command(self, FileCache_mock, run_file_mock, dump_command_to_json_mock, get_hook_base_dir_mock, get_service_base_dir_mock, get_host_scripts_base_dir_mock, resolve_hook_script_path_mock, resolve_script_path_mock, kill_process_with_children_mock, get_dashboard_base_dir_mock): FileCache_mock.return_value = None command = { 'role' : 'REGION_SERVER', 'hostLevelParams' : { 'stack_name' : 'HDP', 'stack_version' : '2.0.7', 'jdk_location' : 'some_location' }, 'commandParams': { 'script_type': 'PYTHON', 'script': 'scripts/hbase_regionserver.py', 'command_timeout': '600', 'service_package_folder' : 'HBASE' }, 'taskId' : '3', 'roleCommand': 'INSTALL' } get_host_scripts_base_dir_mock.return_value = "/host_scripts" get_service_base_dir_mock.return_value = "/basedir/" resolve_script_path_mock.return_value = "/basedir/scriptpath" resolve_hook_script_path_mock.return_value = \ ('/hooks_dir/prefix-command/scripts/hook.py', '/hooks_dir/prefix-command') dummy_controller = MagicMock() orchestrator = CustomServiceOrchestrator(self.config, dummy_controller) unix_process_id = 111 orchestrator.commands_in_progress = {command['taskId']: unix_process_id} get_hook_base_dir_mock.return_value = "/hooks/" get_dashboard_base_dir_mock.return_value = "/dashboards/" run_file_mock_return_value = { 'stdout' : 'killed', 'stderr' : 'killed', 'exitcode': 1, } def side_effect(*args, **kwargs): time.sleep(0.2) return run_file_mock_return_value run_file_mock.side_effect = side_effect _, out = tempfile.mkstemp() _, err = tempfile.mkstemp() pool = ThreadPool(processes=1) async_result = pool.apply_async(orchestrator.runCommand, (command, out, err)) time.sleep(0.1) orchestrator.cancel_command(command['taskId'], 'reason') ret = async_result.get() self.assertEqual(ret['exitcode'], 1) self.assertEquals(ret['stdout'], 'killed\nCommand aborted. Reason: \'reason\'') self.assertEquals(ret['stderr'], 'killed\nCommand aborted. Reason: \'reason\'') self.assertTrue(kill_process_with_children_mock.called) self.assertFalse(command['taskId'] in orchestrator.commands_in_progress.keys()) self.assertTrue(os.path.exists(out)) self.assertTrue(os.path.exists(err)) try: os.remove(out) os.remove(err) except: pass @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(CustomServiceOrchestrator, "get_py_executor") @patch("ambari_commons.shell.kill_process_with_children") @patch.object(FileCache, "__init__") @patch.object(CustomServiceOrchestrator, "resolve_script_path") @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path") def test_cancel_backgound_command(self, resolve_hook_script_path_mock, resolve_script_path_mock, FileCache_mock, kill_process_with_children_mock, get_py_executor_mock): FileCache_mock.return_value = None FileCache_mock.cache_dir = MagicMock() resolve_hook_script_path_mock.return_value = None dummy_controller = MagicMock() cfg = AmbariConfig() cfg.set('agent', 'tolerate_download_failures', 'true') cfg.set('agent', 'prefix', '.') cfg.set('agent', 'cache_dir', 'background_tasks') actionQueue = ActionQueue(cfg, dummy_controller) dummy_controller.actionQueue = actionQueue orchestrator = CustomServiceOrchestrator(cfg, dummy_controller) orchestrator.file_cache = MagicMock() def f (a, b): return "" orchestrator.file_cache.get_service_base_dir = f actionQueue.customServiceOrchestrator = orchestrator import TestActionQueue import copy pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config) TestActionQueue.patch_output_file(pyex) pyex.prepare_process_result = MagicMock() get_py_executor_mock.return_value = pyex orchestrator.dump_command_to_json = MagicMock() lock = threading.RLock() complete_done = threading.Condition(lock) complete_was_called = {} def command_complete_w(process_condenced_result, handle): with lock: complete_was_called['visited']= '' complete_done.wait(3) actionQueue.on_background_command_complete_callback = TestActionQueue.wraped(actionQueue.on_background_command_complete_callback, command_complete_w, None) execute_command = copy.deepcopy(TestActionQueue.TestActionQueue.background_command) actionQueue.put([execute_command]) actionQueue.processBackgroundQueueSafeEmpty() time.sleep(.1) orchestrator.cancel_command(19,'reason') self.assertTrue(kill_process_with_children_mock.called) kill_process_with_children_mock.assert_called_with(33) with lock: complete_done.notifyAll() with lock: self.assertTrue(complete_was_called.has_key('visited')) time.sleep(.1) runningCommand = actionQueue.commandStatuses.get_command_status(19) self.assertTrue(runningCommand is not None) self.assertEqual(runningCommand['status'], ActionQueue.FAILED_STATUS) @patch.object(AmbariConfig, "get") @patch.object(CustomServiceOrchestrator, "dump_command_to_json") @patch.object(PythonExecutor, "run_file") @patch.object(FileCache, "__init__") @patch.object(FileCache, "get_custom_actions_base_dir") def test_runCommand_custom_action(self, get_custom_actions_base_dir_mock, FileCache_mock, run_file_mock, dump_command_to_json_mock, ambari_config_get): ambari_config_get.return_value = "0" FileCache_mock.return_value = None get_custom_actions_base_dir_mock.return_value = "some path" _, script = tempfile.mkstemp() command = { 'role' : 'any', 'commandParams': { 'script_type': 'PYTHON', 'script': 'some_custom_action.py', 'command_timeout': '600', 'jdk_location' : 'some_location' }, 'taskId' : '3', 'roleCommand': 'ACTIONEXECUTE' } dummy_controller = MagicMock() orchestrator = CustomServiceOrchestrator(self.config, dummy_controller) unix_process_id = 111 orchestrator.commands_in_progress = {command['taskId']: unix_process_id} # normal run case run_file_mock.return_value = { 'stdout' : 'sss', 'stderr' : 'eee', 'exitcode': 0, } ret = orchestrator.runCommand(command, "out.txt", "err.txt") self.assertEqual(ret['exitcode'], 0) self.assertTrue(run_file_mock.called) # Hoooks are not supported for custom actions, # that's why run_file() should be called only once self.assertEqual(run_file_mock.call_count, 1) @patch("os.path.isfile") @patch.object(FileCache, "__init__") def test_resolve_hook_script_path(self, FileCache_mock, isfile_mock): FileCache_mock.return_value = None dummy_controller = MagicMock() orchestrator = CustomServiceOrchestrator(self.config, dummy_controller) # Testing None param res1 = orchestrator.resolve_hook_script_path(None, "prefix", "command", "script_type") self.assertEqual(res1, None) # Testing existing hook script isfile_mock.return_value = True res2 = orchestrator.resolve_hook_script_path("hooks_dir", "prefix", "command", "script_type") self.assertEqual(res2, (os.path.join('hooks_dir', 'prefix-command', 'scripts', 'hook.py'), os.path.join('hooks_dir', 'prefix-command'))) # Testing not existing hook script isfile_mock.return_value = False res3 = orchestrator.resolve_hook_script_path("hooks_dir", "prefix", "command", "script_type") self.assertEqual(res3, None) @patch.object(CustomServiceOrchestrator, "runCommand") @patch.object(FileCache, "__init__") def test_requestComponentStatus(self, FileCache_mock, runCommand_mock): FileCache_mock.return_value = None status_command = { "serviceName" : 'HDFS', "commandType" : "STATUS_COMMAND", "clusterName" : "", "componentName" : "DATANODE", 'configurations':{} } dummy_controller = MagicMock() orchestrator = CustomServiceOrchestrator(self.config, dummy_controller) # Test alive case runCommand_mock.return_value = { "exitcode" : 0 } status = orchestrator.requestComponentStatus(status_command) self.assertEqual(runCommand_mock.return_value, status) # Test dead case runCommand_mock.return_value = { "exitcode" : 1 } status = orchestrator.requestComponentStatus(status_command) self.assertEqual(runCommand_mock.return_value, status) @patch.object(CustomServiceOrchestrator, "runCommand") @patch.object(FileCache, "__init__") def test_requestComponentSecurityState(self, FileCache_mock, runCommand_mock): FileCache_mock.return_value = None status_command = { "serviceName" : 'HDFS', "commandType" : "STATUS_COMMAND", "clusterName" : "", "componentName" : "DATANODE", 'configurations':{} } dummy_controller = MagicMock() orchestrator = CustomServiceOrchestrator(self.config, dummy_controller) # Test securityState runCommand_mock.return_value = { 'exitcode' : 0, 'structuredOut' : {'securityState': 'UNSECURED'} } status = orchestrator.requestComponentSecurityState(status_command) self.assertEqual('UNSECURED', status) # Test case where exit code indicates failure runCommand_mock.return_value = { "exitcode" : 1 } status = orchestrator.requestComponentSecurityState(status_command) self.assertEqual('UNKNOWN', status) @patch.object(FileCache, "__init__") def test_requestComponentSecurityState_realFailure(self, FileCache_mock): ''' Tests the case where the CustomServiceOrchestrator attempts to call a service's security_status method, but fails to do so because the script or method was not found. :param FileCache_mock: :return: ''' FileCache_mock.return_value = None status_command = { "serviceName" : 'BOGUS_SERVICE', "commandType" : "STATUS_COMMAND", "clusterName" : "", "componentName" : "DATANODE", 'configurations':{} } dummy_controller = MagicMock() orchestrator = CustomServiceOrchestrator(self.config, dummy_controller) status = orchestrator.requestComponentSecurityState(status_command) self.assertEqual('UNKNOWN', status) @patch.object(CustomServiceOrchestrator, "get_py_executor") @patch.object(CustomServiceOrchestrator, "dump_command_to_json") @patch.object(FileCache, "__init__") @patch.object(FileCache, "get_custom_actions_base_dir") def test_runCommand_background_action(self, get_custom_actions_base_dir_mock, FileCache_mock, dump_command_to_json_mock, get_py_executor_mock): FileCache_mock.return_value = None get_custom_actions_base_dir_mock.return_value = "some path" _, script = tempfile.mkstemp() command = { 'role' : 'any', 'commandParams': { 'script_type': 'PYTHON', 'script': 'some_custom_action.py', 'command_timeout': '600', 'jdk_location' : 'some_location' }, 'taskId' : '13', 'roleCommand': 'ACTIONEXECUTE', 'commandType': 'BACKGROUND_EXECUTION_COMMAND', '__handle': BackgroundCommandExecutionHandle({'taskId': '13'}, 13, MagicMock(), MagicMock()) } dummy_controller = MagicMock() orchestrator = CustomServiceOrchestrator(self.config, dummy_controller) import TestActionQueue pyex = PythonExecutor(orchestrator.tmp_dir, orchestrator.config) TestActionQueue.patch_output_file(pyex) pyex.condenseOutput = MagicMock() get_py_executor_mock.return_value = pyex orchestrator.dump_command_to_json = MagicMock() ret = orchestrator.runCommand(command, "out.txt", "err.txt") self.assertEqual(ret['exitcode'], 777) def tearDown(self): # enable stdout sys.stdout = sys.__stdout__