|
@@ -31,6 +31,7 @@ import PuppetExecutor
|
|
import PythonExecutor
|
|
import PythonExecutor
|
|
from ActualConfigHandler import ActualConfigHandler
|
|
from ActualConfigHandler import ActualConfigHandler
|
|
from CommandStatusDict import CommandStatusDict
|
|
from CommandStatusDict import CommandStatusDict
|
|
|
|
+from CustomServiceOrchestrator import CustomServiceOrchestrator
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger()
|
|
logger = logging.getLogger()
|
|
@@ -55,6 +56,9 @@ class ActionQueue(threading.Thread):
|
|
COMPLETED_STATUS = 'COMPLETED'
|
|
COMPLETED_STATUS = 'COMPLETED'
|
|
FAILED_STATUS = 'FAILED'
|
|
FAILED_STATUS = 'FAILED'
|
|
|
|
|
|
|
|
+ COMMAND_FORMAT_V1 = "1.0"
|
|
|
|
+ COMMAND_FORMAT_V2 = "2.0"
|
|
|
|
+
|
|
def __init__(self, config, controller):
|
|
def __init__(self, config, controller):
|
|
super(ActionQueue, self).__init__()
|
|
super(ActionQueue, self).__init__()
|
|
self.commandQueue = Queue.Queue()
|
|
self.commandQueue = Queue.Queue()
|
|
@@ -65,6 +69,7 @@ class ActionQueue(threading.Thread):
|
|
self.sh = shellRunner()
|
|
self.sh = shellRunner()
|
|
self._stop = threading.Event()
|
|
self._stop = threading.Event()
|
|
self.tmpdir = config.get('agent', 'prefix')
|
|
self.tmpdir = config.get('agent', 'prefix')
|
|
|
|
+ self.customServiceOrchestrator = CustomServiceOrchestrator(config)
|
|
|
|
|
|
def stop(self):
|
|
def stop(self):
|
|
self._stop.set()
|
|
self._stop.set()
|
|
@@ -103,16 +108,33 @@ class ActionQueue(threading.Thread):
|
|
logger.warn(err)
|
|
logger.warn(err)
|
|
|
|
|
|
|
|
|
|
|
|
+ def determine_command_format_version(self, command):
|
|
|
|
+ """
|
|
|
|
+ Returns either COMMAND_FORMAT_V1 or COMMAND_FORMAT_V2
|
|
|
|
+ """
|
|
|
|
+ try:
|
|
|
|
+ if command['commandParams']['schema_version'] == self.COMMAND_FORMAT_V2:
|
|
|
|
+ return self.COMMAND_FORMAT_V2
|
|
|
|
+ else:
|
|
|
|
+ return self.COMMAND_FORMAT_V1
|
|
|
|
+ except KeyError:
|
|
|
|
+ pass # ignore
|
|
|
|
+ return self.COMMAND_FORMAT_V1 # Fallback
|
|
|
|
+
|
|
|
|
+
|
|
def execute_command(self, command):
|
|
def execute_command(self, command):
|
|
'''
|
|
'''
|
|
Executes commands of type EXECUTION_COMMAND
|
|
Executes commands of type EXECUTION_COMMAND
|
|
'''
|
|
'''
|
|
clusterName = command['clusterName']
|
|
clusterName = command['clusterName']
|
|
commandId = command['commandId']
|
|
commandId = command['commandId']
|
|
|
|
+ command_format = self.determine_command_format_version(command)
|
|
|
|
|
|
- logger.info("Executing command with id = " + str(commandId) +\
|
|
|
|
- " for role = " + command['role'] + " of " +\
|
|
|
|
- "cluster " + clusterName)
|
|
|
|
|
|
+ message = "Executing command with id = {commandId} for role = {role} of " \
|
|
|
|
+ "cluster {cluster}. Command format={command_format}".format(
|
|
|
|
+ commandId = str(commandId), role=command['role'],
|
|
|
|
+ cluster=clusterName, command_format=command_format)
|
|
|
|
+ logger.info(message)
|
|
logger.debug(pprint.pformat(command))
|
|
logger.debug(pprint.pformat(command))
|
|
|
|
|
|
taskId = command['taskId']
|
|
taskId = command['taskId']
|
|
@@ -124,17 +146,19 @@ class ActionQueue(threading.Thread):
|
|
'status': self.IN_PROGRESS_STATUS
|
|
'status': self.IN_PROGRESS_STATUS
|
|
})
|
|
})
|
|
self.commandStatuses.put_command_status(command, in_progress_status)
|
|
self.commandStatuses.put_command_status(command, in_progress_status)
|
|
- # TODO: Add CustomServiceOrchestrator call somewhere here
|
|
|
|
# running command
|
|
# running command
|
|
- # Create a new instance of executor for the current thread
|
|
|
|
- puppetExecutor = PuppetExecutor.PuppetExecutor(
|
|
|
|
- self.config.get('puppet', 'puppetmodules'),
|
|
|
|
- self.config.get('puppet', 'puppet_home'),
|
|
|
|
- self.config.get('puppet', 'facter_home'),
|
|
|
|
- self.config.get('agent', 'prefix'), self.config)
|
|
|
|
- commandresult = puppetExecutor.runCommand(command, in_progress_status['tmpout'],
|
|
|
|
- in_progress_status['tmperr'])
|
|
|
|
-
|
|
|
|
|
|
+ if command_format == self.COMMAND_FORMAT_V1:
|
|
|
|
+ # Create a new instance of executor for the current thread
|
|
|
|
+ puppetExecutor = PuppetExecutor.PuppetExecutor(
|
|
|
|
+ self.config.get('puppet', 'puppetmodules'),
|
|
|
|
+ self.config.get('puppet', 'puppet_home'),
|
|
|
|
+ self.config.get('puppet', 'facter_home'),
|
|
|
|
+ self.config.get('agent', 'prefix'), self.config)
|
|
|
|
+ commandresult = puppetExecutor.runCommand(command, in_progress_status['tmpout'],
|
|
|
|
+ in_progress_status['tmperr'])
|
|
|
|
+ else:
|
|
|
|
+ commandresult = self.customServiceOrchestrator.runCommand(command,
|
|
|
|
+ in_progress_status['tmpout'], in_progress_status['tmperr'])
|
|
# dumping results
|
|
# dumping results
|
|
status = self.COMPLETED_STATUS
|
|
status = self.COMPLETED_STATUS
|
|
if commandresult['exitcode'] != 0:
|
|
if commandresult['exitcode'] != 0:
|