123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575 |
- #!/usr/bin/env python
- '''
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- '''
- import ConfigParser
- from multiprocessing.pool import ThreadPool
- import os
- import pprint
- from ambari_commons import shell
- from unittest import TestCase
- import threading
- import tempfile
- import time
- from threading import Thread
- from PythonExecutor import PythonExecutor
- from CustomServiceOrchestrator import CustomServiceOrchestrator
- from AmbariConfig import AmbariConfig
- from mock.mock import MagicMock, patch
- import StringIO
- import sys
- from AgentException import AgentException
- from FileCache import FileCache
- from LiveStatus import LiveStatus
- from BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
- from ambari_agent.ActionQueue import ActionQueue
- from only_for_platform import get_platform, PLATFORM_WINDOWS
class TestCustomServiceOrchestrator(TestCase):
    '''
    Unit tests for CustomServiceOrchestrator.

    FileCache.__init__ is patched out in every test that builds an
    orchestrator, and stdout is silenced for the duration of each test.
    '''

    def setUp(self):
        '''Silence stdout and build a minimal agent configuration.'''
        # disable stdout, remembering the stream that was active so that
        # tearDown restores exactly what the test runner had installed
        self._saved_stdout = sys.stdout
        sys.stdout = StringIO.StringIO()
        # generate sample config
        tmpdir = tempfile.gettempdir()
        exec_tmp_dir = os.path.join(tmpdir, 'tmp')
        self.config = ConfigParser.RawConfigParser()
        self.config.add_section('agent')
        self.config.set('agent', 'prefix', tmpdir)
        self.config.set('agent', 'tmp_dir', exec_tmp_dir)
        self.config.set('agent', 'cache_dir', "/cachedir")
        self.config.add_section('python')
        self.config.set('python', 'custom_actions_dir', tmpdir)

    @patch.object(FileCache, "__init__")
    def test_add_reg_listener_to_controller(self, FileCache_mock):
        '''Constructing an orchestrator appends a registration listener to the controller.'''
        FileCache_mock.return_value = None
        dummy_controller = MagicMock()
        config = AmbariConfig().getConfig()
        tempdir = tempfile.gettempdir()
        config.set('agent', 'prefix', tempdir)
        CustomServiceOrchestrator(config, dummy_controller)
        self.assertTrue(dummy_controller.registration_listeners.append.called)

    @patch.object(CustomServiceOrchestrator, 'decompressClusterHostInfo')
    @patch("hostname.public_hostname")
    @patch("os.path.isfile")
    @patch("os.unlink")
    @patch.object(FileCache, "__init__")
    def test_dump_command_to_json(self, FileCache_mock, unlink_mock,
                                  isfile_mock, hostname_mock,
                                  decompress_cluster_host_info_mock):
        '''
        dump_command_to_json writes a non-empty, owner-only (0600) json file
        for both EXECUTION_COMMAND and STATUS_COMMAND, decompresses the
        cluster host info only for the former, and adds 'public_hostname'
        to the command.
        '''
        FileCache_mock.return_value = None
        hostname_mock.return_value = "test.hst"
        command = {
            'commandType': 'EXECUTION_COMMAND',
            'role': u'DATANODE',
            'roleCommand': u'INSTALL',
            'commandId': '1-1',
            'taskId': 3,
            'clusterName': u'cc',
            'serviceName': u'HDFS',
            'configurations': {'global': {}},
            'configurationTags': {'global': {'tag': 'v1'}},
            'clusterHostInfo': {'namenode_host': ['1'],
                                'slave_hosts': ['0', '1'],
                                'all_hosts': ['h1.hortonworks.com', 'h2.hortonworks.com'],
                                'all_ping_ports': ['8670:0,1']},
            'hostLevelParams': {}
        }

        decompress_cluster_host_info_mock.return_value = {
            'namenode_host': ['h2.hortonworks.com'],
            'slave_hosts': ['h1.hortonworks.com', 'h2.hortonworks.com'],
            'all_hosts': ['h1.hortonworks.com', 'h2.hortonworks.com'],
            'all_ping_ports': ['8670', '8670']
        }

        config = AmbariConfig().getConfig()
        tempdir = tempfile.gettempdir()
        config.set('agent', 'prefix', tempdir)
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(config, dummy_controller)
        isfile_mock.return_value = True
        # Test dumping EXECUTION_COMMAND
        json_file = orchestrator.dump_command_to_json(command)
        self.assertTrue(os.path.exists(json_file))
        self.assertTrue(os.path.getsize(json_file) > 0)
        if get_platform() != PLATFORM_WINDOWS:
            # Compare the permission bits as integers; the textual form of
            # oct() differs between interpreter versions.
            self.assertEqual(os.stat(json_file).st_mode & 0o777, 0o600)
        self.assertTrue(json_file.endswith("command-3.json"))
        self.assertTrue(decompress_cluster_host_info_mock.called)
        # NOTE(review): os.unlink is patched in this test, so this call hits
        # unlink_mock and the file is not actually removed from disk.
        os.unlink(json_file)
        # Test dumping STATUS_COMMAND
        command['commandType'] = 'STATUS_COMMAND'
        decompress_cluster_host_info_mock.reset_mock()
        json_file = orchestrator.dump_command_to_json(command)
        self.assertTrue(os.path.exists(json_file))
        self.assertTrue(os.path.getsize(json_file) > 0)
        if get_platform() != PLATFORM_WINDOWS:
            self.assertEqual(os.stat(json_file).st_mode & 0o777, 0o600)
        self.assertTrue(json_file.endswith("status_command.json"))
        self.assertFalse(decompress_cluster_host_info_mock.called)
        os.unlink(json_file)  # also goes to unlink_mock, see note above
        # Testing side effect of dump_command_to_json
        self.assertEqual(command['public_hostname'], "test.hst")
        self.assertTrue(unlink_mock.called)

    @patch("os.path.exists")
    @patch.object(FileCache, "__init__")
    def test_resolve_script_path(self, FileCache_mock, exists_mock):
        '''resolve_script_path joins base dir and script, and raises AgentException if the path is missing.'''
        FileCache_mock.return_value = None
        dummy_controller = MagicMock()
        config = AmbariConfig().getConfig()
        orchestrator = CustomServiceOrchestrator(config, dummy_controller)
        # Testing existing path
        exists_mock.return_value = True
        path = orchestrator.resolve_script_path(
            os.path.join("HBASE", "package"),
            os.path.join("scripts", "hbase_master.py"))
        self.assertEqual(os.path.join("HBASE", "package", "scripts", "hbase_master.py"), path)
        # Testing not existing path
        exists_mock.return_value = False
        self.assertRaises(AgentException,
                          orchestrator.resolve_script_path,
                          "/HBASE",
                          os.path.join("scripts", "hbase_master.py"))

    @patch.object(CustomServiceOrchestrator, "resolve_script_path")
    @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
    @patch.object(FileCache, "get_host_scripts_base_dir")
    @patch.object(FileCache, "get_service_base_dir")
    @patch.object(FileCache, "get_hook_base_dir")
    @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
    @patch.object(PythonExecutor, "run_file")
    @patch.object(FileCache, "__init__")
    def test_runCommand(self, FileCache_mock,
                        run_file_mock, dump_command_to_json_mock,
                        get_hook_base_dir_mock, get_service_base_dir_mock,
                        get_host_scripts_base_dir_mock,
                        resolve_hook_script_path_mock,
                        resolve_script_path_mock):
        '''
        runCommand with a PYTHON script invokes run_file three times
        (presumably the script plus its before/after hooks - confirm against
        CustomServiceOrchestrator), honours forced_command_name, and reports
        exit code 1 for an unknown script type without running anything.
        '''
        FileCache_mock.return_value = None
        command = {
            'role': 'REGION_SERVER',
            'hostLevelParams': {
                'stack_name': 'HDP',
                'stack_version': '2.0.7',
                'jdk_location': 'some_location'
            },
            'commandParams': {
                'script_type': 'PYTHON',
                'script': 'scripts/hbase_regionserver.py',
                'command_timeout': '600',
                'service_package_folder': 'HBASE'
            },
            'taskId': '3',
            'roleCommand': 'INSTALL'
        }

        get_host_scripts_base_dir_mock.return_value = "/host_scripts"
        get_service_base_dir_mock.return_value = "/basedir/"
        resolve_script_path_mock.return_value = "/basedir/scriptpath"
        resolve_hook_script_path_mock.return_value = \
            ('/hooks_dir/prefix-command/scripts/hook.py',
             '/hooks_dir/prefix-command')
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
        unix_process_id = 111
        orchestrator.commands_in_progress = {command['taskId']: unix_process_id}
        get_hook_base_dir_mock.return_value = "/hooks/"
        # normal run case
        run_file_mock.return_value = {
            'stdout': 'sss',
            'stderr': 'eee',
            'exitcode': 0,
        }
        ret = orchestrator.runCommand(command, "out.txt", "err.txt")
        self.assertEqual(ret['exitcode'], 0)
        self.assertTrue(run_file_mock.called)
        self.assertEqual(run_file_mock.call_count, 3)
        run_file_mock.reset_mock()
        # Case when we force another command.
        # A fresh dict is assigned on purpose so that nothing added to the
        # previous result object can leak into this scenario.
        run_file_mock.return_value = {
            'stdout': 'sss',
            'stderr': 'eee',
            'exitcode': 0,
        }
        ret = orchestrator.runCommand(command, "out.txt", "err.txt",
                                      forced_command_name=CustomServiceOrchestrator.COMMAND_NAME_STATUS)
        ## Check that override_output_files was true only during first call
        ## (positional argument 10 of run_file)
        self.assertEqual(run_file_mock.call_args_list[0][0][10], True)
        self.assertEqual(run_file_mock.call_args_list[1][0][10], False)
        self.assertEqual(run_file_mock.call_args_list[2][0][10], False)
        ## Check that forced_command_name was taken into account
        self.assertEqual(run_file_mock.call_args_list[0][0][1][0],
                         CustomServiceOrchestrator.COMMAND_NAME_STATUS)
        run_file_mock.reset_mock()
        # unknown script type case
        command['commandParams']['script_type'] = "SOME_TYPE"
        ret = orchestrator.runCommand(command, "out.txt", "err.txt")
        self.assertEqual(ret['exitcode'], 1)
        self.assertFalse(run_file_mock.called)
        self.assertTrue("Unknown script type" in ret['stdout'])
        # By default returns empty dictionary
        self.assertEqual(ret['structuredOut'], '{}')

    @patch("ambari_commons.shell.kill_process_with_children")
    @patch.object(CustomServiceOrchestrator, "resolve_script_path")
    @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
    @patch.object(FileCache, "get_host_scripts_base_dir")
    @patch.object(FileCache, "get_service_base_dir")
    @patch.object(FileCache, "get_hook_base_dir")
    @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
    @patch.object(PythonExecutor, "run_file")
    @patch.object(FileCache, "__init__")
    def test_cancel_command(self, FileCache_mock,
                            run_file_mock, dump_command_to_json_mock,
                            get_hook_base_dir_mock, get_service_base_dir_mock,
                            get_host_scripts_base_dir_mock,
                            resolve_hook_script_path_mock, resolve_script_path_mock,
                            kill_process_with_children_mock):
        '''
        Cancelling a command that is running in another thread kills its
        process tree, appends the abort reason to stdout/stderr and drops the
        task from commands_in_progress.
        '''
        FileCache_mock.return_value = None
        command = {
            'role': 'REGION_SERVER',
            'hostLevelParams': {
                'stack_name': 'HDP',
                'stack_version': '2.0.7',
                'jdk_location': 'some_location'
            },
            'commandParams': {
                'script_type': 'PYTHON',
                'script': 'scripts/hbase_regionserver.py',
                'command_timeout': '600',
                'service_package_folder': 'HBASE'
            },
            'taskId': '3',
            'roleCommand': 'INSTALL'
        }

        get_host_scripts_base_dir_mock.return_value = "/host_scripts"
        get_service_base_dir_mock.return_value = "/basedir/"
        resolve_script_path_mock.return_value = "/basedir/scriptpath"
        resolve_hook_script_path_mock.return_value = \
            ('/hooks_dir/prefix-command/scripts/hook.py',
             '/hooks_dir/prefix-command')
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
        unix_process_id = 111
        orchestrator.commands_in_progress = {command['taskId']: unix_process_id}
        get_hook_base_dir_mock.return_value = "/hooks/"
        run_file_mock_return_value = {
            'stdout': 'killed',
            'stderr': 'killed',
            'exitcode': 1,
        }

        def side_effect(*args, **kwargs):
            # Keep the "script" busy long enough for the cancel to arrive.
            time.sleep(0.2)
            return run_file_mock_return_value

        run_file_mock.side_effect = side_effect
        out_fd, out = tempfile.mkstemp()
        err_fd, err = tempfile.mkstemp()
        # mkstemp returns an already-open descriptor; only the paths are
        # needed here, so close the descriptors instead of leaking them.
        os.close(out_fd)
        os.close(err_fd)
        pool = ThreadPool(processes=1)
        try:
            async_result = pool.apply_async(orchestrator.runCommand, (command, out, err))
            time.sleep(0.1)
            orchestrator.cancel_command(command['taskId'], 'reason')
            ret = async_result.get()
            self.assertEqual(ret['exitcode'], 1)
            self.assertEqual(ret['stdout'], 'killed\nCommand aborted. reason')
            self.assertEqual(ret['stderr'], 'killed\nCommand aborted. reason')
            self.assertTrue(kill_process_with_children_mock.called)
            self.assertFalse(command['taskId'] in orchestrator.commands_in_progress.keys())
            self.assertTrue(os.path.exists(out))
            self.assertTrue(os.path.exists(err))
        finally:
            # Release the worker threads and the temporary files even when
            # an assertion above fails.
            pool.close()
            pool.join()
            for path in (out, err):
                try:
                    os.remove(path)
                except OSError:
                    pass  # best-effort cleanup

    # Imported in the class body because it is only needed by the decorator
    # of the test below.
    from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler

    @patch("ambari_commons.shell.kill_process_with_children")
    @patch.object(FileCache, "__init__")
    @patch.object(CustomServiceOrchestrator, "resolve_script_path")
    @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
    @patch.object(StackVersionsFileHandler, "read_stack_version")
    def test_cancel_backgound_command(self, read_stack_version_mock, resolve_hook_script_path_mock,
                                      resolve_script_path_mock, FileCache_mock,
                                      kill_process_with_children_mock):
        '''
        Cancelling a background command kills its process, still triggers the
        completion callback and leaves the command in FAILED status.
        '''
        FileCache_mock.return_value = None
        FileCache_mock.cache_dir = MagicMock()
        resolve_hook_script_path_mock.return_value = None
        dummy_controller = MagicMock()
        cfg = AmbariConfig().getConfig()
        cfg.set('agent', 'tolerate_download_failures', 'true')
        cfg.set('agent', 'prefix', '.')
        cfg.set('agent', 'cache_dir', 'background_tasks')
        actionQueue = ActionQueue(cfg, dummy_controller)
        dummy_controller.actionQueue = actionQueue
        orchestrator = CustomServiceOrchestrator(cfg, dummy_controller)
        orchestrator.file_cache = MagicMock()

        def f(a, b):
            return ""

        orchestrator.file_cache.get_service_base_dir = f
        actionQueue.customServiceOrchestrator = orchestrator
        import TestActionQueue
        import copy
        TestActionQueue.patch_output_file(orchestrator.python_executor)
        orchestrator.python_executor.prepare_process_result = MagicMock()
        orchestrator.dump_command_to_json = MagicMock()
        lock = threading.RLock()
        complete_done = threading.Condition(lock)
        complete_was_called = {}

        def command_complete_w(process_condenced_result, handle):
            # Record the visit, then block (up to 3s) until the test body
            # has finished its assertions on the kill mock.
            with lock:
                complete_was_called['visited'] = ''
                complete_done.wait(3)

        actionQueue.on_background_command_complete_callback = TestActionQueue.wraped(
            actionQueue.on_background_command_complete_callback, command_complete_w, None)
        execute_command = copy.deepcopy(TestActionQueue.TestActionQueue.background_command)
        actionQueue.put([execute_command])
        actionQueue.processBackgroundQueueSafeEmpty()
        time.sleep(.1)
        orchestrator.cancel_command(19, '')
        self.assertTrue(kill_process_with_children_mock.called)
        kill_process_with_children_mock.assert_called_with(33)
        with lock:
            complete_done.notifyAll()
        with lock:
            self.assertTrue('visited' in complete_was_called)
        time.sleep(.1)
        runningCommand = actionQueue.commandStatuses.get_command_status(19)
        self.assertTrue(runningCommand is not None)
        self.assertEqual(runningCommand['status'], ActionQueue.FAILED_STATUS)

    @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
    @patch.object(PythonExecutor, "run_file")
    @patch.object(FileCache, "__init__")
    @patch.object(FileCache, "get_custom_actions_base_dir")
    def test_runCommand_custom_action(self, get_custom_actions_base_dir_mock,
                                      FileCache_mock,
                                      run_file_mock, dump_command_to_json_mock):
        '''An ACTIONEXECUTE command runs its script exactly once and returns its exit code.'''
        FileCache_mock.return_value = None
        get_custom_actions_base_dir_mock.return_value = "some path"
        command = {
            'role': 'any',
            'commandParams': {
                'script_type': 'PYTHON',
                'script': 'some_custom_action.py',
                'command_timeout': '600',
                'jdk_location': 'some_location'
            },
            'taskId': '3',
            'roleCommand': 'ACTIONEXECUTE'
        }
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
        unix_process_id = 111
        orchestrator.commands_in_progress = {command['taskId']: unix_process_id}
        # normal run case
        run_file_mock.return_value = {
            'stdout': 'sss',
            'stderr': 'eee',
            'exitcode': 0,
        }
        ret = orchestrator.runCommand(command, "out.txt", "err.txt")
        self.assertEqual(ret['exitcode'], 0)
        self.assertTrue(run_file_mock.called)
        # Hooks are not supported for custom actions,
        # that's why run_file() should be called only once
        self.assertEqual(run_file_mock.call_count, 1)

    @patch("os.path.isfile")
    @patch.object(FileCache, "__init__")
    def test_resolve_hook_script_path(self, FileCache_mock, isfile_mock):
        '''
        resolve_hook_script_path returns None for a missing hooks dir or a
        missing hook script, and a (script path, hook base dir) tuple otherwise.
        '''
        FileCache_mock.return_value = None
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
        # Testing None param
        res1 = orchestrator.resolve_hook_script_path(None, "prefix", "command",
                                                     "script_type")
        self.assertEqual(res1, None)
        # Testing existing hook script
        isfile_mock.return_value = True
        res2 = orchestrator.resolve_hook_script_path("hooks_dir", "prefix", "command",
                                                     "script_type")
        self.assertEqual(res2, (os.path.join('hooks_dir', 'prefix-command', 'scripts', 'hook.py'),
                                os.path.join('hooks_dir', 'prefix-command')))
        # Testing not existing hook script
        isfile_mock.return_value = False
        res3 = orchestrator.resolve_hook_script_path("hooks_dir", "prefix", "command",
                                                     "script_type")
        self.assertEqual(res3, None)

    @patch.object(CustomServiceOrchestrator, "runCommand")
    @patch.object(FileCache, "__init__")
    def test_requestComponentStatus(self, FileCache_mock, runCommand_mock):
        '''requestComponentStatus hands back whatever runCommand returned, for both exit codes.'''
        FileCache_mock.return_value = None
        status_command = {
            "serviceName": 'HDFS',
            "commandType": "STATUS_COMMAND",
            "clusterName": "",
            "componentName": "DATANODE",
            'configurations': {}
        }
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
        # Test alive case
        runCommand_mock.return_value = {
            "exitcode": 0
        }
        status = orchestrator.requestComponentStatus(status_command)
        self.assertEqual(runCommand_mock.return_value, status)
        # Test dead case
        runCommand_mock.return_value = {
            "exitcode": 1
        }
        status = orchestrator.requestComponentStatus(status_command)
        self.assertEqual(runCommand_mock.return_value, status)

    @patch.object(CustomServiceOrchestrator, "runCommand")
    @patch.object(FileCache, "__init__")
    def test_requestComponentSecurityState(self, FileCache_mock, runCommand_mock):
        '''
        requestComponentSecurityState returns the 'securityState' from the
        structured output on success and 'UNKNOWN' when the command fails.
        '''
        FileCache_mock.return_value = None
        status_command = {
            "serviceName": 'HDFS',
            "commandType": "STATUS_COMMAND",
            "clusterName": "",
            "componentName": "DATANODE",
            'configurations': {}
        }
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
        # Test securityState
        runCommand_mock.return_value = {
            'exitcode': 0,
            'structuredOut': {'securityState': 'UNSECURED'}
        }
        status = orchestrator.requestComponentSecurityState(status_command)
        self.assertEqual('UNSECURED', status)
        # Test case where exit code indicates failure
        runCommand_mock.return_value = {
            "exitcode": 1
        }
        status = orchestrator.requestComponentSecurityState(status_command)
        self.assertEqual('UNKNOWN', status)

    @patch.object(FileCache, "__init__")
    def test_requestComponentSecurityState_realFailure(self, FileCache_mock):
        '''
        Tests the case where the CustomServiceOrchestrator attempts to call a service's security_status
        method, but fails to do so because the script or method was not found.

        runCommand is deliberately left unpatched here; the expected result
        is the 'UNKNOWN' security state.
        '''
        FileCache_mock.return_value = None
        status_command = {
            "serviceName": 'BOGUS_SERVICE',
            "commandType": "STATUS_COMMAND",
            "clusterName": "",
            "componentName": "DATANODE",
            'configurations': {}
        }
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
        status = orchestrator.requestComponentSecurityState(status_command)
        self.assertEqual('UNKNOWN', status)

    @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
    @patch.object(FileCache, "__init__")
    @patch.object(FileCache, "get_custom_actions_base_dir")
    def test_runCommand_background_action(self, get_custom_actions_base_dir_mock,
                                          FileCache_mock,
                                          dump_command_to_json_mock):
        '''
        A BACKGROUND_EXECUTION_COMMAND is run through the patched python
        executor and is expected to report exit code 777.
        '''
        FileCache_mock.return_value = None
        get_custom_actions_base_dir_mock.return_value = "some path"
        command = {
            'role': 'any',
            'commandParams': {
                'script_type': 'PYTHON',
                'script': 'some_custom_action.py',
                'command_timeout': '600',
                'jdk_location': 'some_location'
            },
            'taskId': '13',
            'roleCommand': 'ACTIONEXECUTE',
            'commandType': 'BACKGROUND_EXECUTION_COMMAND',
            '__handle': BackgroundCommandExecutionHandle({'taskId': '13'}, 13,
                                                         MagicMock(), MagicMock())
        }
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
        import TestActionQueue
        TestActionQueue.patch_output_file(orchestrator.python_executor)
        orchestrator.python_executor.condenseOutput = MagicMock()
        orchestrator.dump_command_to_json = MagicMock()
        ret = orchestrator.runCommand(command, "out.txt", "err.txt")
        # NOTE(review): 777 is presumably the exit code produced by
        # TestActionQueue.patch_output_file - confirm there.
        self.assertEqual(ret['exitcode'], 777)

    def tearDown(self):
        '''Re-enable stdout by restoring the stream that setUp replaced.'''
        # Fall back to the interpreter's original stream if setUp never ran.
        sys.stdout = getattr(self, '_saved_stdout', sys.__stdout__)
|