@@ -83,280 +83,4 @@ class SingleProcessStatusCommandsExecutor(StatusCommandsExecutor):
   def kill(self, reason=None, can_relaunch=True):
     pass

-class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
-  def __init__(self, config, actionQueue):
-    self.config = config
-    self.actionQueue = actionQueue
-
-    self.can_relaunch = True
-
-    # used to prevent queues from been used during creation of new one to prevent threads messing up with combination of
-    # old and new queues
-    self.usage_lock = threading.RLock()
-    # protects against simultaneous killing/creating from different threads.
-    self.kill_lock = threading.RLock()
-
-    self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5))
-    self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
-
-    self.worker_process = None
-    self.mustDieEvent = multiprocessing.Event()
-    self.timedOutEvent = multiprocessing.Event()
-
-    # multiprocessing stuff that need to be cleaned every time
-    self.mp_result_queue = multiprocessing.Queue()
-    self.mp_result_logs = multiprocessing.Queue()
-    self.mp_task_queue = multiprocessing.Queue()
-
-  def _drain_queue(self, target_queue, max_time=5, max_empty_count=15, read_break=.001):
-    """
-    Read everything that available in queue. Using not reliable multiprocessing.Queue methods(qsize, empty), so contains
-    extremely dumb protection against blocking too much at this method: will try to get all possible items for not more
-    than ``max_time`` seconds; will return after ``max_empty_count`` calls of ``target_queue.get(False)`` that raised
-    ``Queue.Empty`` exception. Notice ``read_break`` argument, with default values this method will be able to read
-    ~4500 ``range(1,10000)`` objects for 5 seconds. So don't fill queue too fast.
-
-    :param target_queue: queue to read from
-    :param max_time: maximum time to spend in this method call
-    :param max_empty_count: maximum allowed ``Queue.Empty`` in a row
-    :param read_break: time to wait before next read cycle iteration
-    :return: list of resulting objects
-    """
-    results = []
-    _empty = 0
-    _start = time.time()
-    with self.usage_lock:
-      try:
-        while (not target_queue.empty() or target_queue.qsize() > 0) and time.time() - _start < max_time and _empty < max_empty_count:
-          try:
-            results.append(target_queue.get(False))
-            _empty = 0
-            time.sleep(read_break) # sleep a little to get more accurate empty and qsize results
-          except Queue.Empty:
-            _empty += 1
-          except IOError:
-            pass
-          except UnicodeDecodeError:
-            pass
-      except IOError:
-        pass
-    return results
-
-  def _log_message(self, level, message, exception=None):
-    """
-    Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target).
-
-    :param level:
-    :param message:
-    :param exception:
-    :return:
-    """
-    result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message
-    self.mp_result_logs.put((level, result_message, exception))
-
-  def _process_logs(self):
-    """
-    Get all available at this moment logs and prints them to logger.
-    """
-    for level, message, exception in self._drain_queue(self.mp_result_logs):
-      if level == logging.ERROR:
-        logger.debug(message, exc_info=exception)
-      if level == logging.WARN:
-        logger.warn(message)
-      if level == logging.INFO:
-        logger.info(message)
-
-  def _worker_process_target(self):
-    """
-    Internal method that running in separate process.
-    """
-    # cleanup monkey-patching results in child process, as it causing problems
-    import subprocess
-    reload(subprocess)
-    import multiprocessing
-    reload(multiprocessing)
-
-    bind_debug_signal_handlers()
-    self._log_message(logging.INFO, "StatusCommandsExecutor process started")
-
-    # region StatusCommandsExecutor process internals
-    internal_in_queue = Queue.Queue()
-    internal_out_queue = Queue.Queue()
-
-    def _internal_worker():
-      """
-      thread that actually executes status commands
-      """
-      while True:
-        _cmd = internal_in_queue.get()
-        internal_out_queue.put(self.actionQueue.execute_status_command_and_security_status(_cmd))
-
-    worker = threading.Thread(target=_internal_worker)
-    worker.daemon = True
-    worker.start()
-
-    def _internal_process_command(_command):
-      internal_in_queue.put(_command)
-      start_time = time.time()
-      result = None
-      while not self.mustDieEvent.is_set() and not result and time.time() - start_time < self.status_command_timeout:
-        try:
-          result = internal_out_queue.get(timeout=1)
-        except Queue.Empty:
-          pass
-
-      if result:
-        self.mp_result_queue.put(result)
-        return True
-      else:
-        # do not set timed out event twice
-        if not self.timedOutEvent.is_set():
-          self._set_timed_out(_command)
-        return False
-
-    # endregion
-
-    try:
-      while not self.mustDieEvent.is_set():
-        try:
-          command = self.mp_task_queue.get(False)
-        except Queue.Empty:
-          # no command, lets try in other loop iteration
-          time.sleep(.1)
-          continue
-
-        self._log_message(logging.DEBUG, "Running status command for {0}".format(command['componentName']))
-
-        if _internal_process_command(command):
-          self._log_message(logging.DEBUG, "Completed status command for {0}".format(command['componentName']))
-
-    except Exception as e:
-      self._log_message(logging.ERROR, "StatusCommandsExecutor process failed with exception:", e)
-      raise
-
-    self._log_message(logging.INFO, "StatusCommandsExecutor subprocess finished")
-
-  def _set_timed_out(self, command):
-    """
-    Set timeout event and adding log entry for given command.
-
-    :param command:
-    :return:
-    """
-    msg = "Command {0} for {1} is running for more than {2} seconds. Terminating it due to timeout.".format(
-      command['commandType'],
-      command['componentName'],
-      self.status_command_timeout
-    )
-    self._log_message(logging.WARN, msg)
-    self.timedOutEvent.set()
-
-  def put_commands(self, commands):
-    """
-    Put given commands to command executor.
-
-    :param commands: status commands to execute
-    :return:
-    """
-    with self.usage_lock:
-      for command in commands:
-        logger.info("Adding " + command['commandType'] + " for component " + \
-                    command['componentName'] + " of service " + \
-                    command['serviceName'] + " of cluster " + \
-                    command['clusterName'] + " to the queue.")
-        self.mp_task_queue.put(command)
-        logger.debug(pprint.pformat(command))
-
-  def process_results(self):
-    """
-    Process all the results from the SCE worker process.
-    """
-    self._process_logs()
-    results = self._drain_queue(self.mp_result_queue)
-    logger.debug("Drained %s status commands results, ~%s remains in queue", len(results), self.mp_result_queue.qsize())
-    for result in results:
-      try:
-        self.actionQueue.process_status_command_result(result)
-      except UnicodeDecodeError:
-        pass
-
-  @property
-  def need_relaunch(self):
-    """
-    Indicates if process need to be relaunched due to timeout or it is dead or even was not created.
-
-    :return: tuple (bool, str|None) with flag to relaunch and reason of relaunch
-    """
-    if not self.worker_process or not self.worker_process.is_alive():
-      return True, "WORKER_DEAD"
-    elif self.timedOutEvent.is_set():
-      return True, "COMMAND_TIMEOUT"
-    return False, None
-
-  def relaunch(self, reason=None):
-    """
-    Restart status command executor internal process.
-
-    :param reason: reason of restart
-    :return:
-    """
-    with self.kill_lock:
-      logger.info("Relaunching child process reason:" + str(reason))
-      if self.can_relaunch:
-        self.kill(reason)
-        self.worker_process = multiprocessing.Process(target=self._worker_process_target)
-        self.worker_process.start()
-        logger.info("Started process with pid {0}".format(self.worker_process.pid))
-      else:
-        logger.debug("Relaunch does not allowed, can not relaunch")
-
-  def kill(self, reason=None, can_relaunch=True):
-    """
-    Tries to stop command executor internal process for sort time, otherwise killing it. Closing all possible queues to
-    unblock threads that probably blocked on read or write operations to queues. Must be called from threads different
-    from threads that calling read or write methods(get_log_messages, get_results, put_commands).
-
-    :param can_relaunch: indicates if StatusCommandsExecutor can be relaunched after this kill
-    :param reason: reason of killing
-    :return:
-    """
-    with self.kill_lock:
-      self.can_relaunch = can_relaunch
-
-      if not self.can_relaunch:
-        logger.info("Killing without possibility to relaunch...")
-
-      # try graceful stop, otherwise hard-kill
-      if self.worker_process and self.worker_process.is_alive():
-        self.mustDieEvent.set()
-        self.worker_process.join(timeout=3)
-        if self.worker_process.is_alive():
-          os.kill(self.worker_process.pid, signal.SIGKILL)
-          logger.info("Child process killed by -9")
-        else:
-          # get log messages only if we died gracefully, otherwise we will have chance to block here forever, in most cases
-          # this call will do nothing, as all logs will be processed in ActionQueue loop
-          self._process_logs()
-          logger.info("Child process died gracefully")
-      else:
-        logger.info("Child process already dead")
-
-      # close queues and acquire usage lock
-      # closing both sides of pipes here, we need this hack in case of blocking on recv() call
-      self.mp_result_queue.close()
-      self.mp_result_queue._writer.close()
-      self.mp_result_logs.close()
-      self.mp_result_logs._writer.close()
-      self.mp_task_queue.close()
-      self.mp_task_queue._writer.close()
-
-      with self.usage_lock:
-        self.mp_result_queue.join_thread()
-        self.mp_result_queue = multiprocessing.Queue()
-        self.mp_task_queue.join_thread()
-        self.mp_task_queue = multiprocessing.Queue()
-        self.mp_result_logs.join_thread()
-        self.mp_result_logs = multiprocessing.Queue()
-        self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
-        self.mustDieEvent.clear()
-        self.timedOutEvent.clear()
+MultiProcessStatusCommandsExecutor = SingleProcessStatusCommandsExecutor
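
Note on what this hunk drops: the removed class ran status commands in a forked worker process. Inside that process an internal thread executed each command while the main loop waited on a result queue for at most status_command_timeout seconds; on a miss it set timedOutEvent, which the agent used as a signal to relaunch the worker. Below is a minimal, self-contained sketch of that timeout-guarded execution pattern, not the Ambari code itself; run_with_timeout and slow_status_check are illustrative names, and Python 3 module names (queue) are used.

import queue
import threading
import time

def run_with_timeout(task, timeout_seconds):
    # Illustrative helper, not part of Ambari: run task() on a daemon thread and
    # return its result, or None if it does not finish within timeout_seconds.
    out = queue.Queue()
    worker = threading.Thread(target=lambda: out.put(task()))
    worker.daemon = True          # a hung task must not block interpreter exit
    worker.start()
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        try:
            # poll in 1-second slices so a caller could also check a "must die" flag
            return out.get(timeout=1)
        except queue.Empty:
            pass
    return None                   # caller treats None as a COMMAND_TIMEOUT condition

def slow_status_check():
    # hypothetical status command that exceeds its 2-second budget
    time.sleep(5)
    return {"status": "INSTALLED"}

print(run_with_timeout(slow_status_check, timeout_seconds=2))  # -> None (timed out)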
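The removed _drain_queue helper captures a second reusable idea: collect whatever is currently in a queue without trusting qsize()/empty() and without blocking indefinitely, by capping both total wall-clock time and the number of consecutive Queue.Empty results. A sketch of that bounded-drain pattern, again with illustrative names and Python 3 modules:

import queue
import time

def drain_queue(q, max_time=5.0, max_empty_count=15, read_break=0.001):
    # Illustrative helper, not part of Ambari: return whatever is currently
    # readable from q, bounded by max_time seconds and max_empty_count
    # consecutive empty reads so the caller can never block for long.
    results = []
    empty_in_a_row = 0
    deadline = time.time() + max_time
    while time.time() < deadline and empty_in_a_row < max_empty_count:
        try:
            results.append(q.get_nowait())
            empty_in_a_row = 0
            time.sleep(read_break)   # brief pause between non-blocking reads
        except queue.Empty:
            empty_in_a_row += 1
    return results

# Usage: drain results accumulated by a producer without blocking the caller.
q = queue.Queue()
for i in range(100):
    q.put(i)
print(len(drain_queue(q)))  # -> 100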