|
@@ -33,7 +33,9 @@ import os
|
|
|
import time
|
|
|
import subprocess
|
|
|
import copy
|
|
|
-import puppetExecutor
|
|
|
+import PuppetExecutor
|
|
|
+import UpgradeExecutor
|
|
|
+import PythonExecutor
|
|
|
import tempfile
|
|
|
from Grep import Grep
|
|
|
|
|
@@ -43,26 +45,31 @@ installScriptHash = -1
|
|
|
class ActionQueue(threading.Thread):
|
|
|
""" Action Queue for the agent. We pick one command at a time from the queue
|
|
|
and execute that """
|
|
|
- global commandQueue, resultQueue #, STATUS_COMMAND, EXECUTION_COMMAND
|
|
|
+
|
|
|
commandQueue = Queue.Queue()
|
|
|
resultQueue = Queue.Queue()
|
|
|
|
|
|
STATUS_COMMAND='STATUS_COMMAND'
|
|
|
EXECUTION_COMMAND='EXECUTION_COMMAND'
|
|
|
+ UPGRADE_STATUS='UPGRADE'
|
|
|
+
|
|
|
IDLE_SLEEP_TIME = 5
|
|
|
|
|
|
def __init__(self, config):
|
|
|
super(ActionQueue, self).__init__()
|
|
|
- #threading.Thread.__init__(self)
|
|
|
self.config = config
|
|
|
self.sh = shellRunner()
|
|
|
self._stop = threading.Event()
|
|
|
self.maxRetries = config.getint('command', 'maxretries')
|
|
|
self.sleepInterval = config.getint('command', 'sleepBetweenRetries')
|
|
|
- self.executor = puppetExecutor.puppetExecutor(config.get('puppet', 'puppetmodules'),
|
|
|
+ self.puppetExecutor = PuppetExecutor.PuppetExecutor(
|
|
|
+ config.get('puppet', 'puppetmodules'),
|
|
|
config.get('puppet', 'puppet_home'),
|
|
|
config.get('puppet', 'facter_home'),
|
|
|
config.get('agent', 'prefix'), config)
|
|
|
+ self.pythonExecutor = PythonExecutor.PythonExecutor()
|
|
|
+ self.upgradeExecutor = UpgradeExecutor.UpgradeExecutor(self.pythonExecutor,
|
|
|
+ PuppetExecutor, config)
|
|
|
self.tmpdir = config.get('agent', 'prefix')
|
|
|
self.commandInProgress = None
|
|
|
|
|
@@ -72,24 +79,16 @@ class ActionQueue(threading.Thread):
|
|
|
def stopped(self):
|
|
|
return self._stop.isSet()
|
|
|
|
|
|
- def getshellinstance(self):
|
|
|
- """ For Testing purpose only."""
|
|
|
- return self.sh
|
|
|
-
|
|
|
def put(self, command):
|
|
|
logger.info("The " + command['commandType'] + " from the server is \n" + pprint.pformat(command))
|
|
|
- commandQueue.put(command)
|
|
|
+ self.commandQueue.put(command)
|
|
|
pass
|
|
|
|
|
|
- def getCommandQueue(self):
|
|
|
- """ For Testing purpose only."""
|
|
|
- return commandQueue
|
|
|
-
|
|
|
def run(self):
|
|
|
result = []
|
|
|
while not self.stopped():
|
|
|
- while not commandQueue.empty():
|
|
|
- command = commandQueue.get()
|
|
|
+ while not self.commandQueue.empty():
|
|
|
+ command = self.commandQueue.get()
|
|
|
logger.info("Took an element of Queue: " + pprint.pformat(command))
|
|
|
if command['commandType'] == self.EXECUTION_COMMAND:
|
|
|
try:
|
|
@@ -103,8 +102,8 @@ class ActionQueue(threading.Thread):
|
|
|
pass
|
|
|
|
|
|
for entry in result:
|
|
|
- resultQueue.put((ActionQueue.EXECUTION_COMMAND, entry))
|
|
|
- pass
|
|
|
+ self.resultQueue.put((command['commandType'], entry))
|
|
|
+
|
|
|
elif command['commandType'] == self.STATUS_COMMAND:
|
|
|
cluster = command['clusterName']
|
|
|
service = command['serviceName']
|
|
@@ -116,7 +115,7 @@ class ActionQueue(threading.Thread):
|
|
|
logger.info("Got live status for component " + component + " of service " + str(service) +\
|
|
|
" of cluster " + str(cluster) + "\n" + pprint.pformat(result))
|
|
|
if result is not None:
|
|
|
- resultQueue.put((ActionQueue.STATUS_COMMAND, result))
|
|
|
+ self.resultQueue.put((ActionQueue.STATUS_COMMAND, result))
|
|
|
except Exception, err:
|
|
|
traceback.print_exc()
|
|
|
logger.warn(err)
|
|
@@ -130,9 +129,9 @@ class ActionQueue(threading.Thread):
|
|
|
def result(self):
|
|
|
resultReports = []
|
|
|
resultComponentStatus = []
|
|
|
- while not resultQueue.empty():
|
|
|
- res = resultQueue.get()
|
|
|
- if res[0] == ActionQueue.EXECUTION_COMMAND:
|
|
|
+ while not self.resultQueue.empty():
|
|
|
+ res = self.resultQueue.get()
|
|
|
+ if res[0] == self.EXECUTION_COMMAND:
|
|
|
resultReports.append(res[1])
|
|
|
elif res[0] == ActionQueue.STATUS_COMMAND:
|
|
|
resultComponentStatus.append(res[1])
|
|
@@ -147,7 +146,7 @@ class ActionQueue(threading.Thread):
|
|
|
tmpout='...'
|
|
|
tmperr='...'
|
|
|
grep = Grep()
|
|
|
- output = grep.tail(tmpout, puppetExecutor.puppetExecutor.OUTPUT_LAST_LINES)
|
|
|
+ output = grep.tail(tmpout, Grep.OUTPUT_LAST_LINES)
|
|
|
inprogress = {
|
|
|
'role' : self.commandInProgress['role'],
|
|
|
'actionId' : self.commandInProgress['actionId'],
|
|
@@ -166,12 +165,6 @@ class ActionQueue(threading.Thread):
|
|
|
}
|
|
|
return result
|
|
|
|
|
|
- def registerCommand(self, command):
|
|
|
- return {}
|
|
|
-
|
|
|
- def statusCommand(self, command):
|
|
|
- return {}
|
|
|
-
|
|
|
def executeCommand(self, command):
|
|
|
logger.info("Executing command \n" + pprint.pformat(command))
|
|
|
clusterName = command['clusterName']
|
|
@@ -196,7 +189,11 @@ class ActionQueue(threading.Thread):
|
|
|
'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt'
|
|
|
}
|
|
|
# running command
|
|
|
- commandresult = self.executor.runCommand(command, self.commandInProgress['tmpout'], self.commandInProgress['tmperr'])
|
|
|
+ if command['commandType'] == ActionQueue.EXECUTION_COMMAND:
|
|
|
+ if command['roleCommand'] == ActionQueue.UPGRADE_STATUS:
|
|
|
+ commandresult = self.upgradeExecutor.perform_stack_upgrade(command, self.commandInProgress['tmpout'], self.commandInProgress['tmperr'])
|
|
|
+ else:
|
|
|
+ commandresult = self.puppetExecutor.runCommand(command, self.commandInProgress['tmpout'], self.commandInProgress['tmperr'])
|
|
|
# dumping results
|
|
|
self.commandInProgress = None
|
|
|
status = "COMPLETED"
|
|
@@ -218,17 +215,8 @@ class ActionQueue(threading.Thread):
|
|
|
if roleResult['stderr'] == '':
|
|
|
roleResult['stderr'] = 'None'
|
|
|
result.append(roleResult)
|
|
|
- pass
|
|
|
return result
|
|
|
|
|
|
- def noOpCommand(self, command):
|
|
|
- result = {'commandId' : command['Id']}
|
|
|
- return result
|
|
|
-
|
|
|
- def unknownAction(self, action):
|
|
|
- logger.error('Unknown action: %s' % action['id'])
|
|
|
- result = { 'id': action['id'] }
|
|
|
- return result
|
|
|
|
|
|
def isIdle(self):
|
|
|
- return commandQueue.empty()
|
|
|
+ return self.commandQueue.empty()
|