123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234 |
- #!/usr/bin/env python
- '''
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- '''
- import json
- import logging
- import os
- import subprocess
- import pprint
- import threading
- import platform
- from threading import Thread
- import time
- from BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
- from ambari_commons.os_check import OSConst, OSCheck
- from Grep import Grep
- import sys
- from ambari_commons import shell
- from ambari_commons.shell import shellRunner
# Module-wide logger. No name is passed, so this is the root logger and records
# use whatever configuration the root logger has.
logger = logging.getLogger(None)
class PythonExecutor:
    """
    Performs functionality for executing python scripts.
    Warning: class maintains internal state. As a result, instances should not be
    used as a singleton for a concurrent execution of python scripts
    """
    NO_ERROR = "none"
    grep = Grep()
    # Class-level defaults are kept for backward compatibility (e.g. instances
    # created without __init__). __init__ gives every instance its own copy.
    event = threading.Event()
    python_process_has_been_killed = False

    def __init__(self, tmpDir, config):
        self.tmpDir = tmpDir
        self.config = config
        # A per-instance Event: the class-level one is shared by all executors,
        # so concurrent executors would clear()/set() each other's watchdog signal.
        self.event = threading.Event()
        self.python_process_has_been_killed = False

    def open_subprocess_files(self, tmpoutfile, tmperrfile, override_output_files):
        """
        Open the files that capture a subprocess's stdout and stderr.

        :param tmpoutfile: path of the stdout capture file
        :param tmperrfile: path of the stderr capture file
        :param override_output_files: truthy to recreate (truncate) the files,
            falsy to append to them
        :return: (tmpout, tmperr) open file objects; the caller must close them
        """
        mode = 'w' if override_output_files else 'a'
        tmpout = open(tmpoutfile, mode)
        try:
            tmperr = open(tmperrfile, mode)
        except Exception:
            # Do not leak the first handle if the second open fails.
            tmpout.close()
            raise
        return tmpout, tmperr

    def run_file(self, script, script_params, tmp_dir, tmpoutfile, tmperrfile,
                 timeout, tmpstructedoutfile, logger_level, callback, task_id,
                 override_output_files=True, handle=None, log_info_on_failure=True):
        """
        Executes the specified python file in a separate subprocess.

        Without a handle, the method returns only when the subprocess is
        finished (or killed by the watchdog after `timeout` seconds).
        With a handle, the command is started on a BackgroundThread and
        {"exitcode": 777} is returned immediately.

        Params arg is a list of script parameters. The structured-out file
        path, logger level and tmp dir are appended to it; the caller's list
        is not modified.
        Timeout meaning: how many seconds should pass before script execution
        is forcibly terminated.
        override_output_files option defines whether stdout/stderr files will be
        recreated or appended.
        The structured out file, however, is preserved during multiple
        invocations that use the same file.

        :param callback: called as callback(task_id, pid) once the process starts
        :param log_info_on_failure: when true, on_failure() logs diagnostics
            for a non-zero exit code
        :return: the condensed result dict produced by prepare_process_result
        """
        # Build a new list: `+=` would extend the caller's list in place.
        script_params = list(script_params) + [tmpstructedoutfile, logger_level, tmp_dir]
        pythonCommand = self.python_command(script, script_params)
        logger.debug("Running command " + pprint.pformat(pythonCommand))

        if handle is not None:
            holder = Holder(pythonCommand, tmpoutfile, tmperrfile, tmpstructedoutfile, handle)
            background = BackgroundThread(holder, self)
            background.start()
            # NOTE(review): 777 looks like a "started in background" sentinel;
            # the real result is delivered through the handle's callbacks.
            return {"exitcode": 777}

        tmpout, tmperr = self.open_subprocess_files(tmpoutfile, tmperrfile, override_output_files)
        try:
            process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr)
            # map task_id to pid
            callback(task_id, process.pid)
            logger.debug("Launching watchdog thread")
            self.event.clear()
            self.python_process_has_been_killed = False
            thread = Thread(target=self.python_watchdog_func, args=(process, timeout))
            thread.start()
            # Waiting for the process to be either finished or killed
            process.communicate()
            self.event.set()
            thread.join()
        finally:
            # The child holds its own copies of these descriptors; the parent's
            # handles are no longer needed and would otherwise leak.
            tmpout.close()
            tmperr.close()

        result = self.prepare_process_result(process, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=timeout)
        if log_info_on_failure and result['exitcode']:
            self.on_failure(pythonCommand, result)
        return result

    def on_failure(self, pythonCommand, result):
        """
        Log some useful information after task failure.

        Logs the failed command with its exit code, then runs a few diagnostic
        shell commands (process list, listening sockets) and logs their output.
        """
        logger.info("Command " + pprint.pformat(pythonCommand) + " failed with exitcode=" + str(result['exitcode']))
        cmd_list = ["ps faux", "netstat -tulpn"]

        shell_runner = shellRunner()

        for cmd in cmd_list:
            ret = shell_runner.run(cmd)
            logger.info("Command '{0}' returned {1}. {2}{3}".format(cmd, ret["exitCode"], ret["error"], ret["output"]))

    def prepare_process_result(self, process, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=None):
        """
        Read the captured output files and build the condensed result dict.

        If the watchdog killed the process, a timeout note is appended to
        stderr and the exit code is forced to 999.
        """
        out, error, structured_out = self.read_result_from_files(tmpoutfile, tmperrfile, tmpstructedoutfile)
        # Building results
        returncode = process.returncode
        if self.python_process_has_been_killed:
            error = str(error) + "\n Python script has been killed due to timeout" + \
                (" after waiting %s secs" % str(timeout) if timeout else "")
            returncode = 999
        result = self.condenseOutput(out, error, returncode, structured_out)
        logger.debug("Result: %s" % result)
        return result

    def read_result_from_files(self, out_path, err_path, structured_out_path):
        """
        Return (stdout_text, stderr_text, structured_out).

        structured_out is the JSON-decoded content of structured_out_path.
        If that file exists but cannot be read or parsed, the result is
        {'msg': <error text>} and a warning is logged. If the file does not
        exist, the result is {}.
        """
        with open(out_path, 'r') as out_file:
            out = out_file.read()
        with open(err_path, 'r') as err_file:
            error = err_file.read()
        try:
            with open(structured_out_path, 'r') as fp:
                structured_out = json.load(fp)
        except Exception:
            # Best effort: a missing/corrupt structured output must not fail the command.
            if os.path.exists(structured_out_path):
                errMsg = 'Unable to read structured output from ' + structured_out_path
                structured_out = {
                    'msg': errMsg
                }
                logger.warning(structured_out)
            else:
                structured_out = {}
        return out, error, structured_out

    def launch_python_subprocess(self, command, tmpout, tmperr):
        """
        Creates subprocess with given parameters. This functionality was moved to separate method
        to make possible unit testing
        """
        is_windows = OSCheck.get_os_family() == OSConst.WINSRV_FAMILY
        if is_windows:
            # NOTE(review): close_fds is left unset on Windows, presumably because
            # Python 2's Popen there rejects close_fds=True with redirected std handles.
            close_fds = None
            # Hand the agent's sys.path to the child and coerce all values to str.
            command_env = dict(os.environ)
            command_env["PYTHONPATH"] = os.pathsep.join(sys.path)
            command_env = {k: str(v) for k, v in command_env.items()}
        else:
            close_fds = True
            command_env = None  # child inherits the current environment
        return subprocess.Popen(command,
                                stdout=tmpout,
                                stderr=tmperr, close_fds=close_fds, env=command_env)

    def isSuccessfull(self, returncode):
        """True when the watchdog did not kill the process and returncode is 0."""
        return not self.python_process_has_been_killed and returncode == 0

    def python_command(self, script, script_params):
        """Return the argv list: [python binary, script] + script_params."""
        # we need manually pass python executable on windows because sys.executable will return service wrapper
        python_binary = os.environ.get('PYTHON_EXE', sys.executable)
        return [python_binary, script] + script_params

    def condenseOutput(self, stdout, stderr, retcode, structured_out):
        """
        Build the result dict with keys exitcode, stdout, stderr, structuredOut.

        stdout/stderr are reduced with Grep.tail to the configured
        heartbeat.log_lines_count when that value is truthy.
        """
        # NOTE(review): config.get may return a string here; confirm Grep.tail
        # handles a non-int line count as intended.
        log_lines_count = self.config.get('heartbeat', 'log_lines_count')

        grep = self.grep
        result = {
            "exitcode": retcode,
            "stdout": grep.tail(stdout, log_lines_count) if log_lines_count else stdout,
            "stderr": grep.tail(stderr, log_lines_count) if log_lines_count else stderr,
            "structuredOut": structured_out
        }

        return result

    def python_watchdog_func(self, python, timeout):
        """
        Watchdog body: wait up to `timeout` seconds for self.event.

        If the process still has no returncode afterwards, kill it together
        with its children and record that in python_process_has_been_killed.
        """
        self.event.wait(timeout)
        if python.returncode is None:
            logger.error("Subprocess timed out and will be killed")
            shell.kill_process_with_children(python.pid)
            self.python_process_has_been_killed = True
class Holder:
    """
    Plain record bundling what a BackgroundThread needs to run one command:
    the argv list, the stdout/stderr/structured-output file paths, and the
    handle used to report progress.
    """

    def __init__(self, command, out_file, err_file, structured_out_file, handle):
        # Store every constructor argument under the attribute of the same name.
        (self.command,
         self.out_file,
         self.err_file,
         self.structured_out_file,
         self.handle) = (command, out_file, err_file, structured_out_file, handle)
class BackgroundThread(threading.Thread):
    """
    Thread that runs a Holder's command through a PythonExecutor and reports
    progress through the Holder's handle.
    """

    def __init__(self, holder, pythonExecutor):
        threading.Thread.__init__(self)
        self.holder = holder
        self.pythonExecutor = pythonExecutor

    def run(self):
        """
        Run the command and report through the handle.

        Steps:
        - open (truncating) the holder's stdout/stderr files;
        - launch the command and record its pid and RUNNING status on the handle;
        - call on_background_command_started(taskId, pid);
        - wait for the process to exit and store its exit code on the handle;
        - pass the condensed result to on_background_command_complete_callback.
        """
        holder = self.holder
        handle = holder.handle
        process_out, process_err = self.pythonExecutor.open_subprocess_files(holder.out_file, holder.err_file, True)
        try:
            logger.debug("Starting process command %s" % holder.command)
            process = self.pythonExecutor.launch_python_subprocess(holder.command, process_out, process_err)
            logger.debug("Process has been started. Pid = %s" % process.pid)
            handle.pid = process.pid
            handle.status = BackgroundCommandExecutionHandle.RUNNING_STATUS
            handle.on_background_command_started(handle.command['taskId'], process.pid)
            process.communicate()
        finally:
            # The parent's copies of the capture handles are no longer needed.
            process_out.close()
            process_err.close()
        handle.exitCode = process.returncode
        # NOTE(review): prepare_process_result consults the executor's
        # python_process_has_been_killed flag, which this thread never resets;
        # verify a previous foreground timeout on the same executor cannot
        # mark this result as killed.
        process_condensed_result = self.pythonExecutor.prepare_process_result(process, holder.out_file, holder.err_file, holder.structured_out_file)
        logger.debug("Calling callback with args %s" % process_condensed_result)
        handle.on_background_command_complete_callback(process_condensed_result, handle)
        logger.debug("Exiting from thread for holder pid %s" % handle.pid)
|