|
@@ -163,6 +163,28 @@ class TestActionQueue(TestCase):
|
|
|
'hostLevelParams': {}
|
|
|
}
|
|
|
|
|
|
+ retryable_command = {
|
|
|
+ 'commandType': 'EXECUTION_COMMAND',
|
|
|
+ 'role': 'NAMENODE',
|
|
|
+ 'roleCommand': 'INSTALL',
|
|
|
+ 'commandId': '1-1',
|
|
|
+ 'taskId': 19,
|
|
|
+ 'clusterName': 'c1',
|
|
|
+ 'serviceName': 'HDFS',
|
|
|
+ 'configurations':{'global' : {}},
|
|
|
+ 'configurationTags':{'global' : { 'tag': 'v123' }},
|
|
|
+ 'commandParams' : {
|
|
|
+ 'script_type' : 'PYTHON',
|
|
|
+ 'script' : 'script.py',
|
|
|
+ 'command_timeout' : '600',
|
|
|
+ 'jdk_location' : '.',
|
|
|
+ 'service_package_folder' : '.',
|
|
|
+ 'command_retry_enabled' : 'true',
|
|
|
+ 'command_retry_max_attempt_count' : '3'
|
|
|
+ },
|
|
|
+ 'hostLevelParams' : {}
|
|
|
+ }
|
|
|
+
|
|
|
background_command = {
|
|
|
'commandType': 'BACKGROUND_EXECUTION_COMMAND',
|
|
|
'role': 'NAMENODE',
|
|
@@ -310,7 +332,8 @@ class TestActionQueue(TestCase):
|
|
|
'stderr': 'stderr',
|
|
|
'structuredOut' : ''
|
|
|
}
|
|
|
- def side_effect(command, tmpoutfile, tmperrfile):
|
|
|
+
|
|
|
+ def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
|
|
|
unfreeze_flag.wait()
|
|
|
return python_execution_result_dict
|
|
|
def patched_aq_execute_command(command):
|
|
@@ -635,6 +658,109 @@ class TestActionQueue(TestCase):
|
|
|
actionQueue.join()
|
|
|
self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
|
|
|
|
|
|
+
|
|
|
+ @patch("time.sleep")
|
|
|
+ @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
|
|
|
+ @patch.object(StackVersionsFileHandler, "read_stack_version")
|
|
|
+ @patch.object(CustomServiceOrchestrator, "__init__")
|
|
|
+ def test_execute_retryable_command(self, CustomServiceOrchestrator_mock,
|
|
|
+ read_stack_version_mock, sleep_mock
|
|
|
+ ):
|
|
|
+ CustomServiceOrchestrator_mock.return_value = None
|
|
|
+ dummy_controller = MagicMock()
|
|
|
+ actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
|
|
|
+ python_execution_result_dict = {
|
|
|
+ 'exitcode': 1,
|
|
|
+ 'stdout': 'out',
|
|
|
+ 'stderr': 'stderr',
|
|
|
+ 'structuredOut': '',
|
|
|
+ 'status': 'FAILED'
|
|
|
+ }
|
|
|
+
|
|
|
+ def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
|
|
|
+ return python_execution_result_dict
|
|
|
+
|
|
|
+ command = copy.deepcopy(self.retryable_command)
|
|
|
+ with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
|
|
|
+ runCommand_mock.side_effect = side_effect
|
|
|
+ actionQueue.execute_command(command)
|
|
|
+
|
|
|
+ #assert that python executor start
|
|
|
+ self.assertTrue(runCommand_mock.called)
|
|
|
+ self.assertEqual(3, runCommand_mock.call_count)
|
|
|
+ self.assertEqual(2, sleep_mock.call_count)
|
|
|
+ sleep_mock.assert_has_calls([call(2), call(4)], False)
|
|
|
+ runCommand_mock.assert_has_calls([
|
|
|
+ call(command, '/tmp/ambari-agent/output-19.txt', '/tmp/ambari-agent/errors-19.txt', override_output_files=True, retry=False),
|
|
|
+ call(command, '/tmp/ambari-agent/output-19.txt', '/tmp/ambari-agent/errors-19.txt', override_output_files=False, retry=True),
|
|
|
+ call(command, '/tmp/ambari-agent/output-19.txt', '/tmp/ambari-agent/errors-19.txt', override_output_files=False, retry=True)])
|
|
|
+
|
|
|
+
|
|
|
+ #retryable_command
|
|
|
+ @patch("time.sleep")
|
|
|
+ @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
|
|
|
+ @patch.object(StackVersionsFileHandler, "read_stack_version")
|
|
|
+ @patch.object(CustomServiceOrchestrator, "__init__")
|
|
|
+ def test_execute_retryable_command_fail_and_succeed(self, CustomServiceOrchestrator_mock,
|
|
|
+ read_stack_version_mock, sleep_mock
|
|
|
+ ):
|
|
|
+ CustomServiceOrchestrator_mock.return_value = None
|
|
|
+ dummy_controller = MagicMock()
|
|
|
+ actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
|
|
|
+ execution_result_fail_dict = {
|
|
|
+ 'exitcode': 1,
|
|
|
+ 'stdout': 'out',
|
|
|
+ 'stderr': 'stderr',
|
|
|
+ 'structuredOut': '',
|
|
|
+ 'status': 'FAILED'
|
|
|
+ }
|
|
|
+ execution_result_succ_dict = {
|
|
|
+ 'exitcode': 0,
|
|
|
+ 'stdout': 'out',
|
|
|
+ 'stderr': 'stderr',
|
|
|
+ 'structuredOut': '',
|
|
|
+ 'status': 'COMPLETED'
|
|
|
+ }
|
|
|
+
|
|
|
+ command = copy.deepcopy(self.retryable_command)
|
|
|
+ with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
|
|
|
+ runCommand_mock.side_effect = [execution_result_fail_dict, execution_result_succ_dict]
|
|
|
+ actionQueue.execute_command(command)
|
|
|
+
|
|
|
+ #assert that python executor start
|
|
|
+ self.assertTrue(runCommand_mock.called)
|
|
|
+ self.assertEqual(2, runCommand_mock.call_count)
|
|
|
+ self.assertEqual(1, sleep_mock.call_count)
|
|
|
+ sleep_mock.assert_any_call(2)
|
|
|
+
|
|
|
+ @patch("time.sleep")
|
|
|
+ @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
|
|
|
+ @patch.object(StackVersionsFileHandler, "read_stack_version")
|
|
|
+ @patch.object(CustomServiceOrchestrator, "__init__")
|
|
|
+ def test_execute_retryable_command_succeed(self, CustomServiceOrchestrator_mock,
|
|
|
+ read_stack_version_mock, sleep_mock
|
|
|
+ ):
|
|
|
+ CustomServiceOrchestrator_mock.return_value = None
|
|
|
+ dummy_controller = MagicMock()
|
|
|
+ actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
|
|
|
+ execution_result_succ_dict = {
|
|
|
+ 'exitcode': 0,
|
|
|
+ 'stdout': 'out',
|
|
|
+ 'stderr': 'stderr',
|
|
|
+ 'structuredOut': '',
|
|
|
+ 'status': 'COMPLETED'
|
|
|
+ }
|
|
|
+
|
|
|
+ command = copy.deepcopy(self.retryable_command)
|
|
|
+ with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
|
|
|
+ runCommand_mock.side_effect = [execution_result_succ_dict]
|
|
|
+ actionQueue.execute_command(command)
|
|
|
+
|
|
|
+ #assert that python executor start
|
|
|
+ self.assertTrue(runCommand_mock.called)
|
|
|
+ self.assertFalse(sleep_mock.called)
|
|
|
+ self.assertEqual(1, runCommand_mock.call_count)
|
|
|
+
|
|
|
@patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
|
|
|
@patch.object(StackVersionsFileHandler, "read_stack_version")
|
|
|
@patch.object(CustomServiceOrchestrator, "runCommand")
|