PuppetExecutor.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. #!/usr/bin/env python2.6
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. import json
  18. import os.path
  19. import logging
  20. import subprocess
  21. from manifestGenerator import generateManifest
  22. from RepoInstaller import RepoInstaller
  23. import pprint, threading
  24. from Grep import Grep
  25. from threading import Thread
  26. import shell
  27. import traceback
  28. logger = logging.getLogger()
  29. class PuppetExecutor:
  30. """ Class that executes the commands that come from the server using puppet.
  31. This is the class that provides the pluggable point for executing the puppet"""
  32. # How many seconds will pass before running puppet is terminated on timeout
  33. PUPPET_TIMEOUT_SECONDS = 600
  34. grep = Grep()
  35. event = threading.Event()
  36. last_puppet_has_been_killed = False
  37. NO_ERROR = "none"
  38. def __init__(self, puppetModule, puppetInstall, facterInstall, tmpDir, config):
  39. self.puppetModule = puppetModule
  40. self.puppetInstall = puppetInstall
  41. self.facterInstall = facterInstall
  42. self.tmpDir = tmpDir
  43. self.reposInstalled = False
  44. self.config = config
  45. self.modulesdir = self.puppetModule + "/modules"
  46. def configureEnviron(self, environ):
  47. if not self.config.has_option("puppet", "ruby_home"):
  48. return environ
  49. ruby_home = self.config.get("puppet", "ruby_home")
  50. if os.path.exists(ruby_home):
  51. """Only update ruby home if the config is configured"""
  52. path = os.environ["PATH"]
  53. if not ruby_home in path:
  54. environ["PATH"] = ruby_home + os.path.sep + "bin" + ":"+environ["PATH"]
  55. environ["MY_RUBY_HOME"] = ruby_home
  56. return environ
  57. def getPuppetBinary(self):
  58. puppetbin = os.path.join(self.puppetInstall, "bin", "puppet")
  59. if os.path.exists(puppetbin):
  60. return puppetbin
  61. else:
  62. logger.info("Using default puppet on the host : " + puppetbin
  63. + " does not exist.")
  64. return "puppet"
  65. def discardInstalledRepos(self):
  66. """
  67. Makes agent to forget about installed repos.
  68. So the next call of generate_repo_manifests() will definitely
  69. install repos again
  70. """
  71. self.reposInstalled = False
  72. def generate_repo_manifests(self, command, tmpDir, modulesdir, taskId):
  73. # Hack to only create the repo files once
  74. manifest_list = []
  75. if not self.reposInstalled:
  76. repoInstaller = RepoInstaller(command, tmpDir, modulesdir, taskId, self.config)
  77. manifest_list = repoInstaller.generate_repo_manifests()
  78. return manifest_list
  79. def puppetCommand(self, sitepp):
  80. modules = self.puppetModule
  81. puppetcommand = [self.getPuppetBinary(), "apply", "--confdir=" + modules, "--detailed-exitcodes", sitepp]
  82. return puppetcommand
  83. def facterLib(self):
  84. return self.facterInstall + "/lib/"
  85. pass
  86. def puppetLib(self):
  87. return self.puppetInstall + "/lib"
  88. pass
  89. def condenseOutput(self, stdout, stderr, retcode):
  90. grep = self.grep
  91. if stderr == self.NO_ERROR:
  92. result = grep.tail(stdout, grep.OUTPUT_LAST_LINES)
  93. else:
  94. result = grep.grep(stdout, "fail", grep.ERROR_LAST_LINES_BEFORE, grep.ERROR_LAST_LINES_AFTER)
  95. if result is None: # Second try
  96. result = grep.grep(stdout, "err", grep.ERROR_LAST_LINES_BEFORE, grep.ERROR_LAST_LINES_AFTER)
  97. filteredresult = grep.filterMarkup(result)
  98. return filteredresult
  99. def isSuccessfull(self, returncode):
  100. return not self.last_puppet_has_been_killed and (returncode == 0 or returncode == 2)
  101. def run_manifest(self, command, file, tmpoutfile, tmperrfile):
  102. result = {}
  103. taskId = 0
  104. if command.has_key("taskId"):
  105. taskId = command['taskId']
  106. puppetEnv = os.environ
  107. #Install repos
  108. repo_manifest_list = self.generate_repo_manifests(command, self.tmpDir, self.modulesdir, taskId)
  109. puppetFiles = list(repo_manifest_list)
  110. puppetFiles.append(file)
  111. #Run all puppet commands, from manifest generator and for repos installation
  112. #Appending outputs and errors, exitcode - maximal from all
  113. for puppetFile in puppetFiles:
  114. self.runPuppetFile(puppetFile, result, puppetEnv, tmpoutfile, tmperrfile)
  115. # Check if one of the puppet command fails and error out
  116. if not self.isSuccessfull(result["exitcode"]):
  117. break
  118. if self.isSuccessfull(result["exitcode"]):
  119. # Check if all the repos were installed or not and reset the flag
  120. self.reposInstalled = True
  121. logger.info("ExitCode : " + str(result["exitcode"]))
  122. return result
  123. def runCommand(self, command, tmpoutfile, tmperrfile):
  124. taskId = 0
  125. if command.has_key("taskId"):
  126. taskId = command['taskId']
  127. siteppFileName = os.path.join(self.tmpDir, "site-" + str(taskId) + ".pp")
  128. generateManifest(command, siteppFileName, self.modulesdir, self.config)
  129. result = self.run_manifest(command, siteppFileName, tmpoutfile, tmperrfile)
  130. return result
  131. def runPuppetFile(self, puppetFile, result, puppetEnv, tmpoutfile, tmperrfile):
  132. """ Run the command and make sure the output gets propagated"""
  133. puppetcommand = self.puppetCommand(puppetFile)
  134. rubyLib = ""
  135. if os.environ.has_key("RUBYLIB"):
  136. rubyLib = os.environ["RUBYLIB"]
  137. logger.info("RUBYLIB from Env " + rubyLib)
  138. if not (self.facterLib() in rubyLib):
  139. rubyLib = rubyLib + ":" + self.facterLib()
  140. if not (self.puppetLib() in rubyLib):
  141. rubyLib = rubyLib + ":" + self.puppetLib()
  142. tmpout = open(tmpoutfile, 'w')
  143. tmperr = open(tmperrfile, 'w')
  144. puppetEnv["RUBYLIB"] = rubyLib
  145. puppetEnv = self.configureEnviron(puppetEnv)
  146. logger.info("Setting RUBYLIB as: " + rubyLib)
  147. logger.info("Running command " + pprint.pformat(puppetcommand))
  148. puppet = self.lauch_puppet_subprocess(puppetcommand,tmpout, tmperr, puppetEnv)
  149. logger.info("Launching watchdog thread")
  150. self.event.clear()
  151. self.last_puppet_has_been_killed = False
  152. thread = Thread(target = self.puppet_watchdog_func, args = (puppet, ))
  153. thread.start()
  154. # Waiting for process to finished or killed
  155. puppet.communicate()
  156. self.event.set()
  157. thread.join()
  158. # Building results
  159. error = self.NO_ERROR
  160. returncode = 0
  161. if not self.isSuccessfull(puppet.returncode):
  162. returncode = puppet.returncode
  163. error = open(tmperrfile, 'r').read()
  164. logging.error("Error running puppet: \n" + str(error))
  165. pass
  166. if self.last_puppet_has_been_killed:
  167. error = str(error) + "\n Puppet has been killed due to timeout"
  168. returncode = 999
  169. if result.has_key("stderr"):
  170. result["stderr"] = result["stderr"] + os.linesep + str(error)
  171. else:
  172. result["stderr"] = str(error)
  173. puppetOutput = open(tmpoutfile, 'r').read()
  174. logger.info("Output from puppet :\n" + puppetOutput)
  175. logger.info("Puppet exit code is " + str(returncode))
  176. if result.has_key("exitcode"):
  177. result["exitcode"] = max(returncode, result["exitcode"])
  178. else:
  179. result["exitcode"] = returncode
  180. condensed = self.condenseOutput(puppetOutput, error, returncode)
  181. if result.has_key("stdout"):
  182. result["stdout"] = result["stdout"] + os.linesep + str(condensed)
  183. else:
  184. result["stdout"] = str(condensed)
  185. return result
  186. def lauch_puppet_subprocess(self, puppetcommand, tmpout, tmperr, puppetEnv):
  187. """
  188. Creates subprocess with given parameters. This functionality was moved to separate method
  189. to make possible unit testing
  190. """
  191. return subprocess.Popen(puppetcommand,
  192. stdout=tmpout,
  193. stderr=tmperr,
  194. env=puppetEnv)
  195. def puppet_watchdog_func(self, puppet):
  196. self.event.wait(self.PUPPET_TIMEOUT_SECONDS)
  197. if puppet.returncode is None:
  198. logger.error("Task timed out and will be killed")
  199. self.runShellKillPgrp(puppet)
  200. self.last_puppet_has_been_killed = True
  201. pass
  202. def runShellKillPgrp(self, puppet):
  203. shell.killprocessgrp(puppet.pid)
  204. def main():
  205. logging.basicConfig(level=logging.DEBUG)
  206. #test code
  207. jsonFile = open('test.json', 'r')
  208. jsonStr = jsonFile.read()
  209. # Below is for testing only.
  210. puppetInstance = PuppetExecutor("/home/centos/ambari_repo_info/ambari-agent/src/main/puppet/",
  211. "/usr/",
  212. "/root/workspace/puppet-install/facter-1.6.10/",
  213. "/tmp")
  214. jsonFile = open('test.json', 'r')
  215. jsonStr = jsonFile.read()
  216. parsedJson = json.loads(jsonStr)
  217. result = puppetInstance.runCommand(parsedJson, '/tmp/out.txt', '/tmp/err.txt')
  218. logger.debug(result)
  219. if __name__ == '__main__':
  220. main()