123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522 |
- #!/usr/bin/env python
- '''
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- '''
- from unittest import TestCase
- import copy
- from ambari_agent.RecoveryManager import RecoveryManager
- from mock.mock import patch, MagicMock, call
class TestRecoveryManager(TestCase):
    """Unit tests for ``RecoveryManager``.

    The class-level dictionaries are shared command fixtures. Tests that
    hand the status command to ``store_or_update_command`` work on a
    ``copy.deepcopy`` of it so the shared fixture is not modified.

    ``update_config`` is always called positionally. Judging from
    ``test_update_rm_config`` the order is (max count, window in minutes,
    retry gap, max lifetime count) followed by two boolean flags --
    presumably "recovery enabled" and "auto start only"; confirm against
    ``RecoveryManager.update_config``.
    """

    # Status command for NODEMANAGER with an embedded INSTALL execution
    # command under "executionCommandDetails".
    command = {
        "commandType": "STATUS_COMMAND",
        "payloadLevel": "EXECUTION_COMMAND",
        "componentName": "NODEMANAGER",
        "desiredState": "STARTED",
        "hasStaleConfigs": False,
        "executionCommandDetails": {
            "commandType": "EXECUTION_COMMAND",
            "roleCommand": "INSTALL",
            "role": "NODEMANAGER",
            "hostLevelParams": {
                "custom_command": ""},
            "configurations": {
                "capacity-scheduler": {
                    "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
                "capacity-calculator": {
                    "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
                "commandParams": {
                    "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
                }
            }
        }
    }

    # Execution command with roleCommand INSTALL.
    exec_command1 = {
        "commandType": "EXECUTION_COMMAND",
        "roleCommand": "INSTALL",
        "role": "NODEMANAGER",
        "configurations": {
            "capacity-scheduler": {
                "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
            "capacity-calculator": {
                "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
            "commandParams": {
                "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
            }
        }
    }

    # Execution command with roleCommand START.
    exec_command2 = {
        "commandType": "EXECUTION_COMMAND",
        "roleCommand": "START",
        "role": "NODEMANAGER",
        "configurations": {
            "capacity-scheduler": {
                "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
            "capacity-calculator": {
                "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
            "commandParams": {
                "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
            }
        }
    }

    # Execution command with roleCommand SERVICE_CHECK.
    exec_command3 = {
        "commandType": "EXECUTION_COMMAND",
        "roleCommand": "SERVICE_CHECK",
        "role": "NODEMANAGER",
        "configurations": {
            "capacity-scheduler": {
                "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
            "capacity-calculator": {
                "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
            "commandParams": {
                "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
            }
        }
    }

    @patch.object(RecoveryManager, "update_desired_status")
    def test_process_commands(self, mock_uds):
        """Check which desired-status updates command processing triggers.

        ``update_desired_status`` is patched, so only the calls made to it
        are verified.
        """
        rm = RecoveryManager(True)

        # No commands at all: nothing may be recorded.
        rm.process_status_commands(None)
        self.assertFalse(mock_uds.called)
        rm.process_status_commands([])
        self.assertFalse(mock_uds.called)

        rm.process_status_commands([self.command])
        mock_uds.assert_has_calls([call("NODEMANAGER", "STARTED")])

        # The second positional parameter of assert_has_calls is
        # ``any_order``; all expected calls must be in ONE list, otherwise
        # only the first of them is actually checked.
        mock_uds.reset_mock()
        rm.process_status_commands([self.command, self.exec_command1, self.command])
        mock_uds.assert_has_calls([call("NODEMANAGER", "STARTED"),
                                   call("NODEMANAGER", "STARTED")])

        mock_uds.reset_mock()
        rm.process_execution_commands(
            [self.exec_command1, self.exec_command2, self.exec_command3])
        mock_uds.assert_has_calls([call("NODEMANAGER", "INSTALLED"),
                                   call("NODEMANAGER", "STARTED")])

        mock_uds.reset_mock()
        rm.process_execution_commands([self.exec_command1, self.command])
        mock_uds.assert_has_calls([call("NODEMANAGER", "INSTALLED")])

    def test_defaults(self):
        """A manager built with default arguments is disabled and inert."""
        rm = RecoveryManager()
        self.assertFalse(rm.enabled())
        self.assertIsNone(rm.get_install_command("NODEMANAGER"))
        self.assertIsNone(rm.get_start_command("NODEMANAGER"))

        rm.update_current_status("NODEMANAGER", "INSTALLED")
        rm.update_desired_status("NODEMANAGER", "STARTED")
        self.assertFalse(rm.requires_recovery("NODEMANAGER"))

    @patch.object(RecoveryManager, "_now_")
    def test_sliding_window(self, time_mock):
        """Exercise config validation and the attempt limits.

        ``_now_`` is patched; each read of the clock consumes the next
        value of ``side_effect``, so the order of calls below matters.
        """
        time_mock.side_effect = [
            1000, 1001, 1002, 1003, 1004, 1071, 1150, 1151, 1152, 1153,
            1400, 1401, 1500, 1571, 1572, 1653, 1900, 1971, 2300, 2301]

        rm = RecoveryManager(True, False)
        self.assertTrue(rm.enabled())

        # (update_config arguments, expected value of enabled() afterwards)
        for config_args, expected in (
                ((0, 60, 5, 12, True, False), False),
                ((6, 60, 5, 12, True, False), True),
                ((6, 0, 5, 12, True, False), False),
                ((6, 60, 0, 12, True, False), False),
                ((6, 60, 1, 12, True, False), True),
                ((6, 60, 61, 12, True, False), False),
                ((6, 60, 5, 0, True, False), False),
                ((6, 60, 5, 4, True, False), False),
        ):
            rm.update_config(*config_args)
            check = self.assertTrue if expected else self.assertFalse
            check(rm.enabled(), "update_config%r" % (config_args,))

        # At most 2 attempts per 5-minute window, 1 minute between
        # attempts, lifetime maximum of 4.
        rm.update_config(2, 5, 1, 4, True, False)
        self.assertTrue(rm.enabled())

        # T = 1000-2
        self.assertTrue(rm.may_execute("NODEMANAGER"))
        self.assertTrue(rm.may_execute("NODEMANAGER"))
        self.assertTrue(rm.may_execute("NODEMANAGER"))

        # T = 1003-4
        self.assertTrue(rm.execute("NODEMANAGER"))
        self.assertFalse(rm.execute("NODEMANAGER"))  # too soon

        # T = 1071
        self.assertTrue(rm.execute("NODEMANAGER"))  # 60+ seconds passed

        # T = 1150-3
        self.assertFalse(rm.execute("NODEMANAGER"))  # limit 2 exceeded
        self.assertFalse(rm.may_execute("NODEMANAGER"))
        self.assertTrue(rm.execute("DATANODE"))
        self.assertTrue(rm.may_execute("NAMENODE"))

        # T = 1400-1
        self.assertTrue(rm.execute("NODEMANAGER"))  # window reset
        self.assertFalse(rm.may_execute("NODEMANAGER"))  # too soon

        # Same per-window limits, lifetime maximum raised to 5 and the
        # last flag switched to True.
        rm.update_config(2, 5, 1, 5, True, True)

        # T = 1500-3
        self.assertTrue(rm.execute("NODEMANAGER2"))
        self.assertTrue(rm.may_execute("NODEMANAGER2"))
        self.assertTrue(rm.execute("NODEMANAGER2"))
        self.assertFalse(rm.execute("NODEMANAGER2"))  # max limit

        # T = 1900-2
        self.assertTrue(rm.execute("NODEMANAGER2"))
        self.assertTrue(rm.execute("NODEMANAGER2"))

        # T = 2300-2
        # lifetime max reached
        self.assertTrue(rm.execute("NODEMANAGER2"))
        self.assertFalse(rm.execute("NODEMANAGER2"))

    def test_recovery_required(self):
        """Check ``requires_recovery`` for current/desired state pairs."""
        rm = RecoveryManager(True, False)

        rm.update_current_status("NODEMANAGER", "INSTALLED")
        rm.update_desired_status("NODEMANAGER", "INSTALLED")
        self.assertFalse(rm.requires_recovery("NODEMANAGER"))

        rm.update_desired_status("NODEMANAGER", "STARTED")
        self.assertTrue(rm.requires_recovery("NODEMANAGER"))

        rm.update_current_status("NODEMANAGER", "STARTED")
        rm.update_desired_status("NODEMANAGER", "INSTALLED")
        self.assertTrue(rm.requires_recovery("NODEMANAGER"))

        rm.update_desired_status("NODEMANAGER", "STARTED")
        self.assertFalse(rm.requires_recovery("NODEMANAGER"))

        # Unknown or empty desired states never ask for recovery.
        rm.update_current_status("NODEMANAGER", "INSTALLED")
        rm.update_desired_status("NODEMANAGER", "XYS")
        self.assertFalse(rm.requires_recovery("NODEMANAGER"))

        rm.update_desired_status("NODEMANAGER", "")
        self.assertFalse(rm.requires_recovery("NODEMANAGER"))

        rm.update_current_status("NODEMANAGER", "INIT")
        rm.update_desired_status("NODEMANAGER", "INSTALLED")
        self.assertTrue(rm.requires_recovery("NODEMANAGER"))

        rm.update_desired_status("NODEMANAGER", "STARTED")
        self.assertTrue(rm.requires_recovery("NODEMANAGER"))

        # Second constructor flag set to True.
        rm = RecoveryManager(True, True)

        rm.update_current_status("NODEMANAGER", "INIT")
        rm.update_desired_status("NODEMANAGER", "INSTALLED")
        self.assertFalse(rm.requires_recovery("NODEMANAGER"))

        # NOTE(review): the desired state below is "START", while every
        # other test in this file uses "STARTED" -- confirm this spelling
        # is deliberate.
        rm.update_current_status("NODEMANAGER", "INIT")
        rm.update_desired_status("NODEMANAGER", "START")
        self.assertFalse(rm.requires_recovery("NODEMANAGER"))

        rm.update_current_status("NODEMANAGER", "INSTALLED")
        rm.update_desired_status("NODEMANAGER", "START")
        self.assertFalse(rm.requires_recovery("NODEMANAGER"))

    # ``side_effects`` (plural) is not a Mock option: unknown keyword
    # arguments only become plain attributes, so it configured nothing.
    # A constant return value is used instead of ``side_effect=[1]``,
    # which would be exhausted after the first read of the clock.
    @patch('time.time', MagicMock(return_value=1))
    def test_store_from_status_and_use(self):
        """Store a status command and read back the derived commands."""
        rm = RecoveryManager(True)
        command1 = copy.deepcopy(self.command)

        rm.store_or_update_command(command1)
        self.assertTrue(rm.command_exists("NODEMANAGER", "EXECUTION_COMMAND"))

        install_command = rm.get_install_command("NODEMANAGER")
        start_command = rm.get_start_command("NODEMANAGER")

        self.assertEqual("INSTALL", install_command["roleCommand"])
        self.assertEqual("START", start_command["roleCommand"])
        self.assertEqual("AUTO_EXECUTION_COMMAND", install_command["commandType"])
        self.assertEqual("AUTO_EXECUTION_COMMAND", start_command["commandType"])
        self.assertEqual("NODEMANAGER", install_command["role"])
        self.assertEqual("NODEMANAGER", start_command["role"])
        self.assertEqual(install_command["configurations"],
                         start_command["configurations"])
        self.assertEqual(2, install_command["taskId"])
        self.assertEqual(3, start_command["taskId"])

        # Nothing was stored for any other component.
        self.assertIsNone(rm.get_install_command("component2"))
        self.assertIsNone(rm.get_start_command("component2"))

        # Removal succeeds once, then reports that nothing is left.
        self.assertTrue(rm.remove_command("NODEMANAGER"))
        self.assertFalse(rm.remove_command("NODEMANAGER"))

        self.assertIsNone(rm.get_install_command("NODEMANAGER"))
        self.assertIsNone(rm.get_start_command("NODEMANAGER"))
        self.assertIsNone(rm.get_install_command("component2"))
        self.assertIsNone(rm.get_start_command("component2"))

        # A stored command is not handed out while the manager is paused.
        rm.store_or_update_command(command1)
        self.assertTrue(rm.command_exists("NODEMANAGER", "EXECUTION_COMMAND"))
        rm.set_paused(True)

        self.assertIsNone(rm.get_install_command("NODEMANAGER"))
        self.assertIsNone(rm.get_start_command("NODEMANAGER"))

    @patch.object(RecoveryManager, "_now_")
    def test_get_recovery_commands(self, time_mock):
        """Check the commands ``get_recovery_commands`` returns per state.

        ``_now_`` is patched; each read of the clock consumes the next
        value of ``side_effect``.
        """
        time_mock.side_effect = [
            1000, 1001, 1002, 1003,
            1100, 1101, 1102,
            1200, 1201, 1203,
            4000, 4001, 4002, 4003,
            4100, 4101, 4102, 4103,
            4200, 4201, 4202]

        rm = RecoveryManager(True)
        rm.update_config(15, 5, 1, 16, True, False)

        command1 = copy.deepcopy(self.command)
        rm.store_or_update_command(command1)

        rm.update_current_status("NODEMANAGER", "INSTALLED")
        rm.update_desired_status("NODEMANAGER", "STARTED")
        commands = rm.get_recovery_commands()
        self.assertEqual(1, len(commands))
        self.assertEqual("START", commands[0]["roleCommand"])

        rm.update_current_status("NODEMANAGER", "INIT")
        rm.update_desired_status("NODEMANAGER", "STARTED")
        # Starts at 1100
        commands = rm.get_recovery_commands()
        self.assertEqual(1, len(commands))
        self.assertEqual("INSTALL", commands[0]["roleCommand"])

        rm.update_current_status("NODEMANAGER", "INIT")
        rm.update_desired_status("NODEMANAGER", "INSTALLED")
        # Starts at 1200
        commands = rm.get_recovery_commands()
        self.assertEqual(1, len(commands))
        self.assertEqual("INSTALL", commands[0]["roleCommand"])

        # With the last config flag set to True no command is produced
        # for the INIT -> INSTALLED transition.
        rm.update_config(2, 5, 1, 5, True, True)
        rm.update_current_status("NODEMANAGER", "INIT")
        rm.update_desired_status("NODEMANAGER", "INSTALLED")
        commands = rm.get_recovery_commands()
        self.assertEqual(0, len(commands))

        rm.update_config(12, 5, 1, 15, True, False)
        rm.update_current_status("NODEMANAGER", "INIT")
        rm.update_desired_status("NODEMANAGER", "INSTALLED")
        rm.store_or_update_command(command1)
        commands = rm.get_recovery_commands()
        self.assertEqual(1, len(commands))
        self.assertEqual("INSTALL", commands[0]["roleCommand"])

        # Current state equals desired state and configs are not stale.
        rm.update_config_staleness("NODEMANAGER", False)
        rm.update_current_status("NODEMANAGER", "INSTALLED")
        rm.update_desired_status("NODEMANAGER", "INSTALLED")
        commands = rm.get_recovery_commands()
        self.assertEqual(0, len(commands))

        # Stale configs while INSTALLED yield an INSTALL command.
        command_install = copy.deepcopy(self.command)
        command_install["desiredState"] = "INSTALLED"
        rm.store_or_update_command(command_install)
        rm.update_config_staleness("NODEMANAGER", True)
        commands = rm.get_recovery_commands()
        self.assertEqual(1, len(commands))
        self.assertEqual("INSTALL", commands[0]["roleCommand"])

        # Stale configs while STARTED yield a custom RESTART command.
        rm.update_current_status("NODEMANAGER", "STARTED")
        rm.update_desired_status("NODEMANAGER", "STARTED")
        commands = rm.get_recovery_commands()
        self.assertEqual(1, len(commands))
        self.assertEqual("CUSTOM_COMMAND", commands[0]["roleCommand"])
        self.assertEqual("RESTART", commands[0]["hostLevelParams"]["custom_command"])

    @patch.object(RecoveryManager, "update_config")
    def test_update_rm_config(self, mock_uc):
        """Check how a registration response is mapped to ``update_config``.

        ``update_config`` is patched, so only its arguments are verified.
        """
        rm = RecoveryManager()

        # Missing, empty or DEFAULT-typed configuration.
        rm.update_configuration_from_registration(None)
        mock_uc.assert_has_calls([call(6, 60, 5, 12, False, True)])

        mock_uc.reset_mock()
        rm.update_configuration_from_registration({})
        mock_uc.assert_has_calls([call(6, 60, 5, 12, False, True)])

        mock_uc.reset_mock()
        rm.update_configuration_from_registration(
            {"recoveryConfig": {
                "type": "DEFAULT"}}
        )
        mock_uc.assert_has_calls([call(6, 60, 5, 12, False, True)])

        mock_uc.reset_mock()
        rm.update_configuration_from_registration(
            {"recoveryConfig": {
                "type": "FULL"}}
        )
        mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False)])

        # "max_count" is not the key used in the next case ("maxCount"),
        # and the numeric arguments stay at the values seen above.
        mock_uc.reset_mock()
        rm.update_configuration_from_registration(
            {"recoveryConfig": {
                "type": "AUTO_START",
                "max_count": "med"}}
        )
        mock_uc.assert_has_calls([call(6, 60, 5, 12, True, True)])

        mock_uc.reset_mock()
        rm.update_configuration_from_registration(
            {"recoveryConfig": {
                "type": "AUTO_START",
                "maxCount": "5",
                "windowInMinutes": 20,
                "retryGap": 2,
                "maxLifetimeCount": 5}}
        )
        mock_uc.assert_has_calls([call(5, 20, 2, 5, True, True)])

    @patch.object(RecoveryManager, "_now_")
    def test_recovery_report(self, time_mock):
        """Check the report returned by ``get_recovery_status``.

        ``_now_`` is patched; each read of the clock consumes the next
        value of ``side_effect``.
        """
        time_mock.side_effect = [
            1000, 1071, 1072, 1470, 1471, 1472, 1543, 1644, 1715]

        rm = RecoveryManager()
        rec_st = rm.get_recovery_status()
        self.assertEqual(rec_st, {"summary": "DISABLED"})

        rm.update_config(2, 5, 1, 4, True, True)
        rec_st = rm.get_recovery_status()
        self.assertEqual(rec_st, {"summary": "RECOVERABLE", "componentReports": []})

        rm.execute("PUMA")
        rec_st = rm.get_recovery_status()
        self.assertEqual(rec_st, {
            "summary": "RECOVERABLE",
            "componentReports": [
                {"name": "PUMA", "numAttempts": 1, "limitReached": False}
            ]})

        rm.execute("PUMA")
        rm.execute("LION")
        rec_st = rm.get_recovery_status()
        self.assertEqual(rec_st, {
            "summary": "RECOVERABLE",
            "componentReports": [
                {"name": "LION", "numAttempts": 1, "limitReached": False},
                {"name": "PUMA", "numAttempts": 2, "limitReached": False}
            ]})

        rm.execute("PUMA")
        rm.execute("LION")
        rm.execute("PUMA")
        rm.execute("PUMA")
        rm.execute("LION")
        rec_st = rm.get_recovery_status()
        self.assertEqual(rec_st, {
            "summary": "PARTIALLY_RECOVERABLE",
            "componentReports": [
                {"name": "LION", "numAttempts": 3, "limitReached": False},
                {"name": "PUMA", "numAttempts": 4, "limitReached": True}
            ]})

        rm.execute("LION")
        rec_st = rm.get_recovery_status()
        self.assertEqual(rec_st, {
            "summary": "UNRECOVERABLE",
            "componentReports": [
                {"name": "LION", "numAttempts": 4, "limitReached": True},
                {"name": "PUMA", "numAttempts": 4, "limitReached": True}
            ]})

    @patch.object(RecoveryManager, "_now_")
    def test_command_expiry(self, time_mock):
        """A stored command stops being returned until it is stored again.

        ``_now_`` is patched; each read of the clock consumes the next
        value of ``side_effect``.
        """
        time_mock.side_effect = [
            1000, 1001, 1002, 1003, 1104, 1105, 1106,
            1807, 1808, 1809, 1810, 1811, 1812]

        rm = RecoveryManager(True)
        rm.update_config(5, 5, 1, 11, True, False)

        command1 = copy.deepcopy(self.command)
        rm.store_or_update_command(command1)

        rm.update_current_status("NODEMANAGER", "INSTALLED")
        rm.update_desired_status("NODEMANAGER", "STARTED")

        commands = rm.get_recovery_commands()
        self.assertEqual(1, len(commands))
        self.assertEqual("START", commands[0]["roleCommand"])

        commands = rm.get_recovery_commands()
        self.assertEqual(1, len(commands))
        self.assertEqual("START", commands[0]["roleCommand"])

        # 1807 command is stale
        commands = rm.get_recovery_commands()
        self.assertEqual(0, len(commands))

        # Storing the command again makes it available once more.
        rm.store_or_update_command(command1)
        commands = rm.get_recovery_commands()
        self.assertEqual(1, len(commands))
        self.assertEqual("START", commands[0]["roleCommand"])

    def test_command_count(self):
        """``has_active_command`` stays true until every start is stopped."""
        rm = RecoveryManager(True)
        self.assertFalse(rm.has_active_command())

        rm.start_execution_command()
        self.assertTrue(rm.has_active_command())
        rm.start_execution_command()
        self.assertTrue(rm.has_active_command())

        rm.stop_execution_command()
        self.assertTrue(rm.has_active_command())
        rm.stop_execution_command()
        self.assertFalse(rm.has_active_command())
|