''' Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ''' import StringIO from unittest import TestCase import sys from mock.mock import patch from mock.mock import MagicMock from mock.mock import create_autospec import os, errno, tempfile import signal import stat import datetime # We have to use this import HACK because the filename contains a dash ambari_server = __import__('ambari-server') FatalException = ambari_server.FatalException class TestAmbariServer(TestCase): def setUp(self): out = StringIO.StringIO() sys.stdout = out def tearDown(self): sys.stdout = sys.__stdout__ @patch.object(ambari_server, 'configure_database_username_password') @patch.object(ambari_server, 'run_os_command') @patch('optparse.Values') def test_configure_pg_hba_ambaridb_users(self, OptParseValuesMock, run_os_command_method, configure_database_username_password_method): # Prepare mocks run_os_command_method.return_value = (0, "", "") opvm = OptParseValuesMock.return_value opvm.database_username = "ffdf" tf1 = tempfile.NamedTemporaryFile() ambari_server.PG_HBA_CONF_FILE = tf1.name # Run test ambari_server.configure_pg_hba_ambaridb_users() # Check results self.assertTrue(run_os_command_method.called) self.assertTrue(configure_database_username_password_method.called) string_expected = self.get_file_string(self .get_samples_dir("configure_pg_hba_ambaridb_users1")) string_actual = self.get_file_string(ambari_server.PG_HBA_CONF_FILE) self.assertEquals(string_expected, string_actual) pass def test_configure_pg_hba_postgres_user(self): tf1 = tempfile.NamedTemporaryFile() ambari_server.PG_HBA_CONF_FILE = tf1.name with open(ambari_server.PG_HBA_CONF_FILE, 'w') as fout: fout.write("\n") fout.write("local all all md5\n") fout.write("host all all 0.0.0.0/0 md5\n") fout.write("host all all ::/0 md5\n") ambari_server.configure_pg_hba_postgres_user() expected = self.get_file_string(self.get_samples_dir( "configure_pg_hba_ambaridb_users2")) result = self.get_file_string(ambari_server.PG_HBA_CONF_FILE) self.assertEqual(expected, result, "pg_hba_conf not processed") mode = oct(os.stat(ambari_server.PG_HBA_CONF_FILE)[stat.ST_MODE]) str_mode = str(mode)[-4:] self.assertEqual("0644", str_mode, "Wrong file permissions") @patch('__builtin__.raw_input') def get_choice_string_input(self, raw_input_method): prompt = "blablabla" default = "default blablabla" firstChoice = set(['yes','ye', 'y']) secondChoice = set(['no','n']) # test first input raw_input_method.return_value = "Y" result = ambari_server.get_choice_string_input(prompt, default, firstChoice, secondChoice) self.assertEquals(result, True) raw_input_method.reset_mock() # test second input raw_input_method.return_value = "N" result = ambari_server.get_choice_string_input(prompt, default, firstChoice, secondChoice) self.assertEquals(result, False) raw_input_method.reset_mock() # test enter pressed raw_input_method.return_value = "" result = ambari_server.get_choice_string_input(prompt, default, firstChoice, secondChoice) self.assertEquals(result, default) raw_input_method.reset_mock() # test wrong input list_of_return_values= ['yes', 'dsad', 'fdsfds'] def side_effect(list): return list_of_return_values.pop() raw_input_method.side_effect = side_effect result = ambari_server.get_choice_string_input(prompt, default, firstChoice, secondChoice) self.assertEquals(result, True) self.assertEquals(raw_input_method.call_count, 3) pass @patch('re.search') @patch('__builtin__.raw_input') @patch('getpass.getpass') def get_validated_string_input(self, get_pass_method, raw_input_method, re_search_method): prompt = "blabla" default = "default_pass" pattern = "pattern_pp" description = "blabla2" # check password input self.assertFalse(False, ambari_server.SILENT) is_pass = True get_pass_method.return_value = "dfdsfdsfds" result = ambari_server.get_validated_string_input(prompt, default, pattern, description, is_pass) self.assertEquals(get_pass_method.return_value, result) get_pass_method.assure_called_once(prompt) self.assertFalse(raw_input_method.called) # check raw input get_pass_method.reset_mock() raw_input_method.reset_mock() is_pass = False raw_input_method.return_value = "dkf90ewuf0" result = ambari_server.get_validated_string_input(prompt, default, pattern, description, is_pass) self.assertEquals(raw_input_method.return_value, result) self.assertFalse(get_pass_method.called) raw_input_method.assure_called_once(prompt) def test_get_pass_file_path(self): result = ambari_server.get_pass_file_path("/etc/ambari/conf_file") self.assertEquals("/etc/ambari/password.dat", result) pass @patch.object(ambari_server, 'setup') @patch.object(ambari_server, 'start') @patch.object(ambari_server, 'stop') @patch.object(ambari_server, 'reset') @patch('optparse.OptionParser') def test_main_test_setup(self, OptionParserMock, reset_method, stop_method, start_method, setup_method): opm = OptionParserMock.return_value options = MagicMock() args = ["setup"] opm.parse_args.return_value = (options, args) options.database=None ambari_server.main() self.assertTrue(setup_method.called) self.assertFalse(start_method.called) self.assertFalse(stop_method.called) self.assertFalse(reset_method.called) self.assertFalse(False, ambari_server.VERBOSE) self.assertFalse(False, ambari_server.SILENT) @patch.object(ambari_server, 'setup') @patch.object(ambari_server, 'start') @patch.object(ambari_server, 'stop') @patch.object(ambari_server, 'reset') @patch('optparse.OptionParser') def test_main_test_start(self, OptionParserMock, reset_method, stop_method, start_method, setup_method): opm = OptionParserMock.return_value options = MagicMock() args = ["setup"] opm.parse_args.return_value = (options, args) options.database=None ambari_server.main() self.assertTrue(setup_method.called) self.assertFalse(start_method.called) self.assertFalse(stop_method.called) self.assertFalse(reset_method.called) self.assertFalse(False, ambari_server.VERBOSE) self.assertFalse(False, ambari_server.SILENT) @patch.object(ambari_server, 'setup') @patch.object(ambari_server, 'start') @patch.object(ambari_server, 'stop') @patch.object(ambari_server, 'reset') @patch('optparse.OptionParser') def test_main_test_start_debug_short(self, OptionParserMock, reset_method, stop_method, start_method, setup_method): opm = OptionParserMock.return_value options = MagicMock() args = ["start", "-g"] opm.parse_args.return_value = (options, args) options.database=None ambari_server.main() self.assertFalse(setup_method.called) self.assertTrue(start_method.called) self.assertFalse(stop_method.called) self.assertFalse(reset_method.called) self.assertTrue(ambari_server.SERVER_DEBUG_MODE) @patch.object(ambari_server, 'setup') @patch.object(ambari_server, 'start') @patch.object(ambari_server, 'stop') @patch.object(ambari_server, 'reset') @patch('optparse.OptionParser') def test_main_test_start_debug_long(self, OptionParserMock, reset_method, stop_method, start_method, setup_method): opm = OptionParserMock.return_value options = MagicMock() args = ["start", "--debug"] opm.parse_args.return_value = (options, args) options.database=None ambari_server.main() self.assertFalse(setup_method.called) self.assertTrue(start_method.called) self.assertFalse(stop_method.called) self.assertFalse(reset_method.called) self.assertTrue(ambari_server.SERVER_DEBUG_MODE) @patch.object(ambari_server, 'setup') @patch.object(ambari_server, 'start') @patch.object(ambari_server, 'stop') @patch.object(ambari_server, 'reset') @patch('optparse.OptionParser') def test_main_test_stop(self, OptionParserMock, reset_method, stop_method, start_method, setup_method): opm = OptionParserMock.return_value options = MagicMock() args = ["stop"] opm.parse_args.return_value = (options, args) options.database = None ambari_server.main() self.assertFalse(setup_method.called) self.assertFalse(start_method.called) self.assertTrue(stop_method.called) self.assertFalse(reset_method.called) self.assertFalse(False, ambari_server.VERBOSE) self.assertFalse(False, ambari_server.SILENT) @patch.object(ambari_server, 'setup') @patch.object(ambari_server, 'start') @patch.object(ambari_server, 'stop') @patch.object(ambari_server, 'reset') @patch('optparse.OptionParser') def test_main_test_reset(self, OptionParserMock, reset_method, stop_method, start_method, setup_method): opm = OptionParserMock.return_value options = MagicMock() args = ["reset"] opm.parse_args.return_value = (options, args) options.database=None ambari_server.main() self.assertFalse(setup_method.called) self.assertFalse(start_method.called) self.assertFalse(stop_method.called) self.assertTrue(reset_method.called) self.assertFalse(False, ambari_server.VERBOSE) self.assertFalse(False, ambari_server.SILENT) def test_configure_postgresql_conf(self): tf1 = tempfile.NamedTemporaryFile() ambari_server.POSTGRESQL_CONF_FILE = tf1.name with open(ambari_server.POSTGRESQL_CONF_FILE, 'w') as f: f.write("#listen_addresses = '127.0.0.1' #\n") f.write("#listen_addresses = '127.0.0.1'") ambari_server.configure_postgresql_conf() expected = self.get_file_string(self.get_samples_dir( "configure_postgresql_conf1")) result = self.get_file_string(ambari_server.POSTGRESQL_CONF_FILE) self.assertEqual(expected, result, "postgresql.conf not updated") mode = oct(os.stat(ambari_server.POSTGRESQL_CONF_FILE)[stat.ST_MODE]) str_mode = str(mode)[-4:] self.assertEqual("0644", str_mode, "Wrong file permissions") @patch.object(ambari_server, "restart_postgres") @patch.object(ambari_server, "get_postgre_status") @patch.object(ambari_server, "configure_postgresql_conf") @patch.object(ambari_server, "configure_pg_hba_ambaridb_users") @patch.object(ambari_server, "configure_pg_hba_postgres_user") def test_configure_postgres(self, configure_pg_hba_postgres_user_mock, configure_pg_hba_ambaridb_users_mock, configure_postgresql_conf_mock, get_postgre_status_mock, restart_postgres_mock): tf1 = tempfile.NamedTemporaryFile() tf2 = tempfile.NamedTemporaryFile() ambari_server.PG_HBA_CONF_FILE = tf1.name ambari_server.PG_HBA_CONF_FILE_BACKUP = tf2.name args = MagicMock() out = StringIO.StringIO() sys.stdout = out rcode = ambari_server.configure_postgres() sys.stdout = sys.__stdout__ self.assertEqual(0, rcode) self.assertEqual("Backup for pg_hba found, reconfiguration not required\n", out.getvalue()) ambari_server.PG_HBA_CONF_FILE_BACKUP = tempfile.mktemp() get_postgre_status_mock.return_value = ambari_server.PG_STATUS_RUNNING restart_postgres_mock.return_value = 0 rcode = ambari_server.configure_postgres() self.assertTrue(os.path.isfile(ambari_server.PG_HBA_CONF_FILE_BACKUP), "postgresql.conf backup not created") self.assertTrue(configure_pg_hba_postgres_user_mock.called) self.assertTrue(configure_pg_hba_ambaridb_users_mock.called) mode = oct(os.stat(ambari_server.PG_HBA_CONF_FILE)[stat.ST_MODE]) str_mode = str(mode)[-4:] self.assertEqual("0644", str_mode, "Wrong file permissions") self.assertTrue(configure_postgresql_conf_mock.called) self.assertEqual(0, rcode) os.unlink(ambari_server.PG_HBA_CONF_FILE_BACKUP) get_postgre_status_mock.return_value = "stopped" rcode = ambari_server.configure_postgres() self.assertEqual(0, rcode) os.unlink(ambari_server.PG_HBA_CONF_FILE_BACKUP) sys.stdout = sys.__stdout__ @patch("time.sleep") @patch("subprocess.Popen") @patch.object(ambari_server, "run_os_command") @patch.object(ambari_server, "get_postgre_status") @patch.object(ambari_server, "print_info_msg") def test_restart_postgres(self, printInfoMsg_mock, get_postgre_status_mock, run_os_command_mock, popenMock, sleepMock): p = MagicMock() p.poll.return_value = 0 popenMock.return_value = p rcode = ambari_server.restart_postgres() self.assertEqual(0, rcode) p.poll.return_value = None get_postgre_status_mock.return_value = "stopped" run_os_command_mock.return_value = (1, None, None) rcode = ambari_server.restart_postgres() self.assertEqual(1, rcode) @patch("shlex.split") @patch("subprocess.Popen") @patch.object(ambari_server, "print_info_msg") def test_run_os_command(self, printInfoMsg_mock, popenMock, splitMock): p = MagicMock() p.communicate.return_value = (None, None) p.returncode = 3 popenMock.return_value = p # with list arg cmd = ["exec", "arg"] ambari_server.run_os_command(cmd) self.assertFalse(splitMock.called) # with str arg resp = ambari_server.run_os_command("runme") self.assertEqual(3, resp[0]) self.assertTrue(splitMock.called) @patch.object(ambari_server, "get_conf_dir") @patch.object(ambari_server, "search_file") def test_write_property(self, search_file_mock, get_conf_dir_mock): expected_content = "key1=val1\n" tf1 = tempfile.NamedTemporaryFile() search_file_mock.return_value = tf1.name ambari_server.write_property("key1", "val1") result = tf1.read() self.assertTrue(expected_content in result) @patch.object(ambari_server, "configure_database_username_password") @patch.object(ambari_server, "run_os_command") def test_setup_db(self, run_os_command_mock, configure_database_username_password_mock): run_os_command_mock.return_value = (0, None, None) result = ambari_server.setup_db(MagicMock()) self.assertTrue(configure_database_username_password_mock.called) self.assertEqual(0, result) @patch.object(ambari_server, "get_YN_input") @patch.object(ambari_server, "run_os_command") def test_check_selinux(self, run_os_command_mock, getYNInput_mock): run_os_command_mock.return_value = (0, ambari_server.SE_STATUS_DISABLED, None) rcode = ambari_server.check_selinux() self.assertEqual(0, rcode) getYNInput_mock.return_value = True run_os_command_mock.return_value = (0,"enabled " + ambari_server.SE_MODE_ENFORCING, None) rcode = ambari_server.check_selinux() self.assertEqual(0, rcode) self.assertTrue(run_os_command_mock.called) self.assertTrue(getYNInput_mock.called) @patch.object(ambari_server, "print_info_msg") def test_get_ambari_jars(self, printInfoMsg_mock): env = "/ambari/jars" os.environ[ambari_server.AMBARI_SERVER_LIB] = env result = ambari_server.get_ambari_jars() self.assertEqual(env, result) del os.environ[ambari_server.AMBARI_SERVER_LIB] result = ambari_server.get_ambari_jars() self.assertEqual("/usr/lib/ambari-server", result) self.assertTrue(printInfoMsg_mock.called) @patch("glob.glob") @patch.object(ambari_server, "print_info_msg") def test_get_share_jars(self, printInfoMsg_mock, globMock): globMock.return_value = ["one", "two"] expected = "one:two:one:two" result = ambari_server.get_share_jars() self.assertEqual(expected, result) globMock.return_value = [] expected = "" result = ambari_server.get_share_jars() self.assertEqual(expected, result) @patch("glob.glob") @patch.object(ambari_server, "print_info_msg") def test_get_ambari_classpath(self, printInfoMsg_mock, globMock): globMock.return_value = ["one"] result = ambari_server.get_ambari_classpath() self.assertTrue(ambari_server.get_ambari_jars() in result) self.assertTrue(ambari_server.get_share_jars() in result) globMock.return_value = [] result = ambari_server.get_ambari_classpath() self.assertTrue(ambari_server.get_ambari_jars() in result) self.assertFalse(":" in result) @patch.object(ambari_server, "print_info_msg") def test_get_conf_dir(self, printInfoMsg_mock): env = "/dummy/ambari/conf" os.environ[ambari_server.AMBARI_CONF_VAR] = env result = ambari_server.get_conf_dir() self.assertEqual(env, result) del os.environ[ambari_server.AMBARI_CONF_VAR] result = ambari_server.get_conf_dir() self.assertEqual("/etc/ambari-server/conf", result) def test_search_file(self): path = os.path.dirname(__file__) result = ambari_server.search_file(__file__, path) expected = os.path.abspath(__file__) self.assertEqual(expected, result) result = ambari_server.search_file("non_existent_file", path) self.assertEqual(None, result) @patch.object(ambari_server, "search_file") def test_find_properties_file(self, search_file_mock): # Testing case when file is not found search_file_mock.return_value = None try: ambari_server.find_properties_file() self.fail("File not found'") except FatalException: # Expected pass self.assertTrue(search_file_mock.called) # Testing case when file is found value = MagicMock() search_file_mock.return_value = value result = ambari_server.find_properties_file() self.assertTrue(result is value) @patch.object(ambari_server, "find_properties_file") @patch("__builtin__.open") @patch("ambari-server.Properties") def test_read_ambari_user(self, properties_mock, open_mock, find_properties_file_mock): open_mock.return_value = "dummy" find_properties_file_mock.return_value = "dummy" # Testing with defined user properties_mock.return_value.__getitem__.return_value = "dummy_user" user = ambari_server.read_ambari_user() self.assertEquals(user, "dummy_user") # Testing with undefined user properties_mock.return_value.__getitem__.return_value = None user = ambari_server.read_ambari_user() self.assertEquals(user, None) @patch.object(ambari_server, "set_file_permissions") @patch.object(ambari_server, "run_os_command") @patch.object(ambari_server, "get_ambari_properties") @patch.object(ambari_server, "get_value_from_properties") @patch.object(ambari_server, "os.mkdir") def test_adjust_directory_permissions(self, mkdir_mock, get_value_from_properties_mock, get_ambari_properties_mock, run_os_command_mock, set_file_permissions_mock): # Testing boostrap dir wipe properties_mock = MagicMock() get_value_from_properties_mock.return_value = "dummy_bootstrap_dir" ambari_server.adjust_directory_permissions("user") self.assertEquals(run_os_command_mock.call_args_list[0][0][0], "rm -rf dummy_bootstrap_dir/*") self.assertTrue(mkdir_mock.called) set_file_permissions_mock.reset_mock() # Test recursive calls old_list = ambari_server.NR_ADJUST_OWNERSHIP_LIST ambari_server.NR_ADJUST_OWNERSHIP_LIST = [ ( "/etc/ambari-server/conf", "755", "{0}", "{0}", True ), ( "/etc/ambari-server/conf/ambari.properties", "644", "{0}", "{0}", False ) ] ambari_server.adjust_directory_permissions("user") self.assertTrue(len(set_file_permissions_mock.call_args_list) == len(ambari_server.NR_ADJUST_OWNERSHIP_LIST)) self.assertEquals(set_file_permissions_mock.call_args_list[0][0][4], True) self.assertEquals(set_file_permissions_mock.call_args_list[1][0][4], False) ambari_server.NR_ADJUST_OWNERSHIP_LIST = old_list @patch("os.path.exists") @patch.object(ambari_server, "run_os_command") @patch.object(ambari_server, "print_warning_msg") @patch.object(ambari_server, "print_info_msg") def test_set_file_permissions(self, print_info_msg_mock, print_warning_msg_mock, run_os_command_mock, exists_mock): # Testing not existent file scenario exists_mock.return_value = False ambari_server.set_file_permissions("dummy-file", "dummy-mod", "dummy-user", "dummy-group", False) self.assertFalse(run_os_command_mock.called) self.assertTrue(print_info_msg_mock.called) run_os_command_mock.reset_mock() print_warning_msg_mock.reset_mock() # Testing OK scenario exists_mock.return_value = True run_os_command_mock.side_effect = [(0, "", ""), (0, "", "")] ambari_server.set_file_permissions("dummy-file", "dummy-mod", "dummy-user", "dummy-group", False) self.assertTrue(len(run_os_command_mock.call_args_list) == 2) self.assertFalse(print_warning_msg_mock.called) run_os_command_mock.reset_mock() print_warning_msg_mock.reset_mock() # Testing first command fail run_os_command_mock.side_effect = [(1, "", ""), (0, "", "")] ambari_server.set_file_permissions("dummy-file", "dummy-mod", "dummy-user", "dummy-group", False) self.assertTrue(len(run_os_command_mock.call_args_list) == 2) self.assertTrue(print_warning_msg_mock.called) run_os_command_mock.reset_mock() print_warning_msg_mock.reset_mock() # Testing second command fail run_os_command_mock.side_effect = [(0, "", ""), (1, "", "")] ambari_server.set_file_permissions("dummy-file", "dummy-mod", "dummy-user", "dummy-group", False) self.assertTrue(len(run_os_command_mock.call_args_list) == 2) self.assertTrue(print_warning_msg_mock.called) run_os_command_mock.reset_mock() print_warning_msg_mock.reset_mock() # Testing recursive operation exists_mock.return_value = True run_os_command_mock.side_effect = [(0, "", ""), (0, "", "")] ambari_server.set_file_permissions("dummy-file", "dummy-mod", "dummy-user", "dummy-group", True) self.assertTrue(len(run_os_command_mock.call_args_list) == 2) self.assertTrue("-R" in run_os_command_mock.call_args_list[0][0][0]) self.assertTrue("-R" in run_os_command_mock.call_args_list[1][0][0]) self.assertFalse(print_warning_msg_mock.called) run_os_command_mock.reset_mock() print_warning_msg_mock.reset_mock() # Testing non-recursive operation exists_mock.return_value = True run_os_command_mock.side_effect = [(0, "", ""), (0, "", "")] ambari_server.set_file_permissions("dummy-file", "dummy-mod", "dummy-user", "dummy-group", False) self.assertTrue(len(run_os_command_mock.call_args_list) == 2) self.assertFalse("-R" in run_os_command_mock.call_args_list[0][0][0]) self.assertFalse("-R" in run_os_command_mock.call_args_list[1][0][0]) self.assertFalse(print_warning_msg_mock.called) run_os_command_mock.reset_mock() print_warning_msg_mock.reset_mock() @patch.object(ambari_server, "get_validated_string_input") @patch.object(ambari_server, "print_info_msg") @patch.object(ambari_server, "print_warning_msg") @patch.object(ambari_server, "run_os_command") def test_create_custom_user(self, run_os_command_mock, print_warning_msg_mock, print_info_msg_mock, get_validated_string_input_mock): user = "dummy-user" get_validated_string_input_mock.return_value = user # Testing scenario: existing group, existing user run_os_command_mock.side_effect = [(9, "", ""), (9, "", ""), (0, "", "")] result = ambari_server.create_custom_user() self.assertTrue(len(run_os_command_mock.call_args_list) == 3) self.assertEquals(result, (0, user)) run_os_command_mock.reset_mock() # Testing scenario: absent group, absent user run_os_command_mock.side_effect = [(0, "", ""), (0, "", "")] result = ambari_server.create_custom_user() self.assertTrue(len(run_os_command_mock.call_args_list) == 2) self.assertEquals(result, (0, user)) run_os_command_mock.reset_mock() # Testing scenario: os command fail run_os_command_mock.side_effect = [(1, "", "")] result = ambari_server.create_custom_user() self.assertTrue(len(run_os_command_mock.call_args_list) == 1) self.assertEquals(result, (1, None)) run_os_command_mock.reset_mock() @patch.object(ambari_server, "read_ambari_user") @patch.object(ambari_server, "get_YN_input") @patch.object(ambari_server, "create_custom_user") @patch.object(ambari_server, "write_property") @patch.object(ambari_server, "adjust_directory_permissions") @patch.object(ambari_server, "print_error_msg") def test_check_ambari_user(self, print_error_msg_mock, adjust_directory_permissions_mock, write_property_mock, create_custom_user_mock, get_YN_input_mock, read_ambari_user_mock): # Scenario: user is already defined read_ambari_user_mock.return_value = "dummy-user" result = ambari_server.check_ambari_user() self.assertFalse(get_YN_input_mock.called) self.assertFalse(write_property_mock.called) self.assertTrue(adjust_directory_permissions_mock.called) self.assertEqual(result, 0) get_YN_input_mock.reset_mock() write_property_mock.reset_mock() adjust_directory_permissions_mock.reset_mock() # Scenario: user is not defined (setup process) read_ambari_user_mock.return_value = None get_YN_input_mock.return_value = True create_custom_user_mock.return_value = (0, "dummy-user") result = ambari_server.check_ambari_user() self.assertTrue(get_YN_input_mock.called) self.assertTrue(create_custom_user_mock.called) self.assertTrue(write_property_mock.called) self.assertTrue(write_property_mock.call_args[0][1] == "dummy-user") self.assertTrue(adjust_directory_permissions_mock.called) self.assertEqual(result, 0) get_YN_input_mock.reset_mock() create_custom_user_mock.reset_mock() write_property_mock.reset_mock() adjust_directory_permissions_mock.reset_mock() # Scenario: user is not defined (setup process), user creation failed read_ambari_user_mock.return_value = None get_YN_input_mock.return_value = True create_custom_user_mock.return_value = (1, None) result = ambari_server.check_ambari_user() self.assertTrue(get_YN_input_mock.called) self.assertTrue(create_custom_user_mock.called) self.assertFalse(write_property_mock.called) self.assertFalse(adjust_directory_permissions_mock.called) self.assertEqual(result, 1) get_YN_input_mock.reset_mock() create_custom_user_mock.reset_mock() write_property_mock.reset_mock() adjust_directory_permissions_mock.reset_mock() # Scenario: user is not defined and left to be root read_ambari_user_mock.return_value = None get_YN_input_mock.return_value = False result = ambari_server.check_ambari_user() self.assertTrue(get_YN_input_mock.called) self.assertFalse(create_custom_user_mock.called) self.assertTrue(write_property_mock.called) self.assertTrue(write_property_mock.call_args[0][1] == "root") self.assertTrue(adjust_directory_permissions_mock.called) self.assertEqual(result, 0) @patch.object(ambari_server, "search_file") @patch("__builtin__.open") @patch.object(ambari_server, "read_ambari_user") @patch.object(ambari_server, "set_file_permissions") def test_store_password_file(self, set_file_permissions_mock, read_ambari_user_mock, open_mock, search_file_mock): search_file_mock.return_value = "/etc/ambari-server/conf/ambari.properties" open_mock.return_value = MagicMock() ambari_server.store_password_file("password", "passfile") self.assertTrue(set_file_permissions_mock.called) @patch.object(ambari_server, "run_os_command") def test_check_iptables(self, run_os_command_mock): run_os_command_mock.return_value = (1, "test", "") rcode, info = ambari_server.check_iptables() self.assertEqual(1, rcode) self.assertEqual("test", info) run_os_command_mock.return_value = (2, "", ambari_server.IP_TBLS_SRVC_NT_FND) rcode = ambari_server.check_iptables() self.assertEqual(0, rcode) def test_dlprogress(self): out = StringIO.StringIO() sys.stdout = out ambari_server.dlprogress("filename", 10, 2, 100) sys.stdout = sys.__stdout__ self.assertNotEqual("", out.getvalue()) @patch("urllib2.urlopen") @patch("__builtin__.open") @patch.object(ambari_server, "dlprogress") def test_track_jdk(self, dlprogress_mock, openMock, urlopenMock): u = MagicMock() u.info.return_value = {"Content-Length":"24576"} chunks = [None, "second", "first"] def side_effect(*args, **kwargs): return chunks.pop() u.read.side_effect = side_effect urlopenMock.return_value = u f = MagicMock() openMock.return_value = f ambari_server.track_jdk("base", "url", "local") self.assertEqual(0, len(chunks)) self.assertTrue(f.write.called) self.assertTrue(f.flush.called) self.assertTrue(f.close.called) self.assertEqual(2, len(dlprogress_mock.call_args_list)) @patch("shutil.copy") @patch("os.path.join") @patch("os.path.exists") @patch.object(ambari_server, "get_ambari_properties") def test_install_jce_manualy(self, get_ambari_properties_mock,\ os_path_exists_mock, os_path_join_mock,\ shutil_copy_mock): args = MagicMock() args.jce_policy = "somewhere" p = MagicMock() get_ambari_properties_mock.return_value = p p.__getitem__.side_effect = None p.__getitem__.return_value = "somewhere" os_path_exists_mock.return_value = True os_path_join_mock.return_value = \ "/var/lib/ambari-server/resources/jce_policy-6.zip" ambari_server.install_jce_manualy(args) self.assertTrue(shutil_copy_mock.called) shutil_copy_mock.side_effect = Exception("exception") try: ambari_server.install_jce_manualy(args) self.fail("Should throw exception because of not found jce_policy-6.zip") except Exception: # Expected self.assertTrue(shutil_copy_mock.called) pass shutil_copy_mock.side_effect = None args.jce_policy = None ambari_server.install_jce_manualy(args) @patch.object(ambari_server, "get_validated_string_input") @patch.object(ambari_server, "find_properties_file") @patch.object(ambari_server, "get_ambari_properties") @patch.object(ambari_server, "is_server_runing") @patch.object(ambari_server, "import_cert_and_key_action") @patch.object(ambari_server, "get_YN_input") @patch("__builtin__.open") @patch("ambari-server.Properties") def test_setup_https(self, Properties_mock, open_Mock, get_YN_input_mock,\ import_cert_and_key_action_mock, is_server_runing_mock, get_ambari_properties_mock,\ find_properties_file_mock,\ get_validated_string_input_mock): args = MagicMock() open_Mock.return_value = file p = get_ambari_properties_mock.return_value #Case #1: if client ssl is on and user didnt choose #disable ssl option and choose import certs and keys p.get_property.side_effect = ["key_dir","5555","6666", "true"] get_YN_input_mock.side_effect = [False,True] get_validated_string_input_mock.side_effect = ["4444"] get_property_expected = "[call('security.server.keys_dir'),\n"+\ " call('client.api.ssl.port'),\n"+\ " call('client.api.ssl.port'),\n call('api.ssl')]" process_pair_expected = "[call('client.api.ssl.port', '4444')]" ambari_server.SILENT = False ambari_server.setup_https(args) self.assertTrue(p.process_pair.called) self.assertTrue(p.get_property.call_count == 4) self.assertEqual(str(p.get_property.call_args_list), get_property_expected) self.assertEqual(str(p.process_pair.call_args_list), process_pair_expected) self.assertTrue(p.store.called) self.assertTrue(import_cert_and_key_action_mock.called) p.process_pair.reset_mock() p.get_property.reset_mock() p.store.reset_mock() import_cert_and_key_action_mock.reset_mock() #Case #2: if client ssl is on and user choose to disable ssl option p.get_property.side_effect = ["key_dir","", "true"] get_YN_input_mock.side_effect = [True] get_validated_string_input_mock.side_effect = ["4444"] get_property_expected = "[call('security.server.keys_dir'),\n"+\ " call('client.api.ssl.port'),\n call('api.ssl')]" process_pair_expected = "[call('api.ssl', 'false')]" ambari_server.setup_https(args) self.assertTrue(p.process_pair.called) self.assertTrue(p.get_property.call_count == 3) self.assertEqual(str(p.get_property.call_args_list), get_property_expected) self.assertEqual(str(p.process_pair.call_args_list), process_pair_expected) self.assertTrue(p.store.called) self.assertFalse(import_cert_and_key_action_mock.called) p.process_pair.reset_mock() p.get_property.reset_mock() p.store.reset_mock() import_cert_and_key_action_mock.reset_mock() #Case #3: if client ssl is off and user choose option #to import cert and keys p.get_property.side_effect = ["key_dir","", None] get_YN_input_mock.side_effect = [True, True] get_validated_string_input_mock.side_effect = ["4444"] get_property_expected = "[call('security.server.keys_dir'),\n"+\ " call('client.api.ssl.port'),\n call('api.ssl')]" process_pair_expected = "[call('client.api.ssl.port', '4444')]" ambari_server.setup_https(args) self.assertTrue(p.process_pair.called) self.assertTrue(p.get_property.call_count == 3) self.assertEqual(str(p.get_property.call_args_list), get_property_expected) self.assertEqual(str(p.process_pair.call_args_list), process_pair_expected) self.assertTrue(p.store.called) self.assertTrue(import_cert_and_key_action_mock.called) p.process_pair.reset_mock() p.get_property.reset_mock() p.store.reset_mock() import_cert_and_key_action_mock.reset_mock() #Case #4: if client ssl is off and #user did not choose option to import cert and keys p.get_property.side_effect = ["key_dir","", None] get_YN_input_mock.side_effect = [False] get_validated_string_input_mock.side_effect = ["4444"] get_property_expected = "[call('security.server.keys_dir'),\n"+\ " call('client.api.ssl.port'),\n call('api.ssl')]" process_pair_expected = "[]" ambari_server.setup_https(args) self.assertFalse(p.process_pair.called) self.assertTrue(p.get_property.call_count == 3) self.assertEqual(str(p.get_property.call_args_list), get_property_expected) self.assertEqual(str(p.process_pair.call_args_list), process_pair_expected) self.assertFalse(p.store.called) self.assertFalse(import_cert_and_key_action_mock.called) p.process_pair.reset_mock() p.get_property.reset_mock() p.store.reset_mock() import_cert_and_key_action_mock.reset_mock() ambari_server.SILENT = True @patch.object(ambari_server, "import_cert_and_key") def test_import_cert_and_key_action(self, import_cert_and_key_mock): import_cert_and_key_mock.return_value = True properties = MagicMock() properties.get_property.side_effect = ["key_dir","5555","6666", "true"] properties.process_pair = MagicMock() expect_process_pair = "[call('security.server.cert_name', 'ca.crt'),\n"+\ " call('security.server.key_name', 'ca.key'),\n"+\ " call('api.ssl', 'true')]" ambari_server.import_cert_and_key_action("key_dir", properties) self.assertEqual(str(properties.process_pair.call_args_list),\ expect_process_pair) @patch.object(ambari_server, "read_ambari_user") @patch.object(ambari_server, "set_file_permissions") @patch.object(ambari_server, "import_file_to_keystore") @patch("__builtin__.open") @patch.object(ambari_server, "run_os_command") @patch("os.path.join") @patch.object(ambari_server, "get_validated_filepath_input") @patch.object(ambari_server, "get_validated_string_input") def test_import_cert_and_key(self, get_validated_string_input_mock,\ get_validated_filepath_input_mock,\ os_path_join_mock, run_os_command_mock,\ open_mock, import_file_to_keystore_mock,\ set_file_permissions_mock, read_ambari_user_mock): get_validated_string_input_mock.return_value = "password" get_validated_filepath_input_mock.side_effect = \ ["cert_file_path","key_file_path"] os_path_join_mock.side_effect = ["cert_file_path","key_file_path",\ "keystore_cert_file_path",\ "keystore_cert_key_file_path",] run_os_command_mock.return_value = (0, "", "") om = open_mock.return_value expect_import_file_to_keystore = "[call('cert_file_path',"+\ " 'keystore_cert_file_path'),\n"+\ " call('key_file_path',"+\ " 'keystore_cert_key_file_path')]" ambari_server.import_cert_and_key("key_dir") self.assertTrue(get_validated_filepath_input_mock.call_count == 2) self.assertTrue(get_validated_string_input_mock.called) self.assertTrue(os_path_join_mock.call_count == 4) self.assertTrue(set_file_permissions_mock.call_count == 2) self.assertEqual(str(import_file_to_keystore_mock.call_args_list),\ expect_import_file_to_keystore) @patch.object(ambari_server, "run_os_command") @patch("__builtin__.open") @patch("os.path.exists") def test_is_server_runing(self, os_path_exists_mock, open_mock,\ run_os_command_mock): os_path_exists_mock.return_value = True f = open_mock.return_value f.readline.return_value = "111" run_os_command_mock.return_value = 0, "", "" status, pid = ambari_server.is_server_runing() self.assertTrue(status) self.assertEqual(111, pid) os_path_exists_mock.return_value = False status, pid = ambari_server.is_server_runing() self.assertFalse(status) @patch.object(ambari_server, "install_jce_manualy") @patch("os.stat") @patch("os.path.isfile") @patch("os.path.exists") @patch.object(ambari_server, "track_jdk") @patch.object(ambari_server, "get_YN_input") @patch.object(ambari_server, "run_os_command") @patch.object(ambari_server, "write_property") @patch.object(ambari_server, "print_info_msg") @patch.object(ambari_server, "get_JAVA_HOME") @patch.object(ambari_server, "get_ambari_properties") def test_download_jdk(self, get_ambari_properties_mock, get_JAVA_HOME_mock,\ print_info_msg_mock, write_property_mock,\ run_os_command_mock, get_YN_input_mock, track_jdk_mock, path_existsMock, path_isfileMock, statMock,\ install_jce_manualy_mock): args = MagicMock() args.java_home = "somewhere" path_existsMock.return_value = False get_JAVA_HOME_mock.return_value = False get_ambari_properties_mock.return_value = -1 try: ambari_server.download_jdk(args) self.fail("Should throw exception because of not found ambari.properties") except FatalException: # Expected self.assertTrue(get_ambari_properties_mock.called) pass get_JAVA_HOME_mock.return_value = True path_existsMock.return_value = True rcode = ambari_server.download_jdk(args) self.assertEqual(0, rcode) get_JAVA_HOME_mock.return_value = False rcode = ambari_server.download_jdk(args) self.assertEqual(0, rcode) self.assertTrue(write_property_mock.called) path_existsMock.return_value = False p = MagicMock() get_ambari_properties_mock.return_value = p p.__getitem__.side_effect = KeyError("test exception") try: ambari_server.download_jdk(args) self.fail("Should throw exception") except FatalException: # Expected pass p.__getitem__.return_value = "somewhere" p.__getitem__.side_effect = None path_existsMock.return_value = False run_os_command_mock.return_value = (0, "Wrong out", None) try: ambari_server.download_jdk(args) self.fail("Should throw exception") except FatalException: # Expected pass ambari_server.JDK_INSTALL_DIR = os.getcwd() get_YN_input_mock.return_value = True run_os_command_mock.return_value = (0, "Creating jdk-1.2/jre" "Content-Length: 32000\r\n" , None) statResult = MagicMock() statResult.st_size = 32000 statMock.return_value = statResult rcode = ambari_server.download_jdk(args) self.assertEqual(0, rcode) @patch.object(ambari_server, "run_os_command") def test_get_postgre_status(self, run_os_command_mock): run_os_command_mock.return_value = (1, "running", None) result = ambari_server.get_postgre_status() self.assertEqual("running", result) run_os_command_mock.return_value = (1, "wrong", None) result = ambari_server.get_postgre_status() self.assertEqual(None, result) @patch("time.sleep") @patch("subprocess.Popen") @patch.object(ambari_server, "run_os_command") @patch.object(ambari_server, "get_postgre_status") def test_check_postgre_up(self, get_postgre_status_mock, run_os_command_mock, popen_mock, sleep_mock): p = MagicMock() p.poll.return_value = 0 popen_mock.return_value = p run_os_command_mock.return_value = (0, None, None) rcode = ambari_server.check_postgre_up() self.assertEqual(0, rcode) p.poll.return_value = 4 get_postgre_status_mock.return_value = None rcode = ambari_server.check_postgre_up() self.assertEqual(4, rcode) @patch("platform.linux_distribution") @patch("platform.system") @patch.object(ambari_server, "print_info_msg") @patch.object(ambari_server, "print_error_msg") @patch.object(ambari_server, "get_ambari_properties") @patch.object(ambari_server, "write_property") @patch.object(ambari_server, "get_conf_dir") def test_configure_os_settings(self, get_conf_dir_mock, write_property_mock, get_ambari_properties_mock, print_error_msg_mock, print_info_msg_mock, systemMock, distMock): get_ambari_properties_mock.return_value = -1 rcode = ambari_server.configure_os_settings() self.assertEqual(-1, rcode) p = MagicMock() p[ambari_server.OS_TYPE_PROPERTY] = 'somevalue' get_ambari_properties_mock.return_value = p rcode = ambari_server.configure_os_settings() self.assertEqual(0, rcode) p.__getitem__.return_value = "" systemMock.return_value = "NonLinux" rcode = ambari_server.configure_os_settings() self.assertEqual(-1, rcode) systemMock.return_value = "Linux" distMock.return_value = ("CentOS", "6.3", None) rcode = ambari_server.configure_os_settings() self.assertEqual(0, rcode) self.assertTrue(write_property_mock.called) @patch("__builtin__.open") @patch.object(ambari_server, "Properties") @patch.object(ambari_server, "search_file") @patch.object(ambari_server, "get_conf_dir") def test_get_JAVA_HOME(self, get_conf_dir_mock, search_file_mock, Properties_mock, openMock): openMock.side_effect = Exception("exception") result = ambari_server.get_JAVA_HOME() self.assertEqual(None, result) expected = os.path.dirname(__file__) p = MagicMock() p.__getitem__.return_value = expected openMock.side_effect = None Properties_mock.return_value = p result = ambari_server.get_JAVA_HOME() self.assertEqual(expected, result) @patch("glob.glob") @patch.object(ambari_server, "get_JAVA_HOME") def test_find_jdk(self, get_JAVA_HOME_mock, globMock): get_JAVA_HOME_mock.return_value = "somewhere" result = ambari_server.find_jdk() self.assertEqual("somewhere", result) get_JAVA_HOME_mock.return_value = None globMock.return_value = [] result = ambari_server.find_jdk() self.assertEqual(None, result) globMock.return_value = ["one", "two"] result = ambari_server.find_jdk() self.assertNotEqual(None, result) @patch.object(ambari_server, "configure_os_settings") @patch.object(ambari_server, "download_jdk") @patch.object(ambari_server, "configure_postgres") @patch.object(ambari_server, "setup_db") @patch.object(ambari_server, "check_postgre_up") @patch.object(ambari_server, "check_iptables") @patch.object(ambari_server, "check_ambari_user") @patch.object(ambari_server, "check_jdbc_drivers") @patch.object(ambari_server, "check_selinux") @patch.object(ambari_server, "setup_remote_db") @patch.object(ambari_server, "store_remote_properties") @patch.object(ambari_server, "is_root") def test_setup(self, is_root_mock, store_remote_properties_mock, setup_remote_db_mock, check_selinux_mock, check_jdbc_drivers_mock, check_ambari_user_mock, check_iptables_mock, check_postgre_up_mock, setup_db_mock, configure_postgres_mock, download_jdk_mock, configure_os_settings_mock, ): args = MagicMock() # Testing call under non-root is_root_mock.return_value = False try: ambari_server.setup(args) self.fail("Should throw exception") except FatalException as fe: # Expected self.assertTrue("root-level" in fe.reason) pass # Testing calls under root is_root_mock.return_value = True check_selinux_mock.return_value = 0 check_ambari_user_mock.return_value = 0 check_jdbc_drivers_mock.return_value = 0 check_iptables_mock.return_value = (0, "other") check_postgre_up_mock.return_value = 0 setup_db_mock.return_value = 0 setup_remote_db_mock.return_value = 0 configure_postgres_mock.return_value = 0 download_jdk_mock.return_value = 0 configure_os_settings_mock.return_value = 0 store_remote_properties_mock.return_value = 0 result = ambari_server.setup(args) self.assertEqual(None, result) self.assertTrue(check_ambari_user_mock.called) @patch.object(ambari_server, "get_YN_input") @patch.object(ambari_server, "setup_db") @patch.object(ambari_server, "print_info_msg") @patch.object(ambari_server, "run_os_command") @patch.object(ambari_server, "configure_database_username_password") @patch.object(ambari_server, "parse_properties_file") @patch.object(ambari_server, "is_root") def test_reset(self, is_root_mock, parse_properties_file_mock, configure_database_username_password_mock, run_os_command_mock, print_info_msg_mock, setup_db_mock, get_YN_inputMock): parse_properties_file_mock.return_value = 0 args = MagicMock() args.persistence_type = "local" get_YN_inputMock.return_value = False # Testing call under non-root is_root_mock.return_value = False try: ambari_server.reset(args) self.fail("Should throw exception") except FatalException as fe: # Expected self.assertTrue("root-level" in fe.reason) pass # Testing calls under root is_root_mock.return_value = True try: ambari_server.reset(args) self.fail("Should throw exception") except FatalException as fe: # Expected self.assertFalse("root-level" in fe.reason) pass get_YN_inputMock.return_value = True run_os_command_mock.return_value = (1, None, None) try: ambari_server.reset(args) self.fail("Should throw exception") except FatalException: # Expected pass run_os_command_mock.return_value = (0, None, None) ambari_server.reset(args) self.assertTrue(setup_db_mock.called) @patch.object(ambari_server, "setup_db") @patch.object(ambari_server, "print_info_msg") @patch.object(ambari_server, "run_os_command") @patch.object(ambari_server, "parse_properties_file") @patch.object(ambari_server, "is_root") def test_silent_reset(self, is_root_mock, parse_properties_file_mock, run_os_command_mock, print_info_msg_mock, setup_db_mock): is_root_mock.return_value = True args = MagicMock() ambari_server.SILENT = True self.assertTrue(ambari_server.SILENT) run_os_command_mock.return_value = (0, None, None) def signal_handler(signum, frame): self.fail("Timed out!") signal.signal(signal.SIGALRM, signal_handler) signal.alarm(5) rcode = ambari_server.reset(args) self.assertEqual(None, rcode) self.assertTrue(setup_db_mock.called) @patch.object(ambari_server, 'save_master_key') @patch('os.chmod', autospec=True) @patch.object(ambari_server, 'get_validated_string_input') @patch("os.environ") @patch.object(ambari_server, "get_ambari_properties") @patch("os.kill") @patch("os.path.exists") @patch("__builtin__.open") @patch("subprocess.Popen") @patch.object(ambari_server, "print_info_msg") @patch.object(ambari_server, "search_file") @patch.object(ambari_server, "find_jdk") @patch.object(ambari_server, "print_error_msg") @patch.object(ambari_server, "check_postgre_up") @patch.object(ambari_server, "check_iptables") @patch.object(ambari_server, "parse_properties_file") @patch.object(ambari_server, "read_ambari_user") @patch.object(ambari_server, "is_root") @patch("getpass.getuser") @patch("os.chdir") def test_start(self, chdir_mock, getuser_mock, is_root_mock, read_ambari_user_mock, parse_properties_file_mock, check_iptables_mock, check_postgre_up_mock, print_error_msg_mock, find_jdk_mock, search_file_mock, print_info_msg_mock, popenMock, openMock, pexistsMock, killMock, get_ambari_properties_mock, os_environ_mock, get_validated_string_input_method, os_chmod_method, save_master_key_method): args = MagicMock() f = MagicMock() f.readline.return_value = 42 openMock.return_value = f get_ambari_properties_mock.return_value = \ {ambari_server.SECURITY_KEY_IS_PERSISTED : "True"} # Checking "server is running" pexistsMock.return_value = True try: ambari_server.start(args) self.fail("Should fail with 'Server is running'") except FatalException: # Expected pass self.assertTrue(killMock.called) pexistsMock.return_value = False # Checking situation when ambari user is not set up read_ambari_user_mock.return_value = None try: ambari_server.start(args) self.fail("Should fail with 'Can not detect a system user for Ambari'") except FatalException as e: # Expected self.assertTrue('Can not detect a system user' in e.reason) # Checking start from non-root when current user is not the same as a # custom user read_ambari_user_mock.return_value = "dummy-user" getuser_mock.return_value = "non_custom_user" is_root_mock.return_value = False try: ambari_server.start(args) self.fail("Should fail with 'Can not start ambari-server as user...'") except FatalException as e: # Expected self.assertTrue('Can not start ambari-server as user' in e.reason) # Checking "jdk not found" is_root_mock.return_value = True find_jdk_mock.return_value = None try: ambari_server.start(args) self.fail("Should fail with 'No JDK found'") except FatalException as e: # Expected self.assertTrue('No JDK found' in e.reason) find_jdk_mock.return_value = "somewhere" ## Testing workflow under root is_root_mock.return_value = True # Remote DB args.persistence_type="remote" check_iptables_mock.return_value = (0, None) try: ambari_server.start(args) except FatalException as e: # Ignored pass self.assertFalse('Unable to start PostgreSQL server' in e.reason) self.assertFalse(check_postgre_up_mock.called) check_postgre_up_mock.reset_mock() # Local DB args.persistence_type="local" # case: postgres failed to start check_postgre_up_mock.return_value = 1 try: ambari_server.start(args) self.fail("Should fail with 'Unable to start PostgreSQL server'") except FatalException as e: # Expected self.assertTrue('Unable to start PostgreSQL server' in e.reason) self.assertTrue(check_postgre_up_mock.called) # case: iptables failed to stop check_postgre_up_mock.return_value = 0 check_iptables_mock.return_value = (1, ambari_server.IP_TBLS_ENABLED) try: ambari_server.start(args) self.fail("Should fail with 'Failed to stop iptables'") except FatalException as e: # Expected self.assertTrue('Failed to stop iptables' in e.reason) check_iptables_mock.return_value = (0, None) # Case: custom user is "root" read_ambari_user_mock.return_value = "root" ambari_server.start(args) self.assertTrue(popenMock.called) popen_arg = popenMock.call_args[0][0] self.assertTrue(popen_arg[0] == "/bin/sh") popenMock.reset_mock() # Case: custom user is not "root" read_ambari_user_mock.return_value = "not-root-user" ambari_server.start(args) self.assertTrue(chdir_mock.called) self.assertTrue(popenMock.called) popen_arg = popenMock.call_args[0][0] self.assertTrue(popen_arg[0] == "/bin/su") check_postgre_up_mock.reset_mock() popenMock.reset_mock() ## Testing workflow under non-root is_root_mock.return_value = False read_ambari_user_mock.return_value = "not-root-user" getuser_mock.return_value = read_ambari_user_mock.return_value # Local DB args.persistence_type="local" ambari_server.start(args) self.assertFalse(check_postgre_up_mock.called) # Remote DB args.persistence_type="remote" ambari_server.start(args) self.assertFalse(check_postgre_up_mock.called) # Checking call check_iptables_mock.reset_mock() check_iptables_mock.return_value = (0, None) ambari_server.start(args) self.assertTrue(popenMock.called) popen_arg = popenMock.call_args[0][0] self.assertTrue(popen_arg[0] == "/bin/sh") self.assertFalse(check_iptables_mock.called) # Test start under wrong user read_ambari_user_mock.return_value = "not-root-user" getuser_mock.return_value = "non_custom_user" try: ambari_server.start(args) self.fail("Can not start ambari-server as user non_custom_user.") except FatalException as e: # Expected self.assertTrue('Can not start ambari-server as user' in e.reason) # Check environ master key is set popenMock.reset_mock() get_ambari_properties_mock.return_value = \ {ambari_server.SECURITY_KEY_IS_PERSISTED : "False"} os_environ_mock.copy.return_value = {"a" : "b", ambari_server.SECURITY_KEY_ENV_VAR_NAME : "masterkey"} args.persistence_type="local" read_ambari_user_mock.return_value = "root" is_root_mock.return_value = True ambari_server.start(args) self.assertFalse(get_validated_string_input_method.called) self.assertFalse(save_master_key_method.called) popen_arg = popenMock.call_args[1]['env'] self.assertEquals(os_environ_mock.copy.return_value, popen_arg) # Check environ master key is not set popenMock.reset_mock() os_environ_mock.reset_mock() get_ambari_properties_mock.return_value = \ {ambari_server.SECURITY_KEY_IS_PERSISTED : "False"} os_environ_mock.copy.return_value = {"a" : "b"} args.persistence_type="local" read_ambari_user_mock.return_value = "root" is_root_mock.return_value = True get_validated_string_input_method.return_value = "masterkey" os_chmod_method.return_value = None ambari_server.start(args) self.assertTrue(get_validated_string_input_method.called) self.assertTrue(save_master_key_method.called) popen_arg = popenMock.call_args[1]['env'] self.assertEquals(os_environ_mock.copy.return_value, popen_arg) @patch("__builtin__.open") @patch("os.path.exists") @patch("os.remove") @patch("os.killpg") @patch("os.getpgid") @patch.object(ambari_server, "print_info_msg") def test_stop(self, print_info_msg_mock, gpidMock, removeMock, killMock, pexistsMock, openMock): pexistsMock.return_value = True f = MagicMock() f.readline.return_value = "42" openMock.return_value = f ambari_server.stop(None) self.assertTrue(f.readline.called) self.assertTrue(killMock.called) self.assertTrue(killMock.called) self.assertTrue(f.close.called) self.assertTrue(removeMock.called) @patch.object(ambari_server, "configure_database_username_password") @patch.object(ambari_server, "run_os_command") @patch.object(ambari_server, "is_root") def test_upgrade_stack(self, is_root_mock, run_os_command_mock, configure_postgres_username_password_mock): args = MagicMock() # Testing call under non-root is_root_mock.return_value = False try: ambari_server.upgrade_stack(args, 'HDP-2.0') self.fail("Should throw exception") except FatalException as fe: # Expected self.assertTrue("root-level" in fe.reason) pass # Testing calls under root is_root_mock.return_value = True run_os_command_mock.return_value = (0, '', '') ambari_server.upgrade_stack(args, 'HDP-2.0') self.assertTrue(configure_postgres_username_password_mock.called) self.assertTrue(run_os_command_mock.called) @patch.object(ambari_server, "adjust_directory_permissions") @patch.object(ambari_server, "print_warning_msg") @patch.object(ambari_server, "read_ambari_user") @patch.object(ambari_server, "check_db_consistency") @patch.object(ambari_server, "execute_db_script") @patch.object(ambari_server, "check_postgre_up") @patch.object(ambari_server, "update_ambari_properties") @patch.object(ambari_server, "parse_properties_file") @patch.object(ambari_server, "is_root") def test_upgrade(self, is_root_mock, parse_properties_file_mock, update_ambari_properties_mock, check_postgre_up_mock, execute_db_script_mock, check_db_consistency_mock, read_ambari_user_mock, print_warning_msg_mock, adjust_directory_permissions_mock): args = MagicMock() args.upgrade_script_file = "/var/lib/"\ "ambari-server/resources/upgrade/ddl/"\ "Ambari-DDL-Postgres-UPGRADE-1.3.0.sql" update_ambari_properties_mock.return_value = 0 check_postgre_up_mock.return_value = 0 execute_db_script_mock.return_value = 0 check_db_consistency_mock.return_value = 0 # Testing call under non-root is_root_mock.return_value = False try: ambari_server.upgrade(args) self.fail("Should throw exception") except FatalException as fe: # Expected self.assertTrue("root-level" in fe.reason) pass # Testing calls under root is_root_mock.return_value = True # Testing with undefined custom user read_ambari_user_mock.return_value = None ambari_server.upgrade(args) self.assertTrue(print_warning_msg_mock.called) warning_args = print_warning_msg_mock.call_args[0][0] self.assertTrue("custom ambari user" in warning_args) # Testing with defined custom user read_ambari_user_mock.return_value = "ambari-custom-user" ambari_server.upgrade(args) self.assertTrue(adjust_directory_permissions_mock.called) def test_print_info_msg(self): out = StringIO.StringIO() sys.stdout = out ambari_server.VERBOSE = True ambari_server.print_info_msg("msg") self.assertNotEqual("", out.getvalue()) sys.stdout = sys.__stdout__ def test_print_error_msg(self): out = StringIO.StringIO() sys.stdout = out ambari_server.VERBOSE = True ambari_server.print_error_msg("msg") self.assertNotEqual("", out.getvalue()) sys.stdout = sys.__stdout__ def test_print_warning_msg(self): out = StringIO.StringIO() sys.stdout = out ambari_server.VERBOSE = True ambari_server.print_warning_msg("msg") self.assertNotEqual("", out.getvalue()) sys.stdout = sys.__stdout__ @patch.object(ambari_server, "get_choice_string_input") def test_get_YN_input(self, get_choice_string_input_mock): ambari_server.get_YN_input("prompt", "default") self.assertTrue(get_choice_string_input_mock.called) self.assertEqual(4, len(get_choice_string_input_mock.call_args_list[0][0])) @patch.object(ambari_server, "get_conf_dir") def test_update_ambari_properties(self, get_conf_dir_mock): properties = ["server.jdbc.user.name=ambari-server\n", "server.jdbc.user.passwd=/etc/ambari-server/conf/password.dat\n", "java.home=/usr/jdk64/jdk1.6.0_31\n", "server.os_type=redhat6\n", "ambari.user=ambari\n"] NEW_PROPERTY = 'some_new_property=some_value\n' CHANGED_VALUE_PROPERTY = 'server.os_type=should_not_overwrite_value\n' get_conf_dir_mock.return_value = '/etc/ambari-server/conf' (tf1, fn1) = tempfile.mkstemp() (tf2, fn2) = tempfile.mkstemp() ambari_server.AMBARI_PROPERTIES_RPMSAVE_FILE = fn1 ambari_server.AMBARI_PROPERTIES_FILE = fn2 with open(ambari_server.AMBARI_PROPERTIES_FILE, "w") as f: f.write(NEW_PROPERTY) f.write(CHANGED_VALUE_PROPERTY) with open(ambari_server.AMBARI_PROPERTIES_RPMSAVE_FILE, 'w') as f: for line in properties: f.write(line) #Call tested method ambari_server.update_ambari_properties() timestamp = datetime.datetime.now() #RPMSAVE_FILE wasn't found self.assertFalse(os.path.exists(ambari_server.AMBARI_PROPERTIES_RPMSAVE_FILE)) #Renamed RPMSAVE_FILE exists self.assertTrue(os.path.exists(ambari_server.AMBARI_PROPERTIES_RPMSAVE_FILE + '.' + timestamp.strftime('%Y%m%d%H%M%S'))) with open(ambari_server.AMBARI_PROPERTIES_FILE, 'r') as f: ambari_properties_content = f.readlines() for line in properties: if not line in ambari_properties_content: self.fail() if not NEW_PROPERTY in ambari_properties_content: self.fail() if CHANGED_VALUE_PROPERTY in ambari_properties_content: self.fail() # Command should not fail if *.rpmsave file is missing result = ambari_server.update_ambari_properties() self.assertEquals(result, 0) os.unlink(fn2) #if ambari.properties file is absent then "ambari-server upgrade" should # fail (tf, fn) = tempfile.mkstemp() ambari_server.AMBARI_PROPERTIES_RPMSAVE_FILE = fn result = ambari_server.update_ambari_properties() self.assertNotEquals(result, 0) @patch("sys.exit") @patch.object(ambari_server, "get_db_cli_tool") @patch.object(ambari_server, "store_remote_properties") @patch.object(ambari_server, "is_local_database") @patch.object(ambari_server, "check_iptables") @patch.object(ambari_server, "check_jdbc_drivers") @patch.object(ambari_server, "is_root") @patch.object(ambari_server, "check_ambari_user") @patch.object(ambari_server, "download_jdk") @patch.object(ambari_server, "configure_os_settings") def test_setup_remote_db_wo_client(self, configure_os_settings_mock, download_jdk_mock, check_ambari_user_mock, is_root_mock, check_jdbc_drivers_mock, check_iptables_mock, is_local_db_mock, store_remote_properties_mock, get_db_cli_tool_mock, exit_mock): args = MagicMock() is_root_mock.return_value = True is_local_db_mock.return_value = False check_iptables_mock.return_value = (0, "other") store_remote_properties_mock.return_value = 0 get_db_cli_tool_mock.return_value = None check_jdbc_drivers_mock.return_value=0 check_ambari_user_mock.return_value = 0 download_jdk_mock.return_value = 0 configure_os_settings_mock.return_value = 0 try: ambari_server.setup(args) self.fail("Should throw exception") except FatalException as fe: # Expected self.assertTrue("The cli was not found" in fe.reason) @patch.object(ambari_server, "parse_properties_file") @patch.object(ambari_server, "get_db_cli_tool") @patch.object(ambari_server, "print_error_msg") @patch.object(ambari_server, "get_YN_input") @patch.object(ambari_server, "setup_db") @patch.object(ambari_server, "run_os_command") @patch.object(ambari_server, "is_root") def test_reset_remote_db_wo_client(self, is_root_mock, run_os_command_mock, setup_db_mock, get_YN_inputMock, print_error_msg_mock, get_db_cli_tool_mock, parse_properties_file_mock): args = MagicMock() get_YN_inputMock.return_value = True run_os_command_mock.return_value = (0, None, None) args.persistence_type="remote" get_db_cli_tool_mock.return_value = None is_root_mock.return_value = True try: ambari_server.reset(args) self.fail("Should throw exception") except FatalException as fe: # Expected self.assertTrue("Client wasn't found" in fe.reason) pass @patch.object(ambari_server, "get_ambari_properties") @patch.object(ambari_server, "find_jdbc_driver") @patch.object(ambari_server, "copy_files") @patch('__builtin__.raw_input') def test_check_jdbc_drivers(self, raw_input_mock, copy_files_mock, find_jdbc_driver_mock, get_ambari_properties_mock): args = MagicMock() # Check positive scenario drivers_list = ['driver_file'] resources_dir = '/tmp' get_ambari_properties_mock.return_value = {ambari_server.RESOURCES_DIR_PROPERTY : resources_dir} find_jdbc_driver_mock.return_value = drivers_list rcode = ambari_server.check_jdbc_drivers(args) self.assertEqual(0, rcode) copy_files_mock.assert_called_with(drivers_list, resources_dir) #Check negative scenario find_jdbc_driver_mock.return_value = -1 args.database = "oracle" rcode = ambari_server.check_jdbc_drivers(args) self.assertEqual(0, rcode) #Ensure user was asked to provide drivers self.assertTrue(raw_input_mock.called) @patch.object(ambari_server, "find_properties_file") def test_get_ambari_properties(self, find_properties_file_mock): find_properties_file_mock.return_value = None rcode = ambari_server.get_ambari_properties() self.assertEqual(rcode, -1) tf1 = tempfile.NamedTemporaryFile() find_properties_file_mock.return_value = tf1.name prop_name='name' prop_value='val' with open(tf1.name, 'w') as fout: fout.write(prop_name + '=' + prop_value) fout.close() properties = ambari_server.get_ambari_properties() self.assertEqual(properties[prop_name], prop_value) @patch.object(ambari_server, "get_ambari_properties") @patch.object(ambari_server, "find_jdbc_driver") @patch.object(ambari_server, "copy_files") @patch.object(ambari_server, "print_error_msg") @patch.object(ambari_server, "print_warning_msg") @patch('__builtin__.raw_input') @patch("sys.exit") def check_jdbc_drivers(self, exit_mock, raw_input_mock, print_warning_msg, print_error_msg_mock, copy_files_mock, find_jdbc_driver_mock, get_ambari_properties_mock): out = StringIO.StringIO() sys.stdout = out args = MagicMock() # Check positive scenario drivers_list = ['driver_file'] resources_dir = '/tmp' get_ambari_properties_mock.return_value = {ambari_server.RESOURCES_DIR_PROPERTY : resources_dir} find_jdbc_driver_mock.return_value = drivers_list args.database = "oracle" rcode = ambari_server.check_jdbc_drivers(args) self.assertEqual(0, rcode) copy_files_mock.assert_called_with(drivers_list, resources_dir) # Check negative scenarios # Silent option, no drivers ambari_server.SILENT = True find_jdbc_driver_mock.return_value = -1 rcode = ambari_server.check_jdbc_drivers(args) self.assertTrue(print_error_msg_mock.called) self.assertTrue(exit_mock.called) # Non-Silent option, no drivers ambari_server.SILENT = False find_jdbc_driver_mock.return_value = -1 rcode = ambari_server.check_jdbc_drivers(args) self.assertTrue(exit_mock.called) self.assertTrue(print_error_msg_mock.called) # Non-Silent option, no drivers at first ask, present drivers after that find_jdbc_driver_mock.side_effect = [-1, drivers_list] rcode = ambari_server.check_jdbc_drivers(args) self.assertEqual(0, rcode) copy_files_mock.assert_called_with(drivers_list, resources_dir) # Non-Silent option, no drivers at first ask, present drivers after that find_jdbc_driver_mock.reset() find_jdbc_driver_mock.side_effect = [-1, -1] rcode = ambari_server.check_jdbc_drivers(args) self.assertTrue(exit_mock.called) self.assertTrue(print_error_msg_mock.called) sys.stdout = sys.__stdout__ @patch.object(ambari_server, "find_properties_file") def test_get_ambari_properties(self, find_properties_file): find_properties_file.return_value = None rcode = ambari_server.get_ambari_properties() self.assertEqual(rcode, -1) tf1 = tempfile.NamedTemporaryFile() find_properties_file.return_value = tf1.name prop_name='name' prop_value='val' with open(tf1.name, 'w') as fout: fout.write(prop_name + '=' + prop_value) fout.close() properties = ambari_server.get_ambari_properties() self.assertEqual(properties[prop_name], prop_value) self.assertEqual(properties.fileName, os.path.abspath(tf1.name)) sys.stdout = sys.__stdout__ @patch.object(ambari_server, "search_file") def test_parse_properties_file(self, search_file_mock): tf1 = tempfile.NamedTemporaryFile(mode='r') search_file_mock.return_value = tf1.name args = MagicMock() ambari_server.parse_properties_file(args) self.assertEquals(args.persistence_type, "local") with open(tf1.name, 'w') as fout: fout.write("\n") fout.write(ambari_server.PERSISTENCE_TYPE_PROPERTY+"=remote") args = MagicMock() ambari_server.parse_properties_file(args) self.assertEquals(args.persistence_type, "remote") @patch.object(ambari_server, 'setup_master_key') @patch.object(ambari_server, 'read_passwd_for_alias') @patch.object(ambari_server, 'is_alias_string') @patch.object(ambari_server, 'get_ambari_properties') def test_configure_database_username_password_masterkey_persisted(self, get_ambari_properties_method, is_alias_string_method, read_passwd_for_alias_method, setup_master_key_method): out = StringIO.StringIO() sys.stdout = out configs = {ambari_server.JDBC_USER_NAME_PROPERTY: "fakeuser", ambari_server.JDBC_PASSWORD_PROPERTY: "${alias=somealias}", ambari_server.SECURITY_KEY_IS_PERSISTED: "True" } get_ambari_properties_method.return_value = configs is_alias_string_method.return_value = True read_passwd_for_alias_method.return_value = "falepasswd" args = MagicMock() ambari_server.configure_database_username_password(args) self.assertTrue(read_passwd_for_alias_method.called) self.assertTrue(is_alias_string_method.called) self.assertEquals("fakeuser", args.database_username) self.assertEquals("falepasswd", args.database_password) configs[ambari_server.SECURITY_KEY_IS_PERSISTED] = "False" get_ambari_properties_method.return_value = configs args.reset_mock() setup_master_key_method.return_value = (None, True, True) ambari_server.configure_database_username_password(args) self.assertTrue(setup_master_key_method.called) read_passwd_for_alias_method.assert_called_with( ambari_server.JDBC_RCA_PASSWORD_ALIAS, None) sys.stdout = sys.__stdout__ @patch.object(ambari_server, 'save_passwd_for_alias') @patch.object(ambari_server, 'read_password') def test_configure_database_password(self, read_password_method, save_passwd_for_alias_method): out = StringIO.StringIO() sys.stdout = out read_password_method.return_value = "fakepasswd" save_passwd_for_alias_method.return_value = 0 result = ambari_server.configure_database_password(True, None, True) self.assertTrue(save_passwd_for_alias_method.called) self.assertTrue(read_password_method.called) save_passwd_for_alias_method.assert_called_with(ambari_server .JDBC_RCA_PASSWORD_ALIAS, "fakepasswd", None) self.assertEquals(("fakepasswd", ambari_server.get_alias_string( ambari_server.JDBC_RCA_PASSWORD_ALIAS)), result) save_passwd_for_alias_method.reset_mock() result = ambari_server.configure_database_password(False, None, True) self.assertFalse(save_passwd_for_alias_method.called) self.assertEquals(("fakepasswd", None), result) save_passwd_for_alias_method.reset_mock() save_passwd_for_alias_method.return_value = -1 result = ambari_server.configure_database_password(True, None, True) self.assertEquals(("fakepasswd", None), result) sys.stdout = sys.__stdout__ @patch.object(ambari_server, 'update_properties') @patch.object(ambari_server, 'save_master_key') @patch.object(ambari_server, 'get_validated_string_input') @patch.object(ambari_server, 'get_YN_input') @patch.object(ambari_server, 'get_ambari_properties') def test_setup_master_key_persist(self, get_ambari_properties_method, get_YN_input_method, get_validated_string_input_method, save_master_key_method, update_properties_method): out = StringIO.StringIO() sys.stdout = out configs = { ambari_server.SECURITY_MASTER_KEY_LOCATION : "filepath", ambari_server.SECURITY_KEYS_DIR : tempfile.gettempdir(), ambari_server.SECURITY_KEY_IS_PERSISTED : None } get_ambari_properties_method.return_value = configs get_YN_input_method.return_value = True get_validated_string_input_method.return_value = "aaa" save_master_key_method.return_value = None update_properties_method.return_value = None ambari_server.setup_master_key(False) self.assertTrue(get_YN_input_method.called) self.assertTrue(get_validated_string_input_method.called) self.assertTrue(save_master_key_method.called) self.assertTrue(update_properties_method.called) sys.stdout = sys.__stdout__ @patch.object(ambari_server, 'update_properties') @patch.object(ambari_server, 'save_master_key') @patch.object(ambari_server, 'get_validated_string_input') @patch.object(ambari_server, 'get_YN_input') @patch.object(ambari_server, 'get_ambari_properties') def test_setup_master_key_not_persist(self, get_ambari_properties_method, get_YN_input_method, get_validated_string_input_method, save_master_key_method, update_properties_method): out = StringIO.StringIO() sys.stdout = out configs = { ambari_server.SECURITY_MASTER_KEY_LOCATION : "filepath", ambari_server.SECURITY_KEYS_DIR : tempfile.gettempdir(), ambari_server.SECURITY_KEY_IS_PERSISTED : None } get_ambari_properties_method.return_value = configs get_YN_input_method.side_effect = [True, False] get_validated_string_input_method.return_value = "aaa" save_master_key_method.return_value = None update_properties_method.return_value = None ambari_server.setup_master_key(False) self.assertTrue(get_YN_input_method.called) self.assertTrue(get_validated_string_input_method.called) self.assertTrue(update_properties_method.called) self.assertFalse(save_master_key_method.called) sys.stdout = sys.__stdout__ @patch.object(ambari_server, 'get_master_key_ispersisted') @patch.object(ambari_server, 'update_properties') @patch.object(ambari_server, 'save_master_key') @patch.object(ambari_server, 'get_validated_string_input') @patch.object(ambari_server, 'get_YN_input') @patch.object(ambari_server, 'get_ambari_properties') @patch.object(ambari_server, 'search_file') def test_setup_master_key_already_persisted(self, search_file_message, get_ambari_properties_method, get_YN_input_method, get_validated_string_input_method, save_master_key_method, update_properties_method, get_master_key_ispersisted_method): out = StringIO.StringIO() sys.stdout = out configs = { ambari_server.SECURITY_MASTER_KEY_LOCATION : "filepath", ambari_server.SECURITY_KEYS_DIR : tempfile.gettempdir(), ambari_server.SECURITY_KEY_IS_PERSISTED : "true" } get_ambari_properties_method.return_value = configs get_master_key_ispersisted_method.return_value = True search_file_message.return_value = "filepath" ambari_server.setup_master_key(False) self.assertFalse(save_master_key_method.called) self.assertFalse(get_YN_input_method.called) self.assertFalse(get_validated_string_input_method.called) self.assertFalse(update_properties_method.called) sys.stdout = sys.__stdout__ @patch.object(ambari_server, 'configure_ldap_password') @patch.object(ambari_server, 'configure_database_password') @patch.object(ambari_server, 'is_alias_string') @patch.object(ambari_server, 'get_master_key_ispersisted') @patch.object(ambari_server, 'update_properties') @patch.object(ambari_server, 'save_master_key') @patch.object(ambari_server, 'get_validated_string_input') @patch.object(ambari_server, 'get_YN_input') @patch.object(ambari_server, 'search_file') @patch.object(ambari_server, 'get_ambari_properties') def test_reset_master_key_persisted(self, get_ambari_properties_method, search_file_message, get_YN_input_method, get_validated_string_input_method, save_master_key_method, update_properties_method, get_master_key_ispersisted_method, is_alias_string_method, configure_database_password_method, configure_ldap_password_method): out = StringIO.StringIO() sys.stdout = out search_file_message.return_value = "filepath" configs = { ambari_server.SECURITY_MASTER_KEY_LOCATION : "filepath", ambari_server.SECURITY_KEYS_DIR : tempfile.gettempdir(), ambari_server.SECURITY_KEY_IS_PERSISTED : "true", ambari_server.JDBC_PASSWORD_PROPERTY : "${alias=fakealias}", ambari_server.LDAP_MGR_PASSWORD_PROPERTY : "${alias=fakealias}"} get_ambari_properties_method.return_value = configs get_master_key_ispersisted_method.return_value = True get_validated_string_input_method.return_value = "aaa" get_YN_input_method.return_value = True is_alias_string_method.return_value = True ambari_server.reset_master_key() self.assertTrue(save_master_key_method.called) self.assertTrue(get_YN_input_method.called) self.assertTrue(get_validated_string_input_method.called) self.assertTrue(update_properties_method.called) self.assertTrue(configure_database_password_method.called) self.assertTrue(configure_ldap_password_method.called) sys.stdout = sys.__stdout__ @patch.object(ambari_server, 'configure_ldap_password') @patch.object(ambari_server, 'configure_database_password') @patch.object(ambari_server, 'is_alias_string') @patch.object(ambari_server, 'get_master_key_ispersisted') @patch.object(ambari_server, 'update_properties') @patch.object(ambari_server, 'save_master_key') @patch.object(ambari_server, 'get_validated_string_input') @patch.object(ambari_server, 'get_YN_input') @patch.object(ambari_server, 'search_file') @patch.object(ambari_server, 'get_ambari_properties') def test_reset_master_key_not_persisted(self, get_ambari_properties_method, search_file_message, get_YN_input_method, get_validated_string_input_method, save_master_key_method, update_properties_method, get_master_key_ispersisted_method, is_alias_string_method, configure_database_password_method, configure_ldap_password_method): out = StringIO.StringIO() sys.stdout = out search_file_message.return_value = "filepath" configs = { ambari_server.SECURITY_MASTER_KEY_LOCATION : "filepath", ambari_server.SECURITY_KEYS_DIR : tempfile.gettempdir(), ambari_server.SECURITY_KEY_IS_PERSISTED : "false", ambari_server.JDBC_PASSWORD_PROPERTY : "${alias=fakealias}", ambari_server.LDAP_MGR_PASSWORD_PROPERTY : "${alias=fakealias}"} get_ambari_properties_method.return_value = configs get_master_key_ispersisted_method.return_value = False get_validated_string_input_method.return_value = "aaa" get_YN_input_method.return_value = False is_alias_string_method.return_value = True ambari_server.reset_master_key() self.assertFalse(save_master_key_method.called) self.assertTrue(get_YN_input_method.called) self.assertTrue(get_validated_string_input_method.called) self.assertTrue(update_properties_method.called) self.assertTrue(configure_database_password_method.called) self.assertTrue(configure_ldap_password_method.called) sys.stdout = sys.__stdout__ @patch.object(ambari_server, 'update_properties') @patch.object(ambari_server, 'configure_ldap_password') @patch.object(ambari_server, 'get_validated_string_input') @patch.object(ambari_server, 'setup_master_key') @patch.object(ambari_server, 'search_file') @patch.object(ambari_server, 'get_ambari_properties') def test_setup_ldap(self, get_ambari_properties_method, search_file_message, setup_master_key_method, get_validated_string_input_method, configure_ldap_password_method, update_properties_method): out = StringIO.StringIO() sys.stdout = out search_file_message.return_value = "filepath" configs = { ambari_server.SECURITY_MASTER_KEY_LOCATION : "filepath", ambari_server.SECURITY_KEYS_DIR : tempfile.gettempdir(), ambari_server.SECURITY_KEY_IS_PERSISTED : "true" } get_ambari_properties_method.return_value = configs get_validated_string_input_method.return_value = "test" configure_ldap_password_method.return_value = "${alias=fake}" setup_master_key_method.return_value = (None, True, True) ambari_server.setup_ldap() ldap_properties_map =\ { "authentication.ldap.primaryUrl" : "test", "authentication.ldap.secondaryUrl" : "test", "authentication.ldap.baseDn" : "test", "authentication.ldap.bindAnonymously" : "test", "authentication.ldap.usernameAttribute" : "test", "authorization.ldap.groupBase" : "test", "authorization.ldap.groupObjectClass" : "test", "authorization.ldap.groupNamingAttr" : "test", "authorization.ldap.groupMembershipAttr" : "test", "authorization.ldap.adminGroupMappingRules" : "test", "authorization.ldap.groupSearchFilter" : "test", "authorization.userRoleName" : "test", "authorization.adminRoleName" : "test", "authentication.ldap.managerDn" : "test", "authentication.ldap.managerPassword" : \ configure_ldap_password_method.return_value } self.assertEquals(update_properties_method.call_args[0][0], ldap_properties_map) self.assertTrue(update_properties_method.called) self.assertTrue(configure_ldap_password_method.called) self.assertTrue(get_validated_string_input_method.called) sys.stdout = sys.__stdout__ @patch.object(ambari_server, 'get_alias_string') @patch.object(ambari_server, 'save_passwd_for_alias') @patch.object(ambari_server, 'read_password') def test_configure_ldap_password(self, read_password_method, save_passwd_for_alias_method, get_alias_string_method): out = StringIO.StringIO() sys.stdout = out read_password_method.return_value = "blah" save_passwd_for_alias_method.return_value = 0 get_alias_string_method.return_value = "${alias=somefake}" ambari_server.configure_ldap_password(True, "aaa") self.assertTrue(save_passwd_for_alias_method.called) self.assertTrue(read_password_method.called) self.assertTrue(get_alias_string_method.called) save_passwd_for_alias_method.assert_called_once_with( ambari_server.LDAP_MGR_PASSWORD_ALIAS, "blah", "aaa") sys.stdout = sys.__stdout__ def get_sample(self, sample): """ Returns sample file content as string with normalized line endings """ path = self.get_samples_dir(sample) return self.get_file_string(path) def get_file_string(self, file): """ Returns file content as string with normalized line endings """ string = open(file, 'r').read() return self.normalize(string) def normalize(self, string): """ Normalizes line ending in string according to platform-default encoding """ return string.replace("\n", os.linesep) def get_samples_dir(self, sample): """ Returns full file path by sample name """ testdir = os.path.dirname(__file__) return os.path.dirname(testdir) + os.sep + "resources" + os.sep \ + 'TestAmbaryServer.samples/' + sample