|
@@ -27,6 +27,7 @@ import os, errno, time, pprint, tempfile, threading
|
|
|
import sys
|
|
|
from threading import Thread
|
|
|
import copy
|
|
|
+import signal
|
|
|
|
|
|
from mock.mock import patch, MagicMock, call
|
|
|
from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
|
|
@@ -703,6 +704,53 @@ class TestActionQueue(TestCase):
|
|
|
report = actionQueue.result()
|
|
|
self.assertEqual(len(report['reports']), 0)
|
|
|
|
|
|
+ def test_cancel_with_reschedule_command(self):
|
|
|
+ config = AmbariConfig()
|
|
|
+ tempdir = tempfile.gettempdir()
|
|
|
+ config.set('agent', 'prefix', tempdir)
|
|
|
+ config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
|
|
|
+ config.set('agent', 'tolerate_download_failures', "true")
|
|
|
+ dummy_controller = MagicMock()
|
|
|
+ actionQueue = ActionQueue(config, dummy_controller)
|
|
|
+ unfreeze_flag = threading.Event()
|
|
|
+ python_execution_result_dict = {
|
|
|
+ 'stdout': 'out',
|
|
|
+ 'stderr': 'stderr',
|
|
|
+ 'structuredOut' : '',
|
|
|
+ 'status' : '',
|
|
|
+ 'exitcode' : -signal.SIGTERM
|
|
|
+ }
|
|
|
+
|
|
|
+ def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
|
|
|
+ unfreeze_flag.wait()
|
|
|
+ return python_execution_result_dict
|
|
|
+ def patched_aq_execute_command(command):
|
|
|
+ # We have to perform patching for separate thread in the same thread
|
|
|
+ with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
|
|
|
+ runCommand_mock.side_effect = side_effect
|
|
|
+ actionQueue.execute_command(command)
|
|
|
+
|
|
|
+ # We call method in a separate thread
|
|
|
+ execution_thread = Thread(target = patched_aq_execute_command ,
|
|
|
+ args = (self.datanode_install_command, ))
|
|
|
+ execution_thread.start()
|
|
|
+ # check in progress report
|
|
|
+ # wait until ready
|
|
|
+ while True:
|
|
|
+ time.sleep(0.1)
|
|
|
+ report = actionQueue.result()
|
|
|
+ if len(report['reports']) != 0:
|
|
|
+ break
|
|
|
+
|
|
|
+ unfreeze_flag.set()
|
|
|
+ # wait until ready
|
|
|
+ while len(report['reports']) != 0:
|
|
|
+ time.sleep(0.1)
|
|
|
+ report = actionQueue.result()
|
|
|
+
|
|
|
+ # check report
|
|
|
+ self.assertEqual(len(report['reports']), 0)
|
|
|
+
|
|
|
|
|
|
@patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
|
|
|
@patch.object(CustomServiceOrchestrator, "runCommand")
|