|
@@ -33,9 +33,7 @@ import os
|
|
|
import time
|
|
|
import subprocess
|
|
|
import copy
|
|
|
-import PuppetExecutor
|
|
|
-import UpgradeExecutor
|
|
|
-import PythonExecutor
|
|
|
+import puppetExecutor
|
|
|
import tempfile
|
|
|
from Grep import Grep
|
|
|
|
|
@@ -45,31 +43,26 @@ installScriptHash = -1
|
|
|
class ActionQueue(threading.Thread):
|
|
|
""" Action Queue for the agent. We pick one command at a time from the queue
|
|
|
and execute that """
|
|
|
-
|
|
|
+ global commandQueue, resultQueue #, STATUS_COMMAND, EXECUTION_COMMAND
|
|
|
commandQueue = Queue.Queue()
|
|
|
resultQueue = Queue.Queue()
|
|
|
|
|
|
STATUS_COMMAND='STATUS_COMMAND'
|
|
|
EXECUTION_COMMAND='EXECUTION_COMMAND'
|
|
|
- UPGRADE_STATUS='UPGRADE'
|
|
|
-
|
|
|
IDLE_SLEEP_TIME = 5
|
|
|
|
|
|
def __init__(self, config):
|
|
|
super(ActionQueue, self).__init__()
|
|
|
+ #threading.Thread.__init__(self)
|
|
|
self.config = config
|
|
|
self.sh = shellRunner()
|
|
|
self._stop = threading.Event()
|
|
|
self.maxRetries = config.getint('command', 'maxretries')
|
|
|
self.sleepInterval = config.getint('command', 'sleepBetweenRetries')
|
|
|
- self.puppetExecutor = PuppetExecutor.PuppetExecutor(
|
|
|
- config.get('puppet', 'puppetmodules'),
|
|
|
+ self.executor = puppetExecutor.puppetExecutor(config.get('puppet', 'puppetmodules'),
|
|
|
config.get('puppet', 'puppet_home'),
|
|
|
config.get('puppet', 'facter_home'),
|
|
|
config.get('agent', 'prefix'), config)
|
|
|
- self.pythonExecutor = PythonExecutor.PythonExecutor()
|
|
|
- self.upgradeExecutor = UpgradeExecutor.UpgradeExecutor(self.pythonExecutor,
|
|
|
- PuppetExecutor, config)
|
|
|
self.tmpdir = config.get('agent', 'prefix')
|
|
|
self.commandInProgress = None
|
|
|
|
|
@@ -79,16 +72,24 @@ class ActionQueue(threading.Thread):
|
|
|
def stopped(self):
|
|
|
return self._stop.isSet()
|
|
|
|
|
|
+ def getshellinstance(self):
|
|
|
+ """ For Testing purpose only."""
|
|
|
+ return self.sh
|
|
|
+
|
|
|
def put(self, command):
|
|
|
logger.info("The " + command['commandType'] + " from the server is \n" + pprint.pformat(command))
|
|
|
- self.commandQueue.put(command)
|
|
|
+ commandQueue.put(command)
|
|
|
pass
|
|
|
|
|
|
+ def getCommandQueue(self):
|
|
|
+ """ For Testing purpose only."""
|
|
|
+ return commandQueue
|
|
|
+
|
|
|
def run(self):
|
|
|
result = []
|
|
|
while not self.stopped():
|
|
|
- while not self.commandQueue.empty():
|
|
|
- command = self.commandQueue.get()
|
|
|
+ while not commandQueue.empty():
|
|
|
+ command = commandQueue.get()
|
|
|
logger.info("Took an element of Queue: " + pprint.pformat(command))
|
|
|
if command['commandType'] == self.EXECUTION_COMMAND:
|
|
|
try:
|
|
@@ -102,8 +103,8 @@ class ActionQueue(threading.Thread):
|
|
|
pass
|
|
|
|
|
|
for entry in result:
|
|
|
- self.resultQueue.put((command['commandType'], entry))
|
|
|
-
|
|
|
+ resultQueue.put((ActionQueue.EXECUTION_COMMAND, entry))
|
|
|
+ pass
|
|
|
elif command['commandType'] == self.STATUS_COMMAND:
|
|
|
cluster = command['clusterName']
|
|
|
service = command['serviceName']
|
|
@@ -115,7 +116,7 @@ class ActionQueue(threading.Thread):
|
|
|
logger.info("Got live status for component " + component + " of service " + str(service) +\
|
|
|
" of cluster " + str(cluster) + "\n" + pprint.pformat(result))
|
|
|
if result is not None:
|
|
|
- self.resultQueue.put((ActionQueue.STATUS_COMMAND, result))
|
|
|
+ resultQueue.put((ActionQueue.STATUS_COMMAND, result))
|
|
|
except Exception, err:
|
|
|
traceback.print_exc()
|
|
|
logger.warn(err)
|
|
@@ -129,9 +130,9 @@ class ActionQueue(threading.Thread):
|
|
|
def result(self):
|
|
|
resultReports = []
|
|
|
resultComponentStatus = []
|
|
|
- while not self.resultQueue.empty():
|
|
|
- res = self.resultQueue.get()
|
|
|
- if res[0] == self.EXECUTION_COMMAND:
|
|
|
+ while not resultQueue.empty():
|
|
|
+ res = resultQueue.get()
|
|
|
+ if res[0] == ActionQueue.EXECUTION_COMMAND:
|
|
|
resultReports.append(res[1])
|
|
|
elif res[0] == ActionQueue.STATUS_COMMAND:
|
|
|
resultComponentStatus.append(res[1])
|
|
@@ -146,7 +147,7 @@ class ActionQueue(threading.Thread):
|
|
|
tmpout='...'
|
|
|
tmperr='...'
|
|
|
grep = Grep()
|
|
|
- output = grep.tail(tmpout, Grep.OUTPUT_LAST_LINES)
|
|
|
+ output = grep.tail(tmpout, puppetExecutor.puppetExecutor.OUTPUT_LAST_LINES)
|
|
|
inprogress = {
|
|
|
'role' : self.commandInProgress['role'],
|
|
|
'actionId' : self.commandInProgress['actionId'],
|
|
@@ -165,6 +166,12 @@ class ActionQueue(threading.Thread):
|
|
|
}
|
|
|
return result
|
|
|
|
|
|
+ def registerCommand(self, command):
|
|
|
+ return {}
|
|
|
+
|
|
|
+ def statusCommand(self, command):
|
|
|
+ return {}
|
|
|
+
|
|
|
def executeCommand(self, command):
|
|
|
logger.info("Executing command \n" + pprint.pformat(command))
|
|
|
clusterName = command['clusterName']
|
|
@@ -189,11 +196,7 @@ class ActionQueue(threading.Thread):
|
|
|
'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt'
|
|
|
}
|
|
|
# running command
|
|
|
- if command['commandType'] == ActionQueue.EXECUTION_COMMAND:
|
|
|
- if command['roleCommand'] == ActionQueue.UPGRADE_STATUS:
|
|
|
- commandresult = self.upgradeExecutor.perform_stack_upgrade(command, self.commandInProgress['tmpout'], self.commandInProgress['tmperr'])
|
|
|
- else:
|
|
|
- commandresult = self.puppetExecutor.runCommand(command, self.commandInProgress['tmpout'], self.commandInProgress['tmperr'])
|
|
|
+ commandresult = self.executor.runCommand(command, self.commandInProgress['tmpout'], self.commandInProgress['tmperr'])
|
|
|
# dumping results
|
|
|
self.commandInProgress = None
|
|
|
status = "COMPLETED"
|
|
@@ -215,8 +218,17 @@ class ActionQueue(threading.Thread):
|
|
|
if roleResult['stderr'] == '':
|
|
|
roleResult['stderr'] = 'None'
|
|
|
result.append(roleResult)
|
|
|
+ pass
|
|
|
return result
|
|
|
|
|
|
+ def noOpCommand(self, command):
|
|
|
+ result = {'commandId' : command['Id']}
|
|
|
+ return result
|
|
|
+
|
|
|
+ def unknownAction(self, action):
|
|
|
+ logger.error('Unknown action: %s' % action['id'])
|
|
|
+ result = { 'id': action['id'] }
|
|
|
+ return result
|
|
|
|
|
|
def isIdle(self):
|
|
|
- return self.commandQueue.empty()
|
|
|
+ return commandQueue.empty()
|