Forráskód Böngészése

AMBARI-6768 Add ability to an agent to cancel queued/running tasks (dsen)

Dmytro Sen 10 éve
szülő
commit
5bcd375294

+ 33 - 0
ambari-agent/src/main/python/ambari_agent/ActionQueue.py

@@ -108,6 +108,32 @@ class ActionQueue(threading.Thread):
       logger.debug(pprint.pformat(command))
       logger.debug(pprint.pformat(command))
       self.commandQueue.put(command)
       self.commandQueue.put(command)
 
 
+  def cancel(self, commands):
+    for command in commands:
+
+      logger.info("Canceling command {tid}".format(tid = str(command['target_task_id'])))
+      logger.debug(pprint.pformat(command))
+
+      task_id = command['target_task_id']
+      reason = command['reason']
+
+      # Remove from the command queue by task_id
+      queue = self.commandQueue
+      self.commandQueue = Queue.Queue()
+
+      while not queue.empty():
+        queued_command = queue.get(False)
+        if queued_command['task_id'] != task_id:
+          self.commandQueue.put(queued_command)
+        else:
+          logger.info("Canceling " + queued_command['commandType'] + \
+                      " for service " + queued_command['serviceName'] + \
+                      " of cluster " +  queued_command['clusterName'] + \
+                      " to the queue.")
+
+    # Kill if in progress
+    self.customServiceOrchestrator.cancel_command(task_id, reason)
+
   def run(self):
   def run(self):
     while not self.stopped():
     while not self.stopped():
       while  not self.statusCommandQueue.empty():
       while  not self.statusCommandQueue.empty():
@@ -287,3 +313,10 @@ class ActionQueue(threading.Thread):
     Actions that are executed every time when command status changes
     Actions that are executed every time when command status changes
     """
     """
     self.controller.heartbeat_wait_event.set()
     self.controller.heartbeat_wait_event.set()
+
+  # Removes all commands from the queue
+  def reset(self):
+    queue = self.commandQueue
+    with queue.mutex:
+      queue.queue.clear()
+

+ 16 - 2
ambari-agent/src/main/python/ambari_agent/Controller.py

@@ -78,7 +78,7 @@ class Controller(threading.Thread):
   def __del__(self):
   def __del__(self):
     logger.info("Server connection disconnected.")
     logger.info("Server connection disconnected.")
     pass
     pass
-
+  
   def registerWithServer(self):
   def registerWithServer(self):
     LiveStatus.SERVICES = []
     LiveStatus.SERVICES = []
     LiveStatus.CLIENT_COMPONENTS = []
     LiveStatus.CLIENT_COMPONENTS = []
@@ -142,7 +142,12 @@ class Controller(threading.Thread):
       pass
       pass
     return ret
     return ret
 
 
-
+  def cancelCommandInQueue(self, commands):
+    """ Remove from the queue commands, kill the process if it's in progress """
+    if commands:
+      self.actionQueue.cancel(commands)
+    pass
+  
   def addToQueue(self, commands):
   def addToQueue(self, commands):
     """Add to the queue for running the commands """
     """Add to the queue for running the commands """
     """ Put the required actions into the Queue """
     """ Put the required actions into the Queue """
@@ -223,6 +228,10 @@ class Controller(threading.Thread):
         else:
         else:
           self.responseId=serverId
           self.responseId=serverId
 
 
+        if 'cancelCommands' in response.keys():
+          self.cancelCommandInQueue(response['cancelCommands'])
+          pass
+
         if 'executionCommands' in response.keys():
         if 'executionCommands' in response.keys():
           self.addToQueue(response['executionCommands'])
           self.addToQueue(response['executionCommands'])
           pass
           pass
@@ -309,6 +318,11 @@ class Controller(threading.Thread):
     logger.info("Registration response from %s was %s", self.serverHostname, message)
     logger.info("Registration response from %s was %s", self.serverHostname, message)
 
 
     if self.isRegistered:
     if self.isRegistered:
+      # Clearing command queue to stop executing "stale" commands
+      # after registration
+      logger.info('Resetting ActionQueue...')
+      self.actionQueue.reset()
+
       # Process callbacks
       # Process callbacks
       for callback in self.registration_listeners:
       for callback in self.registration_listeners:
         callback()
         callback()

+ 27 - 1
ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py

@@ -22,6 +22,7 @@ import logging
 import os
 import os
 import json
 import json
 import sys
 import sys
+import shell
 
 
 from FileCache import FileCache
 from FileCache import FileCache
 from AgentException import AgentException
 from AgentException import AgentException
@@ -68,7 +69,19 @@ class CustomServiceOrchestrator():
       os.unlink(self.status_commands_stderr)
       os.unlink(self.status_commands_stderr)
     except OSError:
     except OSError:
       pass # Ignore fail
       pass # Ignore fail
+    self.commands_in_progress = {}
 
 
+  def map_task_to_process(self, task_id, processId):
+    self.commands_in_progress[task_id] = processId
+
+  def cancel_command(self, task_id, reason):
+    if task_id in self.commands_in_progress.keys():
+      pid = self.commands_in_progress.get(task_id)
+      self.commands_in_progress[task_id] = reason
+      logger.info("Canceling command with task_id - {tid}, " \
+                  "reason - {reason} . Killing process {pid}"
+      .format(tid = str(task_id), reason = reason, pid = pid))
+      shell.kill_process_with_children(pid)
 
 
   def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name = None,
   def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name = None,
                  override_output_files = True):
                  override_output_files = True):
@@ -132,7 +145,8 @@ class CustomServiceOrchestrator():
         script_params = [command_name, json_path, current_base_dir]
         script_params = [command_name, json_path, current_base_dir]
         ret = self.python_executor.run_file(py_file, script_params,
         ret = self.python_executor.run_file(py_file, script_params,
                                self.exec_tmp_dir, tmpoutfile, tmperrfile, timeout,
                                self.exec_tmp_dir, tmpoutfile, tmperrfile, timeout,
-                               tmpstrucoutfile, logger_level, override_output_files)
+                               tmpstrucoutfile, logger_level, self.map_task_to_process,
+                               task_id, override_output_files)
         # Next run_file() invocations should always append to current output
         # Next run_file() invocations should always append to current output
         override_output_files = False
         override_output_files = False
         if ret['exitcode'] != 0:
         if ret['exitcode'] != 0:
@@ -141,6 +155,18 @@ class CustomServiceOrchestrator():
       if not ret: # Something went wrong
       if not ret: # Something went wrong
         raise AgentException("No script has been executed")
         raise AgentException("No script has been executed")
 
 
+      # if canceled
+      pid = self.commands_in_progress.pop(task_id)
+      if not isinstance(pid, int):
+        reason = '\nCommand aborted. ' + pid
+        ret['stdout'] += reason
+        ret['stderr'] += reason
+
+        with open(tmpoutfile, "a") as f:
+          f.write(reason)
+        with open(tmperrfile, "a") as f:
+          f.write(reason)
+
     except Exception: # We do not want to let agent fail completely
     except Exception: # We do not want to let agent fail completely
       exc_type, exc_obj, exc_tb = sys.exc_info()
       exc_type, exc_obj, exc_tb = sys.exc_info()
       message = "Catched an exception while executing "\
       message = "Catched an exception while executing "\

+ 5 - 2
ambari-agent/src/main/python/ambari_agent/PythonExecutor.py

@@ -47,8 +47,9 @@ class PythonExecutor:
     self.config = config
     self.config = config
     pass
     pass
 
 
-  def run_file(self, script, script_params, tmp_dir, tmpoutfile, tmperrfile, timeout,
-               tmpstructedoutfile, logger_level, override_output_files = True):
+  def run_file(self, script, script_params, tmp_dir, tmpoutfile, tmperrfile,
+               timeout, tmpstructedoutfile, logger_level, callback, task_id,
+               override_output_files = True):
     """
     """
     Executes the specified python file in a separate subprocess.
     Executes the specified python file in a separate subprocess.
     Method returns only when the subprocess is finished.
     Method returns only when the subprocess is finished.
@@ -77,6 +78,8 @@ class PythonExecutor:
     pythonCommand = self.python_command(script, script_params)
     pythonCommand = self.python_command(script, script_params)
     logger.info("Running command " + pprint.pformat(pythonCommand))
     logger.info("Running command " + pprint.pformat(pythonCommand))
     process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr)
     process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr)
+    # map task_id to pid
+    callback(task_id, process.pid)
     logger.debug("Launching watchdog thread")
     logger.debug("Launching watchdog thread")
     self.event.clear()
     self.event.clear()
     self.python_process_has_been_killed = False
     self.python_process_has_been_killed = False

+ 39 - 0
ambari-agent/src/test/python/ambari_agent/TestActionQueue.py

@@ -487,3 +487,42 @@ class TestActionQueue(TestCase):
     self.assertTrue(requestComponentStatus_mock.called)
     self.assertTrue(requestComponentStatus_mock.called)
     self.assertEqual(len(report['componentStatus']), 1)
     self.assertEqual(len(report['componentStatus']), 1)
     self.assertTrue(report['componentStatus'][0].has_key('alerts'))
     self.assertTrue(report['componentStatus'][0].has_key('alerts'))
+
+  @patch.object(ActionQueue, "process_command")
+  @patch.object(Queue, "get")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_reset_queue(self, CustomServiceOrchestrator_mock,
+                                get_mock, process_command_mock):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    config = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
+    actionQueue.start()
+    actionQueue.put([self.datanode_install_command, self.hbase_install_command])
+    self.assertEqual(2, actionQueue.commandQueue.qsize())
+    actionQueue.reset()
+    self.assertTrue(actionQueue.commandQueue.empty())
+    time.sleep(0.1)
+    actionQueue.stop()
+    actionQueue.join()
+    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+
+  @patch.object(ActionQueue, "process_command")
+  @patch.object(Queue, "get")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_cancel(self, CustomServiceOrchestrator_mock,
+                       get_mock, process_command_mock):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    config = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
+    actionQueue.start()
+    actionQueue.put([self.datanode_install_command, self.hbase_install_command])
+    self.assertEqual(2, actionQueue.commandQueue.qsize())
+    actionQueue.reset()
+    self.assertTrue(actionQueue.commandQueue.empty())
+    time.sleep(0.1)
+    actionQueue.stop()
+    actionQueue.join()
+    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+

+ 6 - 0
ambari-agent/src/test/python/ambari_agent/TestController.py

@@ -236,6 +236,8 @@ class TestController(unittest.TestCase):
     self.controller.registerWithServer = registerWithServer
     self.controller.registerWithServer = registerWithServer
     heartbeatWithServer = MagicMock(name="heartbeatWithServer")
     heartbeatWithServer = MagicMock(name="heartbeatWithServer")
     self.controller.heartbeatWithServer = heartbeatWithServer
     self.controller.heartbeatWithServer = heartbeatWithServer
+    actionQueue = MagicMock(name="actionQueue")
+    self.controller.actionQueue = actionQueue
 
 
     Controller.Controller.__sendRequest__ = MagicMock(side_effect=Exception())
     Controller.Controller.__sendRequest__ = MagicMock(side_effect=Exception())
 
 
@@ -257,6 +259,8 @@ class TestController(unittest.TestCase):
     self.controller.registerWithServer = registerWithServer
     self.controller.registerWithServer = registerWithServer
     heartbeatWithServer = MagicMock(name="heartbeatWithServer")
     heartbeatWithServer = MagicMock(name="heartbeatWithServer")
     self.controller.heartbeatWithServer = heartbeatWithServer
     self.controller.heartbeatWithServer = heartbeatWithServer
+    actionQueue = MagicMock(name="actionQueue")
+    self.controller.actionQueue = actionQueue
 
 
     listener1 = MagicMock()
     listener1 = MagicMock()
     listener2 = MagicMock()
     listener2 = MagicMock()
@@ -282,6 +286,8 @@ class TestController(unittest.TestCase):
     self.controller.registerWithServer = registerWithServer
     self.controller.registerWithServer = registerWithServer
     heartbeatWithServer = MagicMock(name="heartbeatWithServer")
     heartbeatWithServer = MagicMock(name="heartbeatWithServer")
     self.controller.heartbeatWithServer = heartbeatWithServer
     self.controller.heartbeatWithServer = heartbeatWithServer
+    actionQueue = MagicMock(name="actionQueue")
+    self.controller.actionQueue = actionQueue
 
 
     self.controller.isRegistered = True
     self.controller.isRegistered = True
     self.controller.registerAndHeartbeat()
     self.controller.registerAndHeartbeat()

+ 81 - 3
ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py

@@ -18,9 +18,11 @@ See the License for the specific language governing permissions and
 limitations under the License.
 limitations under the License.
 '''
 '''
 import ConfigParser
 import ConfigParser
+from multiprocessing.pool import ThreadPool
 import os
 import os
 
 
 import pprint
 import pprint
+import shell
 
 
 from unittest import TestCase
 from unittest import TestCase
 import threading
 import threading
@@ -185,6 +187,8 @@ class TestCustomServiceOrchestrator(TestCase):
        '/hooks_dir/prefix-command')
        '/hooks_dir/prefix-command')
     dummy_controller = MagicMock()
     dummy_controller = MagicMock()
     orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
     orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
+    unix_process_id = 111
+    orchestrator.commands_in_progress = {command['taskId']: unix_process_id}
     get_hook_base_dir_mock.return_value = "/hooks/"
     get_hook_base_dir_mock.return_value = "/hooks/"
     # normal run case
     # normal run case
     run_file_mock.return_value = {
     run_file_mock.return_value = {
@@ -208,9 +212,9 @@ class TestCustomServiceOrchestrator(TestCase):
     ret = orchestrator.runCommand(command, "out.txt", "err.txt",
     ret = orchestrator.runCommand(command, "out.txt", "err.txt",
               forced_command_name=CustomServiceOrchestrator.COMMAND_NAME_STATUS)
               forced_command_name=CustomServiceOrchestrator.COMMAND_NAME_STATUS)
     ## Check that override_output_files was true only during first call
     ## Check that override_output_files was true only during first call
-    self.assertEquals(run_file_mock.call_args_list[0][0][8], True)
-    self.assertEquals(run_file_mock.call_args_list[1][0][8], False)
-    self.assertEquals(run_file_mock.call_args_list[2][0][8], False)
+    self.assertEquals(run_file_mock.call_args_list[0][0][10], True)
+    self.assertEquals(run_file_mock.call_args_list[1][0][10], False)
+    self.assertEquals(run_file_mock.call_args_list[2][0][10], False)
     ## Check that forced_command_name was taken into account
     ## Check that forced_command_name was taken into account
     self.assertEqual(run_file_mock.call_args_list[0][0][1][0],
     self.assertEqual(run_file_mock.call_args_list[0][0][1][0],
                                   CustomServiceOrchestrator.COMMAND_NAME_STATUS)
                                   CustomServiceOrchestrator.COMMAND_NAME_STATUS)
@@ -229,6 +233,78 @@ class TestCustomServiceOrchestrator(TestCase):
 
 
     pass
     pass
 
 
+  @patch("shell.kill_process_with_children")
+  @patch.object(CustomServiceOrchestrator, "resolve_script_path")
+  @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
+  @patch.object(FileCache, "get_service_base_dir")
+  @patch.object(FileCache, "get_hook_base_dir")
+  @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
+  @patch.object(PythonExecutor, "run_file")
+  @patch.object(FileCache, "__init__")
+  def test_cancel_command(self, FileCache_mock,
+                      run_file_mock, dump_command_to_json_mock,
+                      get_hook_base_dir_mock, get_service_base_dir_mock,
+                      resolve_hook_script_path_mock, resolve_script_path_mock,
+                      kill_process_with_children_mock):
+    FileCache_mock.return_value = None
+    command = {
+      'role' : 'REGION_SERVER',
+      'hostLevelParams' : {
+        'stack_name' : 'HDP',
+        'stack_version' : '2.0.7',
+        'jdk_location' : 'some_location'
+      },
+      'commandParams': {
+        'script_type': 'PYTHON',
+        'script': 'scripts/hbase_regionserver.py',
+        'command_timeout': '600',
+        'service_package_folder' : 'HBASE'
+      },
+      'taskId' : '3',
+      'roleCommand': 'INSTALL'
+    }
+    get_service_base_dir_mock.return_value = "/basedir/"
+    resolve_script_path_mock.return_value = "/basedir/scriptpath"
+    resolve_hook_script_path_mock.return_value = \
+      ('/hooks_dir/prefix-command/scripts/hook.py',
+       '/hooks_dir/prefix-command')
+    dummy_controller = MagicMock()
+    orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
+    unix_process_id = 111
+    orchestrator.commands_in_progress = {command['taskId']: unix_process_id}
+    get_hook_base_dir_mock.return_value = "/hooks/"
+    run_file_mock_return_value = {
+      'stdout' : 'killed',
+      'stderr' : 'killed',
+      'exitcode': 1,
+      }
+    def side_effect(*args, **kwargs):
+      time.sleep(0.2)
+      return run_file_mock_return_value
+    run_file_mock.side_effect = side_effect
+
+    _, out = tempfile.mkstemp()
+    _, err = tempfile.mkstemp()
+    pool = ThreadPool(processes=1)
+    async_result = pool.apply_async(orchestrator.runCommand, (command, out, err))
+
+    time.sleep(0.1)
+    orchestrator.cancel_command(command['taskId'], 'reason')
+
+    ret = async_result.get()
+
+    self.assertEqual(ret['exitcode'], 1)
+    self.assertEquals(ret['stdout'], 'killed\nCommand aborted. reason')
+    self.assertEquals(ret['stderr'], 'killed\nCommand aborted. reason')
+
+    self.assertTrue(kill_process_with_children_mock.called)
+    self.assertFalse(command['taskId'] in orchestrator.commands_in_progress.keys())
+    self.assertTrue(os.path.exists(out))
+    self.assertTrue(os.path.exists(err))
+    os.remove(out)
+    os.remove(err)
+
+
   @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
   @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
   @patch.object(PythonExecutor, "run_file")
   @patch.object(PythonExecutor, "run_file")
   @patch.object(FileCache, "__init__")
   @patch.object(FileCache, "__init__")
@@ -252,6 +328,8 @@ class TestCustomServiceOrchestrator(TestCase):
     }
     }
     dummy_controller = MagicMock()
     dummy_controller = MagicMock()
     orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
     orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
+    unix_process_id = 111
+    orchestrator.commands_in_progress = {command['taskId']: unix_process_id}
     # normal run case
     # normal run case
     run_file_mock.return_value = {
     run_file_mock.return_value = {
       'stdout' : 'sss',
       'stdout' : 'sss',

+ 13 - 5
ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py

@@ -55,12 +55,15 @@ class TestPythonExecutor(TestCase):
     runShellKillPgrp_method.side_effect = lambda python : python.terminate()
     runShellKillPgrp_method.side_effect = lambda python : python.terminate()
     executor.runShellKillPgrp = runShellKillPgrp_method
     executor.runShellKillPgrp = runShellKillPgrp_method
     subproc_mock.returncode = None
     subproc_mock.returncode = None
+    callback_method = MagicMock()
     thread = Thread(target =  executor.run_file, args = ("fake_puppetFile",
     thread = Thread(target =  executor.run_file, args = ("fake_puppetFile",
-      ["arg1", "arg2"], "/fake_tmp_dir", tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS, tmpstrucout,"INFO"))
+      ["arg1", "arg2"], "/fake_tmp_dir", tmpoutfile, tmperrfile,
+      PYTHON_TIMEOUT_SECONDS, tmpstrucout, "INFO", callback_method, '1'))
     thread.start()
     thread.start()
     time.sleep(0.1)
     time.sleep(0.1)
     subproc_mock.finished_event.wait()
     subproc_mock.finished_event.wait()
     self.assertEquals(subproc_mock.was_terminated, True, "Subprocess should be terminated due to timeout")
     self.assertEquals(subproc_mock.was_terminated, True, "Subprocess should be terminated due to timeout")
+    self.assertTrue(callback_method.called)
 
 
 
 
   def test_watchdog_2(self):
   def test_watchdog_2(self):
@@ -83,16 +86,18 @@ class TestPythonExecutor(TestCase):
     runShellKillPgrp_method.side_effect = lambda python : python.terminate()
     runShellKillPgrp_method.side_effect = lambda python : python.terminate()
     executor.runShellKillPgrp = runShellKillPgrp_method
     executor.runShellKillPgrp = runShellKillPgrp_method
     subproc_mock.returncode = 0
     subproc_mock.returncode = 0
+    callback_method = MagicMock()
     thread = Thread(target =  executor.run_file, args = ("fake_puppetFile", ["arg1", "arg2"],
     thread = Thread(target =  executor.run_file, args = ("fake_puppetFile", ["arg1", "arg2"],
                                                       "/fake_tmp_dir", tmpoutfile, tmperrfile,
                                                       "/fake_tmp_dir", tmpoutfile, tmperrfile,
-                                                      PYTHON_TIMEOUT_SECONDS, tmpstrucout, "INFO"))
+                                                      PYTHON_TIMEOUT_SECONDS, tmpstrucout,
+                                                      "INFO", callback_method, "1-1"))
     thread.start()
     thread.start()
     time.sleep(0.1)
     time.sleep(0.1)
     subproc_mock.should_finish_event.set()
     subproc_mock.should_finish_event.set()
     subproc_mock.finished_event.wait()
     subproc_mock.finished_event.wait()
     self.assertEquals(subproc_mock.was_terminated, False, "Subprocess should not be terminated before timeout")
     self.assertEquals(subproc_mock.was_terminated, False, "Subprocess should not be terminated before timeout")
     self.assertEquals(subproc_mock.returncode, 0, "Subprocess should not be terminated before timeout")
     self.assertEquals(subproc_mock.returncode, 0, "Subprocess should not be terminated before timeout")
-
+    self.assertTrue(callback_method.called)
 
 
   def test_execution_results(self):
   def test_execution_results(self):
     subproc_mock = self.Subprocess_mockup()
     subproc_mock = self.Subprocess_mockup()
@@ -112,10 +117,13 @@ class TestPythonExecutor(TestCase):
     executor.runShellKillPgrp = runShellKillPgrp_method
     executor.runShellKillPgrp = runShellKillPgrp_method
     subproc_mock.returncode = 0
     subproc_mock.returncode = 0
     subproc_mock.should_finish_event.set()
     subproc_mock.should_finish_event.set()
-    result = executor.run_file("file", ["arg1", "arg2"], "/fake_tmp_dir", tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS, tmpstroutfile, "INFO")
+    callback_method = MagicMock()
+    result = executor.run_file("file", ["arg1", "arg2"], "/fake_tmp_dir",
+                               tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS,
+                               tmpstroutfile, "INFO", callback_method, "1-1")
     self.assertEquals(result, {'exitcode': 0, 'stderr': 'Dummy err', 'stdout': 'Dummy output',
     self.assertEquals(result, {'exitcode': 0, 'stderr': 'Dummy err', 'stdout': 'Dummy output',
                                'structuredOut': {}})
                                'structuredOut': {}})
-
+    self.assertTrue(callback_method.called)
 
 
   def test_is_successfull(self):
   def test_is_successfull(self):
     executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
     executor = PythonExecutor("/tmp", AmbariConfig().getConfig())