#!/usr/bin/env python

'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import ConfigParser
from multiprocessing.pool import ThreadPool
import os

import pprint
from ambari_commons import shell

from unittest import TestCase
import threading
import tempfile
import time
from threading import Thread

from PythonExecutor import PythonExecutor
from CustomServiceOrchestrator import CustomServiceOrchestrator
from AmbariConfig import AmbariConfig
from mock.mock import MagicMock, patch
import StringIO
import sys
from AgentException import AgentException
from FileCache import FileCache
from LiveStatus import LiveStatus
from BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
from ambari_agent.ActionQueue import ActionQueue
from only_for_platform import get_platform, PLATFORM_WINDOWS


class TestCustomServiceOrchestrator(TestCase):
    '''
    Unit tests for CustomServiceOrchestrator.

    FileCache.__init__ is patched out in every test that builds an
    orchestrator, so no real cache directory is touched by construction.
    '''

    def setUp(self):
        '''Silence stdout and build the sample agent configuration.'''
        # Remember the stream that is active right now (a test runner may
        # already have replaced sys.stdout) so tearDown restores exactly it.
        self._saved_stdout = sys.stdout
        sys.stdout = StringIO.StringIO()

        # generate sample config
        tmpdir = tempfile.gettempdir()
        exec_tmp_dir = os.path.join(tmpdir, 'tmp')
        self.config = ConfigParser.RawConfigParser()
        self.config.add_section('agent')
        self.config.set('agent', 'prefix', tmpdir)
        self.config.set('agent', 'tmp_dir', exec_tmp_dir)
        self.config.set('agent', 'cache_dir', "/cachedir")
        self.config.add_section('python')
        self.config.set('python', 'custom_actions_dir', tmpdir)

    def _assert_private_dump(self, json_file, expected_suffix):
        '''
        Check a file produced by dump_command_to_json.

        The file must exist, be non-empty, end with expected_suffix and,
        on non-Windows platforms, carry permission bits 0600.
        '''
        self.assertTrue(os.path.exists(json_file))
        self.assertTrue(os.path.getsize(json_file) > 0)
        if get_platform() != PLATFORM_WINDOWS:
            # Compare the mode as an integer: the textual form returned by
            # oct() differs between interpreter versions.
            self.assertEqual(os.stat(json_file).st_mode & 0o777, 0o600)
        self.assertTrue(json_file.endswith(expected_suffix))

    @patch.object(FileCache, "__init__")
    def test_add_reg_listener_to_controller(self, FileCache_mock):
        '''Creating an orchestrator appends a registration listener.'''
        FileCache_mock.return_value = None
        dummy_controller = MagicMock()
        config = AmbariConfig().getConfig()
        tempdir = tempfile.gettempdir()
        config.set('agent', 'prefix', tempdir)
        CustomServiceOrchestrator(config, dummy_controller)
        self.assertTrue(
            dummy_controller.registration_listeners.append.called)

    @patch.object(CustomServiceOrchestrator, 'decompressClusterHostInfo')
    @patch("hostname.public_hostname")
    @patch("os.path.isfile")
    @patch("os.unlink")
    @patch.object(FileCache, "__init__")
    def test_dump_command_to_json(self, FileCache_mock, unlink_mock,
                                  isfile_mock, hostname_mock,
                                  decompress_cluster_host_info_mock):
        '''
        dump_command_to_json writes a private JSON file per command type.

        Execution commands go to a "command-<taskId>.json" file and have
        their cluster host info decompressed; status commands go to
        "status_command.json" and are not decompressed.
        '''
        FileCache_mock.return_value = None
        hostname_mock.return_value = "test.hst"
        command = {
            'commandType': 'EXECUTION_COMMAND',
            'role': u'DATANODE',
            'roleCommand': u'INSTALL',
            'commandId': '1-1',
            'taskId': 3,
            'clusterName': u'cc',
            'serviceName': u'HDFS',
            'configurations': {'global': {}},
            'configurationTags': {'global': {'tag': 'v1'}},
            'clusterHostInfo': {
                'namenode_host': ['1'],
                'slave_hosts': ['0', '1'],
                'all_hosts': ['h1.hortonworks.com', 'h2.hortonworks.com'],
                'all_ping_ports': ['8670:0,1'],
            },
            'hostLevelParams': {},
        }

        decompress_cluster_host_info_mock.return_value = {
            'namenode_host': ['h2.hortonworks.com'],
            'slave_hosts': ['h1.hortonworks.com', 'h2.hortonworks.com'],
            'all_hosts': ['h1.hortonworks.com', 'h2.hortonworks.com'],
            'all_ping_ports': ['8670', '8670'],
        }

        config = AmbariConfig().getConfig()
        tempdir = tempfile.gettempdir()
        config.set('agent', 'prefix', tempdir)
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(config, dummy_controller)
        isfile_mock.return_value = True

        # Test dumping EXECUTION_COMMAND
        json_file = orchestrator.dump_command_to_json(command)
        self._assert_private_dump(json_file, "command-3.json")
        self.assertTrue(decompress_cluster_host_info_mock.called)
        # os.unlink is patched in this test, so clean up with os.remove:
        # that really deletes the file and keeps unlink_mock untouched by
        # the test itself.
        os.remove(json_file)

        # Test dumping STATUS_COMMAND
        command['commandType'] = 'STATUS_COMMAND'
        decompress_cluster_host_info_mock.reset_mock()
        json_file = orchestrator.dump_command_to_json(command)
        self._assert_private_dump(json_file, "status_command.json")
        self.assertFalse(decompress_cluster_host_info_mock.called)
        os.remove(json_file)

        # Testing side effects of dump_command_to_json
        self.assertEqual(command['public_hostname'], "test.hst")
        # NOTE(review): assumes dump_command_to_json calls os.unlink when
        # os.path.isfile reports an existing file - confirm against the
        # orchestrator implementation.
        self.assertTrue(unlink_mock.called)

    @patch("os.path.exists")
    @patch.object(FileCache, "__init__")
    def test_resolve_script_path(self, FileCache_mock, exists_mock):
        '''
        resolve_script_path joins base dir and script for existing paths
        and raises AgentException when the path does not exist.
        '''
        FileCache_mock.return_value = None
        dummy_controller = MagicMock()
        config = AmbariConfig().getConfig()
        orchestrator = CustomServiceOrchestrator(config, dummy_controller)

        # Testing existing path
        exists_mock.return_value = True
        path = orchestrator.resolve_script_path(
            os.path.join("HBASE", "package"),
            os.path.join("scripts", "hbase_master.py"))
        self.assertEqual(
            os.path.join("HBASE", "package", "scripts", "hbase_master.py"),
            path)

        # Testing not existing path
        exists_mock.return_value = False
        self.assertRaises(AgentException,
                          orchestrator.resolve_script_path,
                          "/HBASE",
                          os.path.join("scripts", "hbase_master.py"))

    @patch.object(CustomServiceOrchestrator, "resolve_script_path")
    @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
    @patch.object(FileCache, "get_host_scripts_base_dir")
    @patch.object(FileCache, "get_service_base_dir")
    @patch.object(FileCache, "get_hook_base_dir")
    @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
    @patch.object(PythonExecutor, "run_file")
    @patch.object(FileCache, "__init__")
    def test_runCommand(self, FileCache_mock, run_file_mock,
                        dump_command_to_json_mock, get_hook_base_dir_mock,
                        get_service_base_dir_mock,
                        get_host_scripts_base_dir_mock,
                        resolve_hook_script_path_mock,
                        resolve_script_path_mock):
        '''
        runCommand with a mocked PythonExecutor.run_file.

        Covers a normal run (three run_file calls), a run with a forced
        command name, and an unknown script type (no run_file call,
        exit code 1).
        '''
        FileCache_mock.return_value = None
        command = {
            'role': 'REGION_SERVER',
            'hostLevelParams': {
                'stack_name': 'HDP',
                'stack_version': '2.0.7',
                'jdk_location': 'some_location'
            },
            'commandParams': {
                'script_type': 'PYTHON',
                'script': 'scripts/hbase_regionserver.py',
                'command_timeout': '600',
                'service_package_folder': 'HBASE'
            },
            'taskId': '3',
            'roleCommand': 'INSTALL'
        }

        get_host_scripts_base_dir_mock.return_value = "/host_scripts"
        get_service_base_dir_mock.return_value = "/basedir/"
        resolve_script_path_mock.return_value = "/basedir/scriptpath"
        resolve_hook_script_path_mock.return_value = \
            ('/hooks_dir/prefix-command/scripts/hook.py',
             '/hooks_dir/prefix-command')
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config,
                                                 dummy_controller)
        unix_process_id = 111
        orchestrator.commands_in_progress = {
            command['taskId']: unix_process_id}
        get_hook_base_dir_mock.return_value = "/hooks/"

        # normal run case
        run_file_mock.return_value = {
            'stdout': 'sss',
            'stderr': 'eee',
            'exitcode': 0,
        }
        ret = orchestrator.runCommand(command, "out.txt", "err.txt")
        self.assertEqual(ret['exitcode'], 0)
        self.assertTrue(run_file_mock.called)
        self.assertEqual(run_file_mock.call_count, 3)

        run_file_mock.reset_mock()

        # Case when we force another command
        run_file_mock.return_value = {
            'stdout': 'sss',
            'stderr': 'eee',
            'exitcode': 0,
        }
        ret = orchestrator.runCommand(
            command, "out.txt", "err.txt",
            forced_command_name=CustomServiceOrchestrator.COMMAND_NAME_STATUS)
        # Check that override_output_files (positional argument 10 of
        # run_file) was true only during the first call
        calls = run_file_mock.call_args_list
        self.assertEqual(calls[0][0][10], True)
        self.assertEqual(calls[1][0][10], False)
        self.assertEqual(calls[2][0][10], False)
        # Check that forced_command_name was taken into account
        self.assertEqual(calls[0][0][1][0],
                         CustomServiceOrchestrator.COMMAND_NAME_STATUS)

        run_file_mock.reset_mock()

        # unknown script type case
        command['commandParams']['script_type'] = "SOME_TYPE"
        ret = orchestrator.runCommand(command, "out.txt", "err.txt")
        self.assertEqual(ret['exitcode'], 1)
        self.assertFalse(run_file_mock.called)
        self.assertTrue("Unknown script type" in ret['stdout'])

        # By default returns empty dictionary
        self.assertEqual(ret['structuredOut'], '{}')

    @patch("ambari_commons.shell.kill_process_with_children")
    @patch.object(CustomServiceOrchestrator, "resolve_script_path")
    @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
    @patch.object(FileCache, "get_host_scripts_base_dir")
    @patch.object(FileCache, "get_service_base_dir")
    @patch.object(FileCache, "get_hook_base_dir")
    @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
    @patch.object(PythonExecutor, "run_file")
    @patch.object(FileCache, "__init__")
    def test_cancel_command(self, FileCache_mock, run_file_mock,
                            dump_command_to_json_mock,
                            get_hook_base_dir_mock,
                            get_service_base_dir_mock,
                            get_host_scripts_base_dir_mock,
                            resolve_hook_script_path_mock,
                            resolve_script_path_mock,
                            kill_process_with_children_mock):
        '''
        Cancelling a command that is still running kills its process and
        appends the abort reason to the reported stdout and stderr.
        '''
        FileCache_mock.return_value = None
        command = {
            'role': 'REGION_SERVER',
            'hostLevelParams': {
                'stack_name': 'HDP',
                'stack_version': '2.0.7',
                'jdk_location': 'some_location'
            },
            'commandParams': {
                'script_type': 'PYTHON',
                'script': 'scripts/hbase_regionserver.py',
                'command_timeout': '600',
                'service_package_folder': 'HBASE'
            },
            'taskId': '3',
            'roleCommand': 'INSTALL'
        }

        get_host_scripts_base_dir_mock.return_value = "/host_scripts"
        get_service_base_dir_mock.return_value = "/basedir/"
        resolve_script_path_mock.return_value = "/basedir/scriptpath"
        resolve_hook_script_path_mock.return_value = \
            ('/hooks_dir/prefix-command/scripts/hook.py',
             '/hooks_dir/prefix-command')
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config,
                                                 dummy_controller)
        unix_process_id = 111
        orchestrator.commands_in_progress = {
            command['taskId']: unix_process_id}
        get_hook_base_dir_mock.return_value = "/hooks/"

        run_file_mock_return_value = {
            'stdout': 'killed',
            'stderr': 'killed',
            'exitcode': 1,
        }

        def side_effect(*args, **kwargs):
            # Keep the fake script "running" long enough for the main
            # thread to issue the cancel request.
            time.sleep(0.2)
            return run_file_mock_return_value
        run_file_mock.side_effect = side_effect

        # Only the paths are needed; close the descriptors right away so
        # they are not leaked.
        out_fd, out = tempfile.mkstemp()
        err_fd, err = tempfile.mkstemp()
        os.close(out_fd)
        os.close(err_fd)

        pool = ThreadPool(processes=1)
        try:
            async_result = pool.apply_async(orchestrator.runCommand,
                                            (command, out, err))

            time.sleep(0.1)
            orchestrator.cancel_command(command['taskId'], 'reason')

            ret = async_result.get()

            self.assertEqual(ret['exitcode'], 1)
            self.assertEqual(ret['stdout'],
                             'killed\nCommand aborted. reason')
            self.assertEqual(ret['stderr'],
                             'killed\nCommand aborted. reason')

            self.assertTrue(kill_process_with_children_mock.called)
            self.assertFalse(
                command['taskId'] in orchestrator.commands_in_progress)
            self.assertTrue(os.path.exists(out))
            self.assertTrue(os.path.exists(err))
        finally:
            pool.close()
            pool.join()
            # Best-effort cleanup: a file that is already gone is not an
            # error for this test.
            for path in (out, err):
                try:
                    os.remove(path)
                except OSError:
                    pass

    from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler

    @patch("ambari_commons.shell.kill_process_with_children")
    @patch.object(FileCache, "__init__")
    @patch.object(CustomServiceOrchestrator, "resolve_script_path")
    @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
    @patch.object(StackVersionsFileHandler, "read_stack_version")
    def test_cancel_backgound_command(self, read_stack_version_mock,
                                      resolve_hook_script_path_mock,
                                      resolve_script_path_mock,
                                      FileCache_mock,
                                      kill_process_with_children_mock):
        '''
        Cancelling a background command kills its process and the command
        ends up with FAILED status in the action queue.
        '''
        FileCache_mock.return_value = None
        FileCache_mock.cache_dir = MagicMock()
        resolve_hook_script_path_mock.return_value = None
        dummy_controller = MagicMock()
        cfg = AmbariConfig().getConfig()
        cfg.set('agent', 'tolerate_download_failures', 'true')
        cfg.set('agent', 'prefix', '.')
        cfg.set('agent', 'cache_dir', 'background_tasks')

        actionQueue = ActionQueue(cfg, dummy_controller)

        dummy_controller.actionQueue = actionQueue
        orchestrator = CustomServiceOrchestrator(cfg, dummy_controller)
        orchestrator.file_cache = MagicMock()

        def empty_service_base_dir(a, b):
            return ""
        orchestrator.file_cache.get_service_base_dir = empty_service_base_dir
        actionQueue.customServiceOrchestrator = orchestrator

        import TestActionQueue
        import copy

        TestActionQueue.patch_output_file(orchestrator.python_executor)
        orchestrator.python_executor.prepare_process_result = MagicMock()
        orchestrator.dump_command_to_json = MagicMock()

        lock = threading.RLock()
        complete_done = threading.Condition(lock)

        complete_was_called = {}

        def command_complete_w(process_condenced_result, handle):
            # Record the visit, then block (up to 3 seconds) until the
            # test thread notifies the condition.
            with lock:
                complete_was_called['visited'] = ''
                complete_done.wait(3)

        actionQueue.on_background_command_complete_callback = \
            TestActionQueue.wraped(
                actionQueue.on_background_command_complete_callback,
                command_complete_w, None)
        execute_command = copy.deepcopy(
            TestActionQueue.TestActionQueue.background_command)
        actionQueue.put([execute_command])
        actionQueue.processBackgroundQueueSafeEmpty()

        time.sleep(.1)

        # NOTE(review): 19 and 33 presumably are the task id of
        # TestActionQueue.background_command and the pid set up by
        # patch_output_file - confirm in TestActionQueue.
        orchestrator.cancel_command(19, '')
        self.assertTrue(kill_process_with_children_mock.called)
        kill_process_with_children_mock.assert_called_with(33)

        with lock:
            complete_done.notifyAll()

        with lock:
            self.assertTrue('visited' in complete_was_called)

        time.sleep(.1)

        runningCommand = actionQueue.commandStatuses.get_command_status(19)
        self.assertTrue(runningCommand is not None)
        self.assertEqual(runningCommand['status'], ActionQueue.FAILED_STATUS)

    @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
    @patch.object(PythonExecutor, "run_file")
    @patch.object(FileCache, "__init__")
    @patch.object(FileCache, "get_custom_actions_base_dir")
    def test_runCommand_custom_action(self, get_custom_actions_base_dir_mock,
                                      FileCache_mock,
                                      run_file_mock,
                                      dump_command_to_json_mock):
        '''A custom action (ACTIONEXECUTE) runs run_file exactly once.'''
        FileCache_mock.return_value = None
        get_custom_actions_base_dir_mock.return_value = "some path"
        command = {
            'role': 'any',
            'commandParams': {
                'script_type': 'PYTHON',
                'script': 'some_custom_action.py',
                'command_timeout': '600',
                'jdk_location': 'some_location'
            },
            'taskId': '3',
            'roleCommand': 'ACTIONEXECUTE'
        }
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config,
                                                 dummy_controller)
        unix_process_id = 111
        orchestrator.commands_in_progress = {
            command['taskId']: unix_process_id}

        # normal run case
        run_file_mock.return_value = {
            'stdout': 'sss',
            'stderr': 'eee',
            'exitcode': 0,
        }
        ret = orchestrator.runCommand(command, "out.txt", "err.txt")
        self.assertEqual(ret['exitcode'], 0)
        self.assertTrue(run_file_mock.called)
        # Hooks are not supported for custom actions,
        # that's why run_file() should be called only once
        self.assertEqual(run_file_mock.call_count, 1)

    @patch("os.path.isfile")
    @patch.object(FileCache, "__init__")
    def test_resolve_hook_script_path(self, FileCache_mock, isfile_mock):
        '''
        resolve_hook_script_path returns a (script, base dir) tuple for an
        existing hook and None for a missing hook or a None hooks dir.
        '''
        FileCache_mock.return_value = None
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config,
                                                 dummy_controller)

        # Testing None param
        res1 = orchestrator.resolve_hook_script_path(
            None, "prefix", "command", "script_type")
        self.assertEqual(res1, None)

        # Testing existing hook script
        isfile_mock.return_value = True
        res2 = orchestrator.resolve_hook_script_path(
            "hooks_dir", "prefix", "command", "script_type")
        self.assertEqual(
            res2,
            (os.path.join('hooks_dir', 'prefix-command', 'scripts',
                          'hook.py'),
             os.path.join('hooks_dir', 'prefix-command')))

        # Testing not existing hook script
        isfile_mock.return_value = False
        res3 = orchestrator.resolve_hook_script_path(
            "hooks_dir", "prefix", "command", "script_type")
        self.assertEqual(res3, None)

    @patch.object(CustomServiceOrchestrator, "runCommand")
    @patch.object(FileCache, "__init__")
    def test_requestComponentStatus(self, FileCache_mock, runCommand_mock):
        '''requestComponentStatus returns the result of runCommand.'''
        FileCache_mock.return_value = None
        status_command = {
            "serviceName": 'HDFS',
            "commandType": "STATUS_COMMAND",
            "clusterName": "",
            "componentName": "DATANODE",
            'configurations': {}
        }
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config,
                                                 dummy_controller)

        # Test alive case
        runCommand_mock.return_value = {
            "exitcode": 0
        }
        status = orchestrator.requestComponentStatus(status_command)
        self.assertEqual(runCommand_mock.return_value, status)

        # Test dead case
        runCommand_mock.return_value = {
            "exitcode": 1
        }
        status = orchestrator.requestComponentStatus(status_command)
        self.assertEqual(runCommand_mock.return_value, status)

    @patch.object(CustomServiceOrchestrator, "runCommand")
    @patch.object(FileCache, "__init__")
    def test_requestComponentSecurityState(self, FileCache_mock,
                                           runCommand_mock):
        '''
        requestComponentSecurityState returns the securityState from the
        structured output on success and 'UNKNOWN' on a failing exit code.
        '''
        FileCache_mock.return_value = None
        status_command = {
            "serviceName": 'HDFS',
            "commandType": "STATUS_COMMAND",
            "clusterName": "",
            "componentName": "DATANODE",
            'configurations': {}
        }
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config,
                                                 dummy_controller)

        # Test securityState
        runCommand_mock.return_value = {
            'exitcode': 0,
            'structuredOut': {'securityState': 'UNSECURED'}
        }
        status = orchestrator.requestComponentSecurityState(status_command)
        self.assertEqual('UNSECURED', status)

        # Test case where exit code indicates failure
        runCommand_mock.return_value = {
            "exitcode": 1
        }
        status = orchestrator.requestComponentSecurityState(status_command)
        self.assertEqual('UNKNOWN', status)

    @patch.object(FileCache, "__init__")
    def test_requestComponentSecurityState_realFailure(self, FileCache_mock):
        '''
        Tests the case where the CustomServiceOrchestrator attempts to call
        a service's security_status method, but fails to do so because the
        script or method was not found.

        :param FileCache_mock: patched FileCache.__init__
        :return: None
        '''
        FileCache_mock.return_value = None
        status_command = {
            "serviceName": 'BOGUS_SERVICE',
            "commandType": "STATUS_COMMAND",
            "clusterName": "",
            "componentName": "DATANODE",
            'configurations': {}
        }
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config,
                                                 dummy_controller)

        status = orchestrator.requestComponentSecurityState(status_command)
        self.assertEqual('UNKNOWN', status)

    @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
    @patch.object(FileCache, "__init__")
    @patch.object(FileCache, "get_custom_actions_base_dir")
    def test_runCommand_background_action(self,
                                          get_custom_actions_base_dir_mock,
                                          FileCache_mock,
                                          dump_command_to_json_mock):
        '''
        runCommand for a BACKGROUND_EXECUTION_COMMAND returns the exit code
        produced by the patched python executor.
        '''
        FileCache_mock.return_value = None
        get_custom_actions_base_dir_mock.return_value = "some path"
        command = {
            'role': 'any',
            'commandParams': {
                'script_type': 'PYTHON',
                'script': 'some_custom_action.py',
                'command_timeout': '600',
                'jdk_location': 'some_location'
            },
            'taskId': '13',
            'roleCommand': 'ACTIONEXECUTE',
            'commandType': 'BACKGROUND_EXECUTION_COMMAND',
            '__handle': BackgroundCommandExecutionHandle(
                {'taskId': '13'}, 13, MagicMock(), MagicMock())
        }
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config,
                                                 dummy_controller)

        import TestActionQueue
        TestActionQueue.patch_output_file(orchestrator.python_executor)
        orchestrator.python_executor.condenseOutput = MagicMock()
        orchestrator.dump_command_to_json = MagicMock()

        ret = orchestrator.runCommand(command, "out.txt", "err.txt")
        # NOTE(review): 777 presumably is the exit code injected by
        # TestActionQueue.patch_output_file - confirm in TestActionQueue.
        self.assertEqual(ret['exitcode'], 777)

    def tearDown(self):
        '''Restore the stdout stream that was active before setUp.'''
        sys.stdout = self._saved_stdout