PuppetExecutor.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. #!/usr/bin/env python2.6
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. import json
  18. import os.path
  19. import logging
  20. import subprocess
  21. import pprint
  22. import threading
  23. from threading import Thread
  24. from shell import shellRunner
  25. from manifestGenerator import generateManifest
  26. from RepoInstaller import RepoInstaller
  27. from Grep import Grep
  28. import shell
# Error message template returned by PuppetExecutor.runCommand() when the java
# binary under the configured java64_home cannot be executed; {0} is replaced
# with the java64_home path.
JAVANOTVALID_MSG = "Cannot access JDK! Make sure you have permission to execute {0}/bin/java"
# getLogger() is called without a name, so this is the root logger.
logger = logging.getLogger()
  31. class PuppetExecutor:
  32. """ Class that executes the commands that come from the server using puppet.
  33. This is the class that provides the pluggable point for executing the puppet"""
  34. grep = Grep()
  35. NO_ERROR = "none"
  36. def __init__(self, puppetModule, puppetInstall, facterInstall, tmpDir, config):
  37. self.puppetModule = puppetModule
  38. self.puppetInstall = puppetInstall
  39. self.facterInstall = facterInstall
  40. self.tmpDir = tmpDir
  41. self.reposInstalled = False
  42. self.config = config
  43. self.modulesdir = self.puppetModule + "/modules"
  44. self.event = threading.Event()
  45. self.last_puppet_has_been_killed = False
  46. self.sh = shellRunner()
  47. self.puppet_timeout = config.get("puppet", "timeout_seconds")
  48. def configureEnviron(self, environ):
  49. if not self.config.has_option("puppet", "ruby_home"):
  50. return environ
  51. ruby_home = self.config.get("puppet", "ruby_home")
  52. if os.path.exists(ruby_home):
  53. """Only update ruby home if the config is configured"""
  54. path = os.environ["PATH"]
  55. if not ruby_home in path:
  56. environ["PATH"] = ruby_home + os.path.sep + "bin" + ":"+environ["PATH"]
  57. environ["MY_RUBY_HOME"] = ruby_home
  58. return environ
  59. def getPuppetBinary(self):
  60. puppetbin = os.path.join(self.puppetInstall, "bin", "puppet")
  61. if os.path.exists(puppetbin):
  62. return puppetbin
  63. else:
  64. logger.info("Using default puppet on the host : " + puppetbin
  65. + " does not exist.")
  66. return "puppet"
  67. def discardInstalledRepos(self):
  68. """
  69. Makes agent to forget about installed repos.
  70. So the next call of generate_repo_manifests() will definitely
  71. install repos again
  72. """
  73. self.reposInstalled = False
  74. def generate_repo_manifests(self, command, tmpDir, modulesdir, taskId):
  75. # Hack to only create the repo files once
  76. manifest_list = []
  77. if not self.reposInstalled:
  78. repoInstaller = RepoInstaller(command, tmpDir, modulesdir, taskId, self.config)
  79. manifest_list = repoInstaller.generate_repo_manifests()
  80. return manifest_list
  81. def puppetCommand(self, sitepp):
  82. modules = self.puppetModule
  83. puppetcommand = [self.getPuppetBinary(), "apply", "--confdir=" + modules, "--detailed-exitcodes", sitepp]
  84. return puppetcommand
  85. def facterLib(self):
  86. return self.facterInstall + "/lib/"
  87. pass
  88. def puppetLib(self):
  89. return self.puppetInstall + "/lib"
  90. pass
  91. def condenseOutput(self, stdout, stderr, retcode):
  92. grep = self.grep
  93. if stderr == self.NO_ERROR:
  94. result = grep.tail(stdout, grep.OUTPUT_LAST_LINES)
  95. else:
  96. result = grep.grep(stdout, "fail", grep.ERROR_LAST_LINES_BEFORE, grep.ERROR_LAST_LINES_AFTER)
  97. result = grep.cleanByTemplate(result, "warning")
  98. if result is None: # Second try
  99. result = grep.grep(stdout, "err", grep.ERROR_LAST_LINES_BEFORE, grep.ERROR_LAST_LINES_AFTER)
  100. result = grep.cleanByTemplate(result, "warning")
  101. filteredresult = grep.filterMarkup(result)
  102. return filteredresult
  103. def isSuccessfull(self, returncode):
  104. return not self.last_puppet_has_been_killed and (returncode == 0 or returncode == 2)
  105. def run_manifest(self, command, file, tmpoutfile, tmperrfile):
  106. result = {}
  107. taskId = 0
  108. if command.has_key("taskId"):
  109. taskId = command['taskId']
  110. puppetEnv = os.environ
  111. #Install repos
  112. repo_manifest_list = self.generate_repo_manifests(command, self.tmpDir, self.modulesdir, taskId)
  113. puppetFiles = list(repo_manifest_list)
  114. puppetFiles.append(file)
  115. #Run all puppet commands, from manifest generator and for repos installation
  116. #Appending outputs and errors, exitcode - maximal from all
  117. for puppetFile in puppetFiles:
  118. self.runPuppetFile(puppetFile, result, puppetEnv, tmpoutfile, tmperrfile)
  119. # Check if one of the puppet command fails and error out
  120. if not self.isSuccessfull(result["exitcode"]):
  121. break
  122. if self.isSuccessfull(result["exitcode"]):
  123. # Check if all the repos were installed or not and reset the flag
  124. self.reposInstalled = True
  125. logger.info("ExitCode : " + str(result["exitcode"]))
  126. return result
  127. def isJavaAvailable(self, command):
  128. javaExecutablePath = "{0}/bin/java".format(command)
  129. return not self.sh.run([javaExecutablePath, '-version'])['exitCode']
  130. def runCommand(self, command, tmpoutfile, tmperrfile):
  131. # After installing we must have jdk available for start/stop/smoke
  132. if command['roleCommand'] != "INSTALL":
  133. java64_home = None
  134. if ('global' in command['configurations']) and ('java64_home' in command['configurations']['global']):
  135. java64_home = str(command['configurations']['global']['java64_home']).strip()
  136. if java64_home is None or not self.isJavaAvailable(java64_home):
  137. if java64_home is None:
  138. errMsg = "Cannot access JDK! Make sure java64_home is specified in global config"
  139. else:
  140. errMsg = JAVANOTVALID_MSG.format(java64_home)
  141. return {'stdout': '', 'stderr': errMsg, 'exitcode': 1}
  142. pass
  143. pass
  144. taskId = 0
  145. if command.has_key("taskId"):
  146. taskId = command['taskId']
  147. siteppFileName = os.path.join(self.tmpDir, "site-" + str(taskId) + ".pp")
  148. generateManifest(command, siteppFileName, self.modulesdir, self.config)
  149. result = self.run_manifest(command, siteppFileName, tmpoutfile, tmperrfile)
  150. return result
  151. def runPuppetFile(self, puppetFile, result, puppetEnv, tmpoutfile, tmperrfile):
  152. """ Run the command and make sure the output gets propagated"""
  153. puppetcommand = self.puppetCommand(puppetFile)
  154. rubyLib = ""
  155. if os.environ.has_key("RUBYLIB"):
  156. rubyLib = os.environ["RUBYLIB"]
  157. logger.debug("RUBYLIB from Env " + rubyLib)
  158. if not (self.facterLib() in rubyLib):
  159. rubyLib = rubyLib + ":" + self.facterLib()
  160. if not (self.puppetLib() in rubyLib):
  161. rubyLib = rubyLib + ":" + self.puppetLib()
  162. tmpout = open(tmpoutfile, 'w')
  163. tmperr = open(tmperrfile, 'w')
  164. puppetEnv["RUBYLIB"] = rubyLib
  165. puppetEnv = self.configureEnviron(puppetEnv)
  166. logger.debug("Setting RUBYLIB as: " + rubyLib)
  167. logger.info("Running command " + pprint.pformat(puppetcommand))
  168. puppet = self.lauch_puppet_subprocess(puppetcommand, tmpout, tmperr, puppetEnv)
  169. logger.info("Command started with PID: " + str(puppet.pid))
  170. logger.debug("Launching watchdog thread")
  171. self.event.clear()
  172. self.last_puppet_has_been_killed = False
  173. thread = Thread(target = self.puppet_watchdog_func, args = (puppet, ))
  174. thread.start()
  175. # Waiting for process to finished or killed
  176. puppet.communicate()
  177. self.event.set()
  178. thread.join()
  179. # Building results
  180. error = self.NO_ERROR
  181. returncode = 0
  182. if not self.isSuccessfull(puppet.returncode):
  183. returncode = puppet.returncode
  184. error = open(tmperrfile, 'r').read()
  185. logging.error("Error running puppet: \n" + str(error))
  186. pass
  187. if self.last_puppet_has_been_killed:
  188. error = str(error) + "\n Puppet has been killed due to timeout"
  189. returncode = 999
  190. if result.has_key("stderr"):
  191. result["stderr"] = result["stderr"] + os.linesep + str(error)
  192. else:
  193. result["stderr"] = str(error)
  194. puppetOutput = open(tmpoutfile, 'r').read()
  195. logger.debug("Output from puppet :\n" + puppetOutput)
  196. logger.info("Puppet execution process with pid %s exited with code %s." %
  197. (str(puppet.pid), str(returncode)))
  198. if result.has_key("exitcode"):
  199. result["exitcode"] = max(returncode, result["exitcode"])
  200. else:
  201. result["exitcode"] = returncode
  202. condensed = self.condenseOutput(puppetOutput, error, returncode)
  203. if result.has_key("stdout"):
  204. result["stdout"] = result["stdout"] + os.linesep + str(condensed)
  205. else:
  206. result["stdout"] = str(condensed)
  207. return result
  208. def lauch_puppet_subprocess(self, puppetcommand, tmpout, tmperr, puppetEnv):
  209. """
  210. Creates subprocess with given parameters. This functionality was moved to separate method
  211. to make possible unit testing
  212. """
  213. return subprocess.Popen(puppetcommand,
  214. stdout=tmpout,
  215. stderr=tmperr,
  216. env=puppetEnv)
  217. def puppet_watchdog_func(self, puppet):
  218. self.event.wait(float(self.puppet_timeout))
  219. if puppet.returncode is None:
  220. logger.error("Task timed out, killing process with PID: " + str(puppet.pid))
  221. shell.kill_process_with_children(puppet.pid)
  222. self.last_puppet_has_been_killed = True
  223. pass
  224. def main():
  225. logging.basicConfig(level=logging.DEBUG)
  226. #test code
  227. jsonFile = open('test.json', 'r')
  228. jsonStr = jsonFile.read()
  229. # Below is for testing only.
  230. puppetInstance = PuppetExecutor("/home/centos/ambari_repo_info/ambari-agent/src/main/puppet/",
  231. "/usr/",
  232. "/root/workspace/puppet-install/facter-1.6.10/",
  233. "/tmp")
  234. jsonFile = open('test.json', 'r')
  235. jsonStr = jsonFile.read()
  236. parsedJson = json.loads(jsonStr)
  237. result = puppetInstance.runCommand(parsedJson, '/tmp/out.txt', '/tmp/err.txt')
  238. logger.debug(result)
  239. if __name__ == '__main__':
  240. main()