|
@@ -36,6 +36,7 @@ from CommandStatusDict import CommandStatusDict
|
|
|
from CustomServiceOrchestrator import CustomServiceOrchestrator
|
|
|
from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
|
|
|
from ambari_commons.str_utils import split_on_chunks
|
|
|
+from ambari_commons.thread_utils import terminate_thread
|
|
|
|
|
|
|
|
|
logger = logging.getLogger()
|
|
@@ -83,7 +84,6 @@ class ActionQueue(threading.Thread):
|
|
|
self.controller = controller
|
|
|
self.configTags = {}
|
|
|
self._stop = threading.Event()
|
|
|
- self.hangingStatusCommands = {}
|
|
|
self.tmpdir = config.get('agent', 'prefix')
|
|
|
self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller)
|
|
|
self.parallel_execution = config.get_parallel_exec_option()
|
|
@@ -230,22 +230,16 @@ class ActionQueue(threading.Thread):
|
|
|
elif commandType == self.STATUS_COMMAND:
|
|
|
component_name = command['componentName']
|
|
|
|
|
|
- if component_name in self.hangingStatusCommands and not self.hangingStatusCommands[component_name].isAlive():
|
|
|
- del self.hangingStatusCommands[component_name]
|
|
|
+ thread = threading.Thread(target = self.execute_status_command, args = (command,))
|
|
|
+ thread.daemon = True # hanging status commands should not be prevent ambari-agent from stopping
|
|
|
+ thread.start()
|
|
|
+ thread.join(timeout=self.status_command_timeout)
|
|
|
|
|
|
- if not component_name in self.hangingStatusCommands:
|
|
|
- thread = threading.Thread(target = self.execute_status_command, args = (command,))
|
|
|
- thread.daemon = True # hanging status commands should not be prevent ambari-agent from stopping
|
|
|
- thread.start()
|
|
|
- thread.join(timeout=self.status_command_timeout)
|
|
|
-
|
|
|
- if thread.isAlive():
|
|
|
- # Force context to reset to normal. By context we mean sys.path, imports, logger setting, etc. They are set by specific status command, and are not relevant to ambari-agent.
|
|
|
- PythonReflectiveExecutor.last_context.revert()
|
|
|
- logger.warn("Command {0} for {1} is running for more than {2} seconds. Skipping it for current pack of status commands.".format(commandType, component_name, self.status_command_timeout))
|
|
|
- self.hangingStatusCommands[component_name] = thread
|
|
|
- else:
|
|
|
- logger.info("Not running {0} for {1}, because previous one is still running.".format(commandType, component_name))
|
|
|
+ if thread.isAlive():
|
|
|
+ terminate_thread(thread)
|
|
|
+ # Force context to reset to normal. By context we mean sys.path, imports, logger setting, etc. They are set by specific status command, and are not relevant to ambari-agent.
|
|
|
+ PythonReflectiveExecutor.last_context.revert()
|
|
|
+ logger.warn("Command {0} for {1} was running for more than {2} seconds. Terminated due to timeout.".format(commandType, component_name, self.status_command_timeout))
|
|
|
else:
|
|
|
logger.error("Unrecognized command " + pprint.pformat(command))
|
|
|
except Exception:
|