|
@@ -31,8 +31,56 @@ from ambari_agent.RemoteDebugUtils import bind_debug_signal_handlers
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
-
|
|
|
class StatusCommandsExecutor(object):
|
|
|
+ def put_commands(self, commands):
|
|
|
+ raise NotImplemented()
|
|
|
+
|
|
|
+ def process_results(self):
|
|
|
+ raise NotImplemented()
|
|
|
+
|
|
|
+ def relaunch(self, reason=None):
|
|
|
+ raise NotImplemented()
|
|
|
+
|
|
|
+ def kill(self, reason=None, can_relaunch=True):
|
|
|
+ raise NotImplemented()
|
|
|
+
|
|
|
+class SingleProcessStatusCommandsExecutor(StatusCommandsExecutor):
|
|
|
+ def __init__(self, config, actionQueue):
|
|
|
+ self.config = config
|
|
|
+ self.actionQueue = actionQueue
|
|
|
+ self.statusCommandQueue = Queue.Queue()
|
|
|
+ self.need_relaunch = False
|
|
|
+
|
|
|
+ def put_commands(self, commands):
|
|
|
+ while not self.statusCommandQueue.empty():
|
|
|
+ self.statusCommandQueue.get()
|
|
|
+
|
|
|
+ for command in commands:
|
|
|
+ logger.info("Adding " + command['commandType'] + " for component " + \
|
|
|
+ command['componentName'] + " of service " + \
|
|
|
+ command['serviceName'] + " of cluster " + \
|
|
|
+ command['clusterName'] + " to the queue.")
|
|
|
+ self.statusCommandQueue.put(command)
|
|
|
+ logger.debug(pprint.pformat(command))
|
|
|
+
|
|
|
+ def process_results(self):
|
|
|
+ """
|
|
|
+ Execute a single command from the queue and process it
|
|
|
+ """
|
|
|
+ while not self.statusCommandQueue.empty():
|
|
|
+ try:
|
|
|
+ command = self.statusCommandQueue.get(False)
|
|
|
+ self.actionQueue.process_status_command_result(self.actionQueue.execute_status_command_and_security_status(command))
|
|
|
+ except Queue.Empty:
|
|
|
+ pass
|
|
|
+
|
|
|
+ def relaunch(self, reason=None):
|
|
|
+ pass
|
|
|
+
|
|
|
+ def kill(self, reason=None, can_relaunch=True):
|
|
|
+ pass
|
|
|
+
|
|
|
+class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
|
|
|
def __init__(self, config, actionQueue):
|
|
|
self.config = config
|
|
|
self.actionQueue = actionQueue
|
|
@@ -78,7 +126,7 @@ class StatusCommandsExecutor(object):
|
|
|
result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message
|
|
|
self.mp_result_logs.put((level, result_message, exception))
|
|
|
|
|
|
- def get_log_messages(self):
|
|
|
+ def _get_log_messages(self):
|
|
|
"""
|
|
|
Returns list of (level, message, exception) log messages.
|
|
|
|
|
@@ -100,11 +148,11 @@ class StatusCommandsExecutor(object):
|
|
|
pass
|
|
|
return results
|
|
|
|
|
|
- def process_logs(self):
|
|
|
+ def _process_logs(self):
|
|
|
"""
|
|
|
Get all available at this moment logs and prints them to logger.
|
|
|
"""
|
|
|
- for level, message, exception in self.get_log_messages():
|
|
|
+ for level, message, exception in self._get_log_messages():
|
|
|
if level == logging.ERROR:
|
|
|
logger.debug(message, exc_info=exception)
|
|
|
if level == logging.WARN:
|
|
@@ -129,9 +177,7 @@ class StatusCommandsExecutor(object):
|
|
|
"""
|
|
|
while True:
|
|
|
_cmd = internal_in_queue.get()
|
|
|
- component_status_result = self.customServiceOrchestrator.requestComponentStatus(_cmd)
|
|
|
- component_security_status_result = self.customServiceOrchestrator.requestComponentSecurityState(_cmd)
|
|
|
- internal_out_queue.put((_cmd, component_status_result, component_security_status_result))
|
|
|
+ internal_out_queue.put(self.actionQueue.execute_status_command_and_security_status(_cmd))
|
|
|
|
|
|
worker = threading.Thread(target=_internal_worker)
|
|
|
worker.daemon = True
|
|
@@ -219,7 +265,18 @@ class StatusCommandsExecutor(object):
|
|
|
self.mp_task_queue.put(command)
|
|
|
logger.debug(pprint.pformat(command))
|
|
|
|
|
|
- def get_results(self):
|
|
|
+ def process_results(self):
|
|
|
+ """
|
|
|
+ Process all the results from the internal worker
|
|
|
+ """
|
|
|
+ self._process_logs()
|
|
|
+ for result in self._get_results():
|
|
|
+ try:
|
|
|
+ self.actionQueue.process_status_command_result(result)
|
|
|
+ except UnicodeDecodeError:
|
|
|
+ pass
|
|
|
+
|
|
|
+ def _get_results(self):
|
|
|
"""
|
|
|
Get all available results for status commands.
|
|
|
|
|
@@ -289,7 +346,7 @@ class StatusCommandsExecutor(object):
|
|
|
else:
|
|
|
# get log messages only if we died gracefully, otherwise we will have chance to block here forever, in most cases
|
|
|
# this call will do nothing, as all logs will be processed in ActionQueue loop
|
|
|
- self.process_logs()
|
|
|
+ self._process_logs()
|
|
|
logger.info("Child process died gracefully")
|
|
|
else:
|
|
|
logger.info("Child process already dead")
|