|
@@ -67,6 +67,19 @@ class TestActionQueue(TestCase):
|
|
|
'configurationTags':{'global' : { 'tag': 'v1' }}
|
|
|
}
|
|
|
|
|
|
+ datanode_auto_start_command = {
|
|
|
+ 'commandType': 'AUTO_EXECUTION_COMMAND',
|
|
|
+ 'role': u'DATANODE',
|
|
|
+ 'roleCommand': u'START',
|
|
|
+ 'commandId': '1-1',
|
|
|
+ 'taskId': 3,
|
|
|
+ 'clusterName': u'cc',
|
|
|
+ 'serviceName': u'HDFS',
|
|
|
+ 'hostLevelParams': {},
|
|
|
+ 'configurations':{'global' : {}},
|
|
|
+ 'configurationTags':{'global' : { 'tag': 'v1' }}
|
|
|
+ }
|
|
|
+
|
|
|
datanode_upgrade_command = {
|
|
|
'commandId': 17,
|
|
|
'role' : "role",
|
|
@@ -308,6 +321,85 @@ class TestActionQueue(TestCase):
|
|
|
actionQueue.process_command(execution_command)
|
|
|
self.assertTrue(print_exc_mock.called)
|
|
|
|
|
|
+ @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
|
|
|
+ @patch("__builtin__.open")
|
|
|
+ @patch.object(ActionQueue, "status_update_callback")
|
|
|
+ def test_auto_execute_command(self, status_update_callback_mock, open_mock):
|
|
|
+ # Make file read calls visible
|
|
|
+ def open_side_effect(file, mode):
|
|
|
+ if mode == 'r':
|
|
|
+ file_mock = MagicMock()
|
|
|
+ file_mock.read.return_value = "Read from " + str(file)
|
|
|
+ return file_mock
|
|
|
+ else:
|
|
|
+ return self.original_open(file, mode)
|
|
|
+ open_mock.side_effect = open_side_effect
|
|
|
+
|
|
|
+ config = AmbariConfig()
|
|
|
+ tempdir = tempfile.gettempdir()
|
|
|
+ config.set('agent', 'prefix', tempdir)
|
|
|
+ config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
|
|
|
+ config.set('agent', 'tolerate_download_failures', "true")
|
|
|
+ dummy_controller = MagicMock()
|
|
|
+ dummy_controller.recovery_manager = RecoveryManager()
|
|
|
+ dummy_controller.recovery_manager.update_config(5, 5, 1, 11, True, False)
|
|
|
+
|
|
|
+ actionQueue = ActionQueue(config, dummy_controller)
|
|
|
+ unfreeze_flag = threading.Event()
|
|
|
+ python_execution_result_dict = {
|
|
|
+ 'stdout': 'out',
|
|
|
+ 'stderr': 'stderr',
|
|
|
+ 'structuredOut' : ''
|
|
|
+ }
|
|
|
+
|
|
|
+ def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
|
|
|
+ unfreeze_flag.wait()
|
|
|
+ return python_execution_result_dict
|
|
|
+ def patched_aq_execute_command(command):
|
|
|
+ # We have to perform patching for separate thread in the same thread
|
|
|
+ with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
|
|
|
+ runCommand_mock.side_effect = side_effect
|
|
|
+ actionQueue.process_command(command)
|
|
|
+
|
|
|
+ python_execution_result_dict['status'] = 'COMPLETE'
|
|
|
+ python_execution_result_dict['exitcode'] = 0
|
|
|
+ self.assertFalse(actionQueue.tasks_in_progress_or_pending())
|
|
|
+ # We call method in a separate thread
|
|
|
+ execution_thread = Thread(target = patched_aq_execute_command ,
|
|
|
+ args = (self.datanode_auto_start_command, ))
|
|
|
+ execution_thread.start()
|
|
|
+ # check in progress report
|
|
|
+ # wait until ready
|
|
|
+ while True:
|
|
|
+ time.sleep(0.1)
|
|
|
+ if actionQueue.tasks_in_progress_or_pending():
|
|
|
+ break
|
|
|
+ # Continue command execution
|
|
|
+ unfreeze_flag.set()
|
|
|
+ # wait until ready
|
|
|
+ while actionQueue.tasks_in_progress_or_pending():
|
|
|
+ time.sleep(0.1)
|
|
|
+ report = actionQueue.result()
|
|
|
+
|
|
|
+ self.assertEqual(len(report['reports']), 0)
|
|
|
+
|
|
|
+ ## Test failed execution
|
|
|
+ python_execution_result_dict['status'] = 'FAILED'
|
|
|
+ python_execution_result_dict['exitcode'] = 13
|
|
|
+ # We call method in a separate thread
|
|
|
+ execution_thread = Thread(target = patched_aq_execute_command ,
|
|
|
+ args = (self.datanode_auto_start_command, ))
|
|
|
+ execution_thread.start()
|
|
|
+ unfreeze_flag.set()
|
|
|
+ # check in progress report
|
|
|
+ # wait until ready
|
|
|
+ report = actionQueue.result()
|
|
|
+ while actionQueue.tasks_in_progress_or_pending():
|
|
|
+ time.sleep(0.1)
|
|
|
+ report = actionQueue.result()
|
|
|
+
|
|
|
+ self.assertEqual(len(report['reports']), 0)
|
|
|
+
|
|
|
@patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
|
|
|
@patch("__builtin__.open")
|
|
|
@patch.object(ActionQueue, "status_update_callback")
|
|
@@ -371,7 +463,9 @@ class TestActionQueue(TestCase):
|
|
|
'taskId': 3,
|
|
|
'exitCode': 777}
|
|
|
self.assertEqual(report['reports'][0], expected)
|
|
|
- # Continue command execution
|
|
|
+ self.assertTrue(actionQueue.tasks_in_progress_or_pending())
|
|
|
+
|
|
|
+ # Continue command execution
|
|
|
unfreeze_flag.set()
|
|
|
# wait until ready
|
|
|
while report['reports'][0]['status'] == 'IN_PROGRESS':
|
|
@@ -631,6 +725,7 @@ class TestActionQueue(TestCase):
|
|
|
get_mock, process_command_mock, gpeo_mock):
|
|
|
CustomServiceOrchestrator_mock.return_value = None
|
|
|
dummy_controller = MagicMock()
|
|
|
+ dummy_controller.recovery_manager = RecoveryManager()
|
|
|
config = MagicMock()
|
|
|
gpeo_mock.return_value = 0
|
|
|
config.get_parallel_exec_option = gpeo_mock
|
|
@@ -638,8 +733,10 @@ class TestActionQueue(TestCase):
|
|
|
actionQueue.start()
|
|
|
actionQueue.put([self.datanode_install_command, self.hbase_install_command])
|
|
|
self.assertEqual(2, actionQueue.commandQueue.qsize())
|
|
|
+ self.assertTrue(actionQueue.tasks_in_progress_or_pending())
|
|
|
actionQueue.reset()
|
|
|
self.assertTrue(actionQueue.commandQueue.empty())
|
|
|
+ self.assertFalse(actionQueue.tasks_in_progress_or_pending())
|
|
|
time.sleep(0.1)
|
|
|
actionQueue.stop()
|
|
|
actionQueue.join()
|