ActionQueue.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. #!/usr/bin/env python2.6
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. import logging
  18. import traceback
  19. import logging.handlers
  20. import Queue
  21. import threading
  22. import AmbariConfig
  23. from shell import shellRunner
  24. from FileUtil import writeFile, createStructure, deleteStructure, getFilePath, appendToFile
  25. from shell import shellRunner
  26. import json
  27. import pprint
  28. import os
  29. import time
  30. import subprocess
  31. import copy
  32. import puppetExecutor
# Root logger: getLogger() is called without a name. ActionQueue below sends
# all of its log output through this object.
logger = logging.getLogger()
# NOTE(review): not referenced anywhere in the visible code; -1 presumably
# means "no install script hash recorded yet" -- confirm before relying on it.
installScriptHash = -1
  35. class ActionQueue(threading.Thread):
  36. """ Action Queue for the agent. We pick one command at a time from the queue
  37. and execute that """
  38. global commandQueue, resultQueue
  39. commandQueue = Queue.Queue()
  40. resultQueue = Queue.Queue()
  41. def __init__(self, config):
  42. super(ActionQueue, self).__init__()
  43. #threading.Thread.__init__(self)
  44. self.config = config
  45. self.sh = shellRunner()
  46. self._stop = threading.Event()
  47. self.maxRetries = config.getint('command', 'maxretries')
  48. self.sleepInterval = config.getint('command', 'sleepBetweenRetries')
  49. self.executor = puppetExecutor.puppetExecutor(config.get('puppet', 'puppetmodules'),
  50. config.get('puppet', 'puppet_home'),
  51. config.get('puppet', 'facter_home'),
  52. config.get('agent', 'prefix'))
  53. def stop(self):
  54. self._stop.set()
  55. def stopped(self):
  56. return self._stop.isSet()
  57. def getshellinstance(self):
  58. """ For Testing purpose only."""
  59. return self.sh
  60. def put(self, command):
  61. logger.info("The command from the server is \n" + pprint.pformat(command))
  62. commandQueue.put(command)
  63. pass
  64. def run(self):
  65. result = []
  66. while not self.stopped():
  67. while not commandQueue.empty():
  68. command = commandQueue.get()
  69. try:
  70. #pass a copy of action since we don't want anything to change in the
  71. #action dict
  72. commandCopy = copy.copy(command)
  73. result = self.executeCommand(commandCopy)
  74. except Exception, err:
  75. traceback.print_exc()
  76. logger.warn(err)
  77. pass
  78. for entry in result:
  79. resultQueue.put(entry)
  80. pass
  81. if not self.stopped():
  82. time.sleep(5)
  83. # Store action result to agent response queue
  84. def result(self):
  85. result = []
  86. while not resultQueue.empty():
  87. result.append(resultQueue.get())
  88. return result
  89. def registerCommand(self, command):
  90. return {}
  91. def statusCommand(self, command):
  92. return {}
  93. def executeCommand(self, command):
  94. logger.info("Executing command \n" + pprint.pformat(command))
  95. clusterName = command['clusterName']
  96. commandId = command['commandId']
  97. hostname = command['hostname']
  98. params = command['hostLevelParams']
  99. clusterHostInfo = command['clusterHostInfo']
  100. roleCommand = command['roleCommand']
  101. serviceName = command['serviceName']
  102. configurations = command['configurations']
  103. result = []
  104. commandresult = self.executor.runCommand(command)
  105. status = "COMPLETED"
  106. if (commandresult['exitcode'] != 0):
  107. status = "FAILED"
  108. # assume some puppet pluing to run these commands
  109. roleResult = {'role' : command['role'],
  110. 'actionId' : commandId,
  111. 'taskId' : command['taskId'],
  112. 'stdout' : commandresult['stdout'],
  113. 'clusterName' : clusterName,
  114. 'stderr' : commandresult['stderr'],
  115. 'exitCode' : commandresult['exitcode'],
  116. 'serviceName' : serviceName,
  117. 'status' : status}
  118. result.append(roleResult)
  119. pass
  120. return result
  121. def noOpCommand(self, command):
  122. result = {'commandId' : command['Id']}
  123. return result
  124. def unknownAction(self, action):
  125. logger.error('Unknown action: %s' % action['id'])
  126. result = { 'id': action['id'] }
  127. return result
  128. def isIdle(self):
  129. return commandQueue.empty()