PythonExecutor.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. #!/usr/bin/env python
  2. '''
  3. Licensed to the Apache Software Foundation (ASF) under one
  4. or more contributor license agreements. See the NOTICE file
  5. distributed with this work for additional information
  6. regarding copyright ownership. The ASF licenses this file
  7. to you under the Apache License, Version 2.0 (the
  8. "License"); you may not use this file except in compliance
  9. with the License. You may obtain a copy of the License at
  10. http://www.apache.org/licenses/LICENSE-2.0
  11. Unless required by applicable law or agreed to in writing, software
  12. distributed under the License is distributed on an "AS IS" BASIS,
  13. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. See the License for the specific language governing permissions and
  15. limitations under the License.
  16. '''
  17. import ambari_simplejson as json
  18. import logging
  19. import os
  20. import subprocess
  21. import pprint
  22. import threading
  23. import platform
  24. from threading import Thread
  25. import time
  26. from BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
  27. from ambari_commons.os_check import OSConst, OSCheck
  28. from Grep import Grep
  29. import sys
  30. from ambari_commons import shell
  31. from ambari_commons.shell import shellRunner
  32. logger = logging.getLogger()
  33. class PythonExecutor(object):
  34. """
  35. Performs functionality for executing python scripts.
  36. Warning: class maintains internal state. As a result, instances should not be
  37. used as a singleton for a concurrent execution of python scripts
  38. """
  39. NO_ERROR = "none"
  40. def __init__(self, tmpDir, config):
  41. self.grep = Grep()
  42. self.event = threading.Event()
  43. self.python_process_has_been_killed = False
  44. self.tmpDir = tmpDir
  45. self.config = config
  46. pass
  47. def open_subprocess_files(self, tmpoutfile, tmperrfile, override_output_files, backup_log_files = True):
  48. if override_output_files: # Recreate files, existing files are backed up if backup_log_files is True
  49. if backup_log_files:
  50. self.back_up_log_file_if_exists(tmpoutfile)
  51. self.back_up_log_file_if_exists(tmperrfile)
  52. tmpout = open(tmpoutfile, 'w')
  53. tmperr = open(tmperrfile, 'w')
  54. else: # Append to files
  55. tmpout = open(tmpoutfile, 'a')
  56. tmperr = open(tmperrfile, 'a')
  57. return tmpout, tmperr
  58. def back_up_log_file_if_exists(self, file_path):
  59. if os.path.isfile(file_path):
  60. counter = 0
  61. while True:
  62. # Find backup name that is not used yet (saves logs
  63. # from multiple command retries)
  64. backup_name = file_path + "." + str(counter)
  65. if not os.path.isfile(backup_name):
  66. break
  67. counter += 1
  68. os.rename(file_path, backup_name)
  69. def run_file(self, script, script_params, tmpoutfile, tmperrfile,
  70. timeout, tmpstructedoutfile, callback, task_id,
  71. override_output_files = True, backup_log_files = True, handle = None,
  72. log_info_on_failure = True):
  73. """
  74. Executes the specified python file in a separate subprocess.
  75. Method returns only when the subprocess is finished.
  76. Params arg is a list of script parameters
  77. Timeout meaning: how many seconds should pass before script execution
  78. is forcibly terminated
  79. override_output_files option defines whether stdout/stderr files will be
  80. recreated or appended.
  81. The structured out file, however, is preserved during multiple invocations that use the same file.
  82. """
  83. pythonCommand = self.python_command(script, script_params)
  84. logger.debug("Running command " + pprint.pformat(pythonCommand))
  85. if handle is None:
  86. tmpout, tmperr = self.open_subprocess_files(tmpoutfile, tmperrfile, override_output_files, backup_log_files)
  87. process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr)
  88. # map task_id to pid
  89. callback(task_id, process.pid)
  90. logger.debug("Launching watchdog thread")
  91. self.event.clear()
  92. self.python_process_has_been_killed = False
  93. thread = Thread(target = self.python_watchdog_func, args = (process, timeout))
  94. thread.start()
  95. # Waiting for the process to be either finished or killed
  96. process.communicate()
  97. self.event.set()
  98. thread.join()
  99. result = self.prepare_process_result(process.returncode, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=timeout)
  100. if log_info_on_failure and result['exitcode']:
  101. self.on_failure(pythonCommand, result)
  102. return result
  103. else:
  104. holder = Holder(pythonCommand, tmpoutfile, tmperrfile, tmpstructedoutfile, handle)
  105. background = BackgroundThread(holder, self)
  106. background.start()
  107. return {"exitcode": 777}
  108. def on_failure(self, pythonCommand, result):
  109. """
  110. Log some useful information after task failure.
  111. """
  112. logger.info("Command " + pprint.pformat(pythonCommand) + " failed with exitcode=" + str(result['exitcode']))
  113. if OSCheck.is_windows_family():
  114. cmd_list = ["WMIC path win32_process get Caption,Processid,Commandline", "netstat -an"]
  115. else:
  116. cmd_list = ["ps faux", "netstat -tulpn"]
  117. shell_runner = shellRunner()
  118. for cmd in cmd_list:
  119. ret = shell_runner.run(cmd)
  120. logger.info("Command '{0}' returned {1}. {2}{3}".format(cmd, ret["exitCode"], ret["error"], ret["output"]))
  121. def prepare_process_result(self, returncode, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=None):
  122. out, error, structured_out = self.read_result_from_files(tmpoutfile, tmperrfile, tmpstructedoutfile)
  123. if self.python_process_has_been_killed:
  124. error = str(error) + "\n Python script has been killed due to timeout" + \
  125. (" after waiting %s secs" % str(timeout) if timeout else "")
  126. returncode = 999
  127. result = self.condenseOutput(out, error, returncode, structured_out)
  128. logger.debug("Result: %s" % result)
  129. return result
  130. def read_result_from_files(self, out_path, err_path, structured_out_path):
  131. out = open(out_path, 'r').read()
  132. error = open(err_path, 'r').read()
  133. try:
  134. with open(structured_out_path, 'r') as fp:
  135. structured_out = json.load(fp)
  136. except Exception:
  137. if os.path.exists(structured_out_path):
  138. errMsg = 'Unable to read structured output from ' + structured_out_path
  139. structured_out = {
  140. 'msg' : errMsg
  141. }
  142. logger.warn(structured_out)
  143. else:
  144. structured_out = {}
  145. return out, error, structured_out
  146. def launch_python_subprocess(self, command, tmpout, tmperr):
  147. """
  148. Creates subprocess with given parameters. This functionality was moved to separate method
  149. to make possible unit testing
  150. """
  151. close_fds = None if OSCheck.get_os_family() == OSConst.WINSRV_FAMILY else True
  152. command_env = dict(os.environ)
  153. if OSCheck.get_os_family() == OSConst.WINSRV_FAMILY:
  154. command_env["PYTHONPATH"] = os.pathsep.join(sys.path)
  155. for k, v in command_env.iteritems():
  156. command_env[k] = str(v)
  157. return subprocess.Popen(command,
  158. stdout=tmpout,
  159. stderr=tmperr, close_fds=close_fds, env=command_env)
  160. def isSuccessfull(self, returncode):
  161. return not self.python_process_has_been_killed and returncode == 0
  162. def python_command(self, script, script_params):
  163. #we need manually pass python executable on windows because sys.executable will return service wrapper
  164. python_binary = os.environ['PYTHON_EXE'] if 'PYTHON_EXE' in os.environ else sys.executable
  165. python_command = [python_binary, script] + script_params
  166. return python_command
  167. def condenseOutput(self, stdout, stderr, retcode, structured_out):
  168. log_lines_count = self.config.get('heartbeat', 'log_lines_count')
  169. result = {
  170. "exitcode": retcode,
  171. "stdout": self.grep.tail(stdout, log_lines_count) if log_lines_count else stdout,
  172. "stderr": self.grep.tail(stderr, log_lines_count) if log_lines_count else stderr,
  173. "structuredOut" : structured_out
  174. }
  175. return result
  176. def python_watchdog_func(self, python, timeout):
  177. self.event.wait(timeout)
  178. if python.returncode is None:
  179. logger.error("Subprocess timed out and will be killed")
  180. shell.kill_process_with_children(python.pid)
  181. self.python_process_has_been_killed = True
  182. pass
  183. class Holder:
  184. def __init__(self, command, out_file, err_file, structured_out_file, handle):
  185. self.command = command
  186. self.out_file = out_file
  187. self.err_file = err_file
  188. self.structured_out_file = structured_out_file
  189. self.handle = handle
  190. class BackgroundThread(threading.Thread):
  191. def __init__(self, holder, pythonExecutor):
  192. threading.Thread.__init__(self)
  193. self.holder = holder
  194. self.pythonExecutor = pythonExecutor
  195. def run(self):
  196. process_out, process_err = self.pythonExecutor.open_subprocess_files(self.holder.out_file, self.holder.err_file, True)
  197. logger.debug("Starting process command %s" % self.holder.command)
  198. process = self.pythonExecutor.launch_python_subprocess(self.holder.command, process_out, process_err)
  199. logger.debug("Process has been started. Pid = %s" % process.pid)
  200. self.holder.handle.pid = process.pid
  201. self.holder.handle.status = BackgroundCommandExecutionHandle.RUNNING_STATUS
  202. self.holder.handle.on_background_command_started(self.holder.handle.command['taskId'], process.pid)
  203. process.communicate()
  204. self.holder.handle.exitCode = process.returncode
  205. process_condensed_result = self.pythonExecutor.prepare_process_result(process.returncode, self.holder.out_file, self.holder.err_file, self.holder.structured_out_file)
  206. logger.debug("Calling callback with args %s" % process_condensed_result)
  207. self.holder.handle.on_background_command_complete_callback(process_condensed_result, self.holder.handle)
  208. logger.debug("Exiting from thread for holder pid %s" % self.holder.handle.pid)