|
@@ -228,14 +228,17 @@ class TestActionQueue(TestCase):
|
|
|
}
|
|
|
|
|
|
|
|
|
+ @patch.object(AmbariConfig, "get_parallel_exec_option")
|
|
|
@patch.object(ActionQueue, "process_command")
|
|
|
@patch.object(Queue, "get")
|
|
|
@patch.object(CustomServiceOrchestrator, "__init__")
|
|
|
def test_ActionQueueStartStop(self, CustomServiceOrchestrator_mock,
|
|
|
- get_mock, process_command_mock):
|
|
|
+ get_mock, process_command_mock, get_parallel_exec_option_mock):
|
|
|
CustomServiceOrchestrator_mock.return_value = None
|
|
|
dummy_controller = MagicMock()
|
|
|
config = MagicMock()
|
|
|
+ get_parallel_exec_option_mock.return_value = 0
|
|
|
+ config.get_parallel_exec_option = get_parallel_exec_option_mock
|
|
|
actionQueue = ActionQueue(config, dummy_controller)
|
|
|
actionQueue.start()
|
|
|
time.sleep(0.1)
|
|
@@ -481,7 +484,7 @@ class TestActionQueue(TestCase):
|
|
|
}
|
|
|
cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
|
|
|
|
|
|
- config = AmbariConfig().getConfig()
|
|
|
+ config = AmbariConfig()
|
|
|
tempdir = tempfile.gettempdir()
|
|
|
config.set('agent', 'prefix', tempdir)
|
|
|
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
|
|
@@ -523,7 +526,7 @@ class TestActionQueue(TestCase):
|
|
|
}
|
|
|
cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
|
|
|
|
|
|
- config = AmbariConfig().getConfig()
|
|
|
+ config = AmbariConfig()
|
|
|
tempdir = tempfile.gettempdir()
|
|
|
config.set('agent', 'prefix', tempdir)
|
|
|
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
|
|
@@ -564,7 +567,7 @@ class TestActionQueue(TestCase):
|
|
|
status_update_callback):
|
|
|
CustomServiceOrchestrator_mock.return_value = None
|
|
|
dummy_controller = MagicMock()
|
|
|
- actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
|
|
|
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
|
|
|
|
|
|
build_mock.return_value = {'dummy report': '' }
|
|
|
|
|
@@ -600,7 +603,7 @@ class TestActionQueue(TestCase):
|
|
|
status_update_callback):
|
|
|
CustomServiceOrchestrator_mock.return_value = None
|
|
|
dummy_controller = MagicMock()
|
|
|
- actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
|
|
|
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
|
|
|
|
|
|
|
|
|
requestComponentStatus_mock.reset_mock()
|
|
@@ -620,14 +623,17 @@ class TestActionQueue(TestCase):
|
|
|
self.assertEqual(len(report['componentStatus']), 1)
|
|
|
self.assertTrue(report['componentStatus'][0].has_key('alerts'))
|
|
|
|
|
|
+ @patch.object(AmbariConfig, "get_parallel_exec_option")
|
|
|
@patch.object(ActionQueue, "process_command")
|
|
|
@patch.object(Queue, "get")
|
|
|
@patch.object(CustomServiceOrchestrator, "__init__")
|
|
|
def test_reset_queue(self, CustomServiceOrchestrator_mock,
|
|
|
- get_mock, process_command_mock):
|
|
|
+ get_mock, process_command_mock, gpeo_mock):
|
|
|
CustomServiceOrchestrator_mock.return_value = None
|
|
|
dummy_controller = MagicMock()
|
|
|
config = MagicMock()
|
|
|
+ gpeo_mock.return_value = 0
|
|
|
+ config.get_parallel_exec_option = gpeo_mock
|
|
|
actionQueue = ActionQueue(config, dummy_controller)
|
|
|
actionQueue.start()
|
|
|
actionQueue.put([self.datanode_install_command, self.hbase_install_command])
|
|
@@ -639,14 +645,17 @@ class TestActionQueue(TestCase):
|
|
|
actionQueue.join()
|
|
|
self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
|
|
|
|
|
|
+ @patch.object(AmbariConfig, "get_parallel_exec_option")
|
|
|
@patch.object(ActionQueue, "process_command")
|
|
|
@patch.object(Queue, "get")
|
|
|
@patch.object(CustomServiceOrchestrator, "__init__")
|
|
|
def test_cancel(self, CustomServiceOrchestrator_mock,
|
|
|
- get_mock, process_command_mock):
|
|
|
+ get_mock, process_command_mock, gpeo_mock):
|
|
|
CustomServiceOrchestrator_mock.return_value = None
|
|
|
dummy_controller = MagicMock()
|
|
|
config = MagicMock()
|
|
|
+ gpeo_mock.return_value = 0
|
|
|
+ config.get_parallel_exec_option = gpeo_mock
|
|
|
actionQueue = ActionQueue(config, dummy_controller)
|
|
|
actionQueue.start()
|
|
|
actionQueue.put([self.datanode_install_command, self.hbase_install_command])
|
|
@@ -658,6 +667,27 @@ class TestActionQueue(TestCase):
|
|
|
actionQueue.join()
|
|
|
self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
|
|
|
|
|
|
+ @patch.object(AmbariConfig, "get_parallel_exec_option")
|
|
|
+ @patch.object(ActionQueue, "process_command")
|
|
|
+ @patch.object(CustomServiceOrchestrator, "__init__")
|
|
|
+ def test_parallel_exec(self, CustomServiceOrchestrator_mock,
|
|
|
+ process_command_mock, gpeo_mock):
|
|
|
+ CustomServiceOrchestrator_mock.return_value = None
|
|
|
+ dummy_controller = MagicMock()
|
|
|
+ config = MagicMock()
|
|
|
+ gpeo_mock.return_value = 1
|
|
|
+ config.get_parallel_exec_option = gpeo_mock
|
|
|
+ actionQueue = ActionQueue(config, dummy_controller)
|
|
|
+ actionQueue.start()
|
|
|
+ actionQueue.put([self.datanode_install_command, self.hbase_install_command])
|
|
|
+ self.assertEqual(2, actionQueue.commandQueue.qsize())
|
|
|
+ time.sleep(1)
|
|
|
+ actionQueue.stop()
|
|
|
+ actionQueue.join()
|
|
|
+ self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
|
|
|
+ self.assertEqual(2, process_command_mock.call_count)
|
|
|
+ process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)])
|
|
|
+
|
|
|
|
|
|
@patch("time.sleep")
|
|
|
@patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
|
|
@@ -668,7 +698,7 @@ class TestActionQueue(TestCase):
|
|
|
):
|
|
|
CustomServiceOrchestrator_mock.return_value = None
|
|
|
dummy_controller = MagicMock()
|
|
|
- actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
|
|
|
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
|
|
|
python_execution_result_dict = {
|
|
|
'exitcode': 1,
|
|
|
'stdout': 'out',
|
|
@@ -706,7 +736,7 @@ class TestActionQueue(TestCase):
|
|
|
):
|
|
|
CustomServiceOrchestrator_mock.return_value = None
|
|
|
dummy_controller = MagicMock()
|
|
|
- actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
|
|
|
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
|
|
|
execution_result_fail_dict = {
|
|
|
'exitcode': 1,
|
|
|
'stdout': 'out',
|
|
@@ -742,7 +772,7 @@ class TestActionQueue(TestCase):
|
|
|
):
|
|
|
CustomServiceOrchestrator_mock.return_value = None
|
|
|
dummy_controller = MagicMock()
|
|
|
- actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
|
|
|
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
|
|
|
execution_result_succ_dict = {
|
|
|
'exitcode': 0,
|
|
|
'stdout': 'out',
|
|
@@ -774,7 +804,7 @@ class TestActionQueue(TestCase):
|
|
|
'stderr' : 'err-13'}
|
|
|
|
|
|
dummy_controller = MagicMock()
|
|
|
- actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
|
|
|
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
|
|
|
|
|
|
execute_command = copy.deepcopy(self.background_command)
|
|
|
actionQueue.put([execute_command])
|
|
@@ -791,39 +821,44 @@ class TestActionQueue(TestCase):
|
|
|
self.assertEqual(len(report['reports']),1)
|
|
|
|
|
|
@not_for_platform(PLATFORM_WINDOWS)
|
|
|
+ @patch.object(CustomServiceOrchestrator, "get_py_executor")
|
|
|
@patch.object(CustomServiceOrchestrator, "resolve_script_path")
|
|
|
@patch.object(StackVersionsFileHandler, "read_stack_version")
|
|
|
- def test_execute_python_executor(self, read_stack_version_mock, resolve_script_path_mock):
|
|
|
+ def test_execute_python_executor(self, read_stack_version_mock, resolve_script_path_mock,
|
|
|
+ get_py_executor_mock):
|
|
|
|
|
|
dummy_controller = MagicMock()
|
|
|
- cfg = AmbariConfig().getConfig()
|
|
|
+ cfg = AmbariConfig()
|
|
|
cfg.set('agent', 'tolerate_download_failures', 'true')
|
|
|
cfg.set('agent', 'prefix', '.')
|
|
|
cfg.set('agent', 'cache_dir', 'background_tasks')
|
|
|
|
|
|
actionQueue = ActionQueue(cfg, dummy_controller)
|
|
|
- patch_output_file(actionQueue.customServiceOrchestrator.python_executor)
|
|
|
+ pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config)
|
|
|
+ patch_output_file(pyex)
|
|
|
+ get_py_executor_mock.return_value = pyex
|
|
|
actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock()
|
|
|
|
|
|
result = {}
|
|
|
lock = threading.RLock()
|
|
|
complete_done = threading.Condition(lock)
|
|
|
|
|
|
- def command_complete_w(process_condenced_result, handle):
|
|
|
+ def command_complete_w(process_condensed_result, handle):
|
|
|
with lock:
|
|
|
- result['command_complete'] = {'condenced_result' : copy.copy(process_condenced_result),
|
|
|
+ result['command_complete'] = {'condensed_result' : copy.copy(process_condensed_result),
|
|
|
'handle' : copy.copy(handle),
|
|
|
'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId'])
|
|
|
}
|
|
|
complete_done.notifyAll()
|
|
|
-
|
|
|
- actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,None, command_complete_w)
|
|
|
+
|
|
|
+ actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,
|
|
|
+ None, command_complete_w)
|
|
|
actionQueue.put([self.background_command])
|
|
|
actionQueue.processBackgroundQueueSafeEmpty();
|
|
|
actionQueue.processStatusCommandQueueSafeEmpty();
|
|
|
|
|
|
with lock:
|
|
|
- complete_done.wait(.1)
|
|
|
+ complete_done.wait(0.1)
|
|
|
|
|
|
finished_status = result['command_complete']['command_status']
|
|
|
self.assertEqual(finished_status['status'], ActionQueue.COMPLETED_STATUS)
|