PuppetExecutor.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. #!/usr/bin/env python2.6
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. import json
  18. import os.path
  19. import logging
  20. import subprocess
  21. import pprint
  22. import threading
  23. from threading import Thread
  24. from shell import shellRunner
  25. from manifestGenerator import generateManifest
  26. from RepoInstaller import RepoInstaller
  27. from Grep import Grep
  28. import shell
  29. JAVANOTVALID_MSG = "Cannot access JDK! Make sure you have permission to execute {0}/bin/java"
  30. logger = logging.getLogger()
  31. class PuppetExecutor:
  32. """ Class that executes the commands that come from the server using puppet.
  33. This is the class that provides the pluggable point for executing the puppet"""
  34. # How many seconds will pass before running puppet is terminated on timeout
  35. PUPPET_TIMEOUT_SECONDS = 600
  36. grep = Grep()
  37. event = threading.Event()
  38. last_puppet_has_been_killed = False
  39. NO_ERROR = "none"
  40. def __init__(self, puppetModule, puppetInstall, facterInstall, tmpDir, config):
  41. self.puppetModule = puppetModule
  42. self.puppetInstall = puppetInstall
  43. self.facterInstall = facterInstall
  44. self.tmpDir = tmpDir
  45. self.reposInstalled = False
  46. self.config = config
  47. self.modulesdir = self.puppetModule + "/modules"
  48. self.sh = shellRunner()
  49. def configureEnviron(self, environ):
  50. if not self.config.has_option("puppet", "ruby_home"):
  51. return environ
  52. ruby_home = self.config.get("puppet", "ruby_home")
  53. if os.path.exists(ruby_home):
  54. """Only update ruby home if the config is configured"""
  55. path = os.environ["PATH"]
  56. if not ruby_home in path:
  57. environ["PATH"] = ruby_home + os.path.sep + "bin" + ":"+environ["PATH"]
  58. environ["MY_RUBY_HOME"] = ruby_home
  59. return environ
  60. def getPuppetBinary(self):
  61. puppetbin = os.path.join(self.puppetInstall, "bin", "puppet")
  62. if os.path.exists(puppetbin):
  63. return puppetbin
  64. else:
  65. logger.info("Using default puppet on the host : " + puppetbin
  66. + " does not exist.")
  67. return "puppet"
  68. def discardInstalledRepos(self):
  69. """
  70. Makes agent to forget about installed repos.
  71. So the next call of generate_repo_manifests() will definitely
  72. install repos again
  73. """
  74. self.reposInstalled = False
  75. def generate_repo_manifests(self, command, tmpDir, modulesdir, taskId):
  76. # Hack to only create the repo files once
  77. manifest_list = []
  78. if not self.reposInstalled:
  79. repoInstaller = RepoInstaller(command, tmpDir, modulesdir, taskId, self.config)
  80. manifest_list = repoInstaller.generate_repo_manifests()
  81. return manifest_list
  82. def puppetCommand(self, sitepp):
  83. modules = self.puppetModule
  84. puppetcommand = [self.getPuppetBinary(), "apply", "--confdir=" + modules, "--detailed-exitcodes", sitepp]
  85. return puppetcommand
  86. def facterLib(self):
  87. return self.facterInstall + "/lib/"
  88. pass
  89. def puppetLib(self):
  90. return self.puppetInstall + "/lib"
  91. pass
  92. def condenseOutput(self, stdout, stderr, retcode):
  93. grep = self.grep
  94. if stderr == self.NO_ERROR:
  95. result = grep.tail(stdout, grep.OUTPUT_LAST_LINES)
  96. else:
  97. result = grep.grep(stdout, "fail", grep.ERROR_LAST_LINES_BEFORE, grep.ERROR_LAST_LINES_AFTER)
  98. result = grep.cleanByTemplate(result, "warning")
  99. if result is None: # Second try
  100. result = grep.grep(stdout, "err", grep.ERROR_LAST_LINES_BEFORE, grep.ERROR_LAST_LINES_AFTER)
  101. result = grep.cleanByTemplate(result, "warning")
  102. filteredresult = grep.filterMarkup(result)
  103. return filteredresult
  104. def isSuccessfull(self, returncode):
  105. return not self.last_puppet_has_been_killed and (returncode == 0 or returncode == 2)
  106. def run_manifest(self, command, file, tmpoutfile, tmperrfile):
  107. result = {}
  108. taskId = 0
  109. if command.has_key("taskId"):
  110. taskId = command['taskId']
  111. puppetEnv = os.environ
  112. #Install repos
  113. repo_manifest_list = self.generate_repo_manifests(command, self.tmpDir, self.modulesdir, taskId)
  114. puppetFiles = list(repo_manifest_list)
  115. puppetFiles.append(file)
  116. #Run all puppet commands, from manifest generator and for repos installation
  117. #Appending outputs and errors, exitcode - maximal from all
  118. for puppetFile in puppetFiles:
  119. self.runPuppetFile(puppetFile, result, puppetEnv, tmpoutfile, tmperrfile)
  120. # Check if one of the puppet command fails and error out
  121. if not self.isSuccessfull(result["exitcode"]):
  122. break
  123. if self.isSuccessfull(result["exitcode"]):
  124. # Check if all the repos were installed or not and reset the flag
  125. self.reposInstalled = True
  126. logger.info("ExitCode : " + str(result["exitcode"]))
  127. return result
  128. def isJavaAvailable(self, command):
  129. javaExecutablePath = "{0}/bin/java".format(command)
  130. return not self.sh.run([javaExecutablePath, '-version'])['exitCode']
  131. def runCommand(self, command, tmpoutfile, tmperrfile):
  132. # After installing we must have jdk available for start/stop/smoke
  133. if command['roleCommand'] != "INSTALL":
  134. java64_home = None
  135. if ('global' in command['configurations']) and ('java64_home' in command['configurations']['global']):
  136. java64_home = str(command['configurations']['global']['java64_home']).strip()
  137. if java64_home is None or not self.isJavaAvailable(java64_home):
  138. if java64_home is None:
  139. errMsg = "Cannot access JDK! Make sure java64_home is specified in global config"
  140. else:
  141. errMsg = JAVANOTVALID_MSG.format(java64_home)
  142. return {'stdout': '', 'stderr': errMsg, 'exitcode': 1}
  143. pass
  144. pass
  145. taskId = 0
  146. if command.has_key("taskId"):
  147. taskId = command['taskId']
  148. siteppFileName = os.path.join(self.tmpDir, "site-" + str(taskId) + ".pp")
  149. generateManifest(command, siteppFileName, self.modulesdir, self.config)
  150. result = self.run_manifest(command, siteppFileName, tmpoutfile, tmperrfile)
  151. return result
  152. def runPuppetFile(self, puppetFile, result, puppetEnv, tmpoutfile, tmperrfile):
  153. """ Run the command and make sure the output gets propagated"""
  154. puppetcommand = self.puppetCommand(puppetFile)
  155. rubyLib = ""
  156. if os.environ.has_key("RUBYLIB"):
  157. rubyLib = os.environ["RUBYLIB"]
  158. logger.debug("RUBYLIB from Env " + rubyLib)
  159. if not (self.facterLib() in rubyLib):
  160. rubyLib = rubyLib + ":" + self.facterLib()
  161. if not (self.puppetLib() in rubyLib):
  162. rubyLib = rubyLib + ":" + self.puppetLib()
  163. tmpout = open(tmpoutfile, 'w')
  164. tmperr = open(tmperrfile, 'w')
  165. puppetEnv["RUBYLIB"] = rubyLib
  166. puppetEnv = self.configureEnviron(puppetEnv)
  167. logger.debug("Setting RUBYLIB as: " + rubyLib)
  168. logger.info("Running command " + pprint.pformat(puppetcommand))
  169. puppet = self.lauch_puppet_subprocess(puppetcommand, tmpout, tmperr, puppetEnv)
  170. logger.info("Command started with PID: " + str(puppet.pid))
  171. logger.debug("Launching watchdog thread")
  172. self.event.clear()
  173. self.last_puppet_has_been_killed = False
  174. thread = Thread(target = self.puppet_watchdog_func, args = (puppet, ))
  175. thread.start()
  176. # Waiting for process to finished or killed
  177. puppet.communicate()
  178. self.event.set()
  179. thread.join()
  180. # Building results
  181. error = self.NO_ERROR
  182. returncode = 0
  183. if not self.isSuccessfull(puppet.returncode):
  184. returncode = puppet.returncode
  185. error = open(tmperrfile, 'r').read()
  186. logging.error("Error running puppet: \n" + str(error))
  187. pass
  188. if self.last_puppet_has_been_killed:
  189. error = str(error) + "\n Puppet has been killed due to timeout"
  190. returncode = 999
  191. if result.has_key("stderr"):
  192. result["stderr"] = result["stderr"] + os.linesep + str(error)
  193. else:
  194. result["stderr"] = str(error)
  195. puppetOutput = open(tmpoutfile, 'r').read()
  196. logger.debug("Output from puppet :\n" + puppetOutput)
  197. logger.info("Puppet exit code is " + str(returncode))
  198. if result.has_key("exitcode"):
  199. result["exitcode"] = max(returncode, result["exitcode"])
  200. else:
  201. result["exitcode"] = returncode
  202. condensed = self.condenseOutput(puppetOutput, error, returncode)
  203. if result.has_key("stdout"):
  204. result["stdout"] = result["stdout"] + os.linesep + str(condensed)
  205. else:
  206. result["stdout"] = str(condensed)
  207. return result
  208. def lauch_puppet_subprocess(self, puppetcommand, tmpout, tmperr, puppetEnv):
  209. """
  210. Creates subprocess with given parameters. This functionality was moved to separate method
  211. to make possible unit testing
  212. """
  213. return subprocess.Popen(puppetcommand,
  214. stdout=tmpout,
  215. stderr=tmperr,
  216. env=puppetEnv)
  217. def puppet_watchdog_func(self, puppet):
  218. self.event.wait(self.PUPPET_TIMEOUT_SECONDS)
  219. if puppet.returncode is None:
  220. logger.error("Task timed out, killing process with PID: " + str(puppet.pid))
  221. shell.kill_process_with_children(puppet.pid)
  222. self.last_puppet_has_been_killed = True
  223. pass
  224. def main():
  225. logging.basicConfig(level=logging.DEBUG)
  226. #test code
  227. jsonFile = open('test.json', 'r')
  228. jsonStr = jsonFile.read()
  229. # Below is for testing only.
  230. puppetInstance = PuppetExecutor("/home/centos/ambari_repo_info/ambari-agent/src/main/puppet/",
  231. "/usr/",
  232. "/root/workspace/puppet-install/facter-1.6.10/",
  233. "/tmp")
  234. jsonFile = open('test.json', 'r')
  235. jsonStr = jsonFile.read()
  236. parsedJson = json.loads(jsonStr)
  237. result = puppetInstance.runCommand(parsedJson, '/tmp/out.txt', '/tmp/err.txt')
  238. logger.debug(result)
  239. if __name__ == '__main__':
  240. main()