ソースを参照

AMBARI-11242. Every minute Ambari agent freezes for 10-15 while executing status commands (aonishuk)

Andrew Onishuk 10 年 前
コミット
21fde67653

+ 11 - 6
ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py

@@ -28,6 +28,7 @@ import threading
 from FileCache import FileCache
 from FileCache import FileCache
 from AgentException import AgentException
 from AgentException import AgentException
 from PythonExecutor import PythonExecutor
 from PythonExecutor import PythonExecutor
+from PythonReflectiveExecutor import PythonReflectiveExecutor
 import hostname
 import hostname
 
 
 
 
@@ -55,6 +56,7 @@ class CustomServiceOrchestrator():
 
 
   AMBARI_SERVER_HOST = "ambari_server_host"
   AMBARI_SERVER_HOST = "ambari_server_host"
   DONT_DEBUG_FAILURES_FOR_COMMANDS = [COMMAND_NAME_SECURITY_STATUS, COMMAND_NAME_STATUS]
   DONT_DEBUG_FAILURES_FOR_COMMANDS = [COMMAND_NAME_SECURITY_STATUS, COMMAND_NAME_STATUS]
+  REFLECTIVELY_RUN_COMMANDS = [COMMAND_NAME_SECURITY_STATUS, COMMAND_NAME_STATUS] # -- commands which run a lot and often (this increases their speed)
 
 
   def __init__(self, config, controller):
   def __init__(self, config, controller):
     self.config = config
     self.config = config
@@ -95,12 +97,15 @@ class CustomServiceOrchestrator():
       else: 
       else: 
         logger.warn("Unable to find pid by taskId = %s" % task_id)
         logger.warn("Unable to find pid by taskId = %s" % task_id)
 
 
-  def get_py_executor(self):
+  def get_py_executor(self, forced_command_name):
     """
     """
     Wrapper for unit testing
     Wrapper for unit testing
     :return:
     :return:
     """
     """
-    return PythonExecutor(self.tmp_dir, self.config)
+    if forced_command_name in self.REFLECTIVELY_RUN_COMMANDS:
+      return PythonReflectiveExecutor(self.tmp_dir, self.config)
+    else:
+      return PythonExecutor(self.tmp_dir, self.config)
 
 
   def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name=None,
   def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name=None,
                  override_output_files=True, retry=False):
                  override_output_files=True, retry=False):
@@ -178,13 +183,13 @@ class CustomServiceOrchestrator():
       if command.has_key('commandType') and command['commandType'] == ActionQueue.BACKGROUND_EXECUTION_COMMAND and len(filtered_py_file_list) > 1:
       if command.has_key('commandType') and command['commandType'] == ActionQueue.BACKGROUND_EXECUTION_COMMAND and len(filtered_py_file_list) > 1:
         raise AgentException("Background commands are supported without hooks only")
         raise AgentException("Background commands are supported without hooks only")
 
 
-      python_executor = self.get_py_executor()
+      python_executor = self.get_py_executor(forced_command_name)
       for py_file, current_base_dir in filtered_py_file_list:
       for py_file, current_base_dir in filtered_py_file_list:
         log_info_on_failure = not command_name in self.DONT_DEBUG_FAILURES_FOR_COMMANDS
         log_info_on_failure = not command_name in self.DONT_DEBUG_FAILURES_FOR_COMMANDS
-        script_params = [command_name, json_path, current_base_dir]
+        script_params = [command_name, json_path, current_base_dir, tmpstrucoutfile, logger_level, self.exec_tmp_dir]
         ret = python_executor.run_file(py_file, script_params,
         ret = python_executor.run_file(py_file, script_params,
-                               self.exec_tmp_dir, tmpoutfile, tmperrfile, timeout,
-                               tmpstrucoutfile, logger_level, self.map_task_to_process,
+                               tmpoutfile, tmperrfile, timeout,
+                               tmpstrucoutfile, self.map_task_to_process,
                                task_id, override_output_files, handle = handle, log_info_on_failure=log_info_on_failure)
                                task_id, override_output_files, handle = handle, log_info_on_failure=log_info_on_failure)
         # Next run_file() invocations should always append to current output
         # Next run_file() invocations should always append to current output
         override_output_files = False
         override_output_files = False

+ 7 - 9
ambari-agent/src/main/python/ambari_agent/PythonExecutor.py

@@ -36,7 +36,7 @@ from ambari_commons.shell import shellRunner
 
 
 logger = logging.getLogger()
 logger = logging.getLogger()
 
 
-class PythonExecutor:
+class PythonExecutor(object):
   """
   """
   Performs functionality for executing python scripts.
   Performs functionality for executing python scripts.
   Warning: class maintains internal state. As a result, instances should not be
   Warning: class maintains internal state. As a result, instances should not be
@@ -62,8 +62,8 @@ class PythonExecutor:
       tmperr =  open(tmperrfile, 'a')
       tmperr =  open(tmperrfile, 'a')
     return tmpout, tmperr
     return tmpout, tmperr
 
 
-  def run_file(self, script, script_params, tmp_dir, tmpoutfile, tmperrfile,
-               timeout, tmpstructedoutfile, logger_level, callback, task_id,
+  def run_file(self, script, script_params, tmpoutfile, tmperrfile,
+               timeout, tmpstructedoutfile, callback, task_id,
                override_output_files = True, handle = None, log_info_on_failure=True):
                override_output_files = True, handle = None, log_info_on_failure=True):
     """
     """
     Executes the specified python file in a separate subprocess.
     Executes the specified python file in a separate subprocess.
@@ -76,9 +76,9 @@ class PythonExecutor:
     The structured out file, however, is preserved during multiple invocations that use the same file.
     The structured out file, however, is preserved during multiple invocations that use the same file.
     """
     """
 
 
-    script_params += [tmpstructedoutfile, logger_level, tmp_dir]
     pythonCommand = self.python_command(script, script_params)
     pythonCommand = self.python_command(script, script_params)
     logger.debug("Running command " + pprint.pformat(pythonCommand))
     logger.debug("Running command " + pprint.pformat(pythonCommand))
+    
     if handle is None:
     if handle is None:
       tmpout, tmperr = self.open_subprocess_files(tmpoutfile, tmperrfile, override_output_files)
       tmpout, tmperr = self.open_subprocess_files(tmpoutfile, tmperrfile, override_output_files)
 
 
@@ -94,7 +94,7 @@ class PythonExecutor:
       process.communicate()
       process.communicate()
       self.event.set()
       self.event.set()
       thread.join()
       thread.join()
-      result = self.prepare_process_result(process, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=timeout)
+      result = self.prepare_process_result(process.returncode, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=timeout)
       
       
       if log_info_on_failure and result['exitcode']:
       if log_info_on_failure and result['exitcode']:
         self.on_failure(pythonCommand, result)
         self.on_failure(pythonCommand, result)
@@ -123,10 +123,8 @@ class PythonExecutor:
       ret = shell_runner.run(cmd)
       ret = shell_runner.run(cmd)
       logger.info("Command '{0}' returned {1}. {2}{3}".format(cmd, ret["exitCode"], ret["error"], ret["output"]))
       logger.info("Command '{0}' returned {1}. {2}{3}".format(cmd, ret["exitCode"], ret["error"], ret["output"]))
     
     
-  def prepare_process_result(self, process, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=None):
+  def prepare_process_result(self, returncode, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=None):
     out, error, structured_out = self.read_result_from_files(tmpoutfile, tmperrfile, tmpstructedoutfile)
     out, error, structured_out = self.read_result_from_files(tmpoutfile, tmperrfile, tmpstructedoutfile)
-    # Building results
-    returncode = process.returncode
 
 
     if self.python_process_has_been_killed:
     if self.python_process_has_been_killed:
       error = str(error) + "\n Python script has been killed due to timeout" + \
       error = str(error) + "\n Python script has been killed due to timeout" + \
@@ -227,7 +225,7 @@ class BackgroundThread(threading.Thread):
     process.communicate()
     process.communicate()
 
 
     self.holder.handle.exitCode = process.returncode
     self.holder.handle.exitCode = process.returncode
-    process_condensed_result = self.pythonExecutor.prepare_process_result(process, self.holder.out_file, self.holder.err_file, self.holder.structured_out_file)
+    process_condensed_result = self.pythonExecutor.prepare_process_result(process.returncode, self.holder.out_file, self.holder.err_file, self.holder.structured_out_file)
     logger.debug("Calling callback with args %s" % process_condensed_result)
     logger.debug("Calling callback with args %s" % process_condensed_result)
     self.holder.handle.on_background_command_complete_callback(process_condensed_result, self.holder.handle)
     self.holder.handle.on_background_command_complete_callback(process_condensed_result, self.holder.handle)
     logger.debug("Exiting from thread for holder pid %s" % self.holder.handle.pid)
     logger.debug("Exiting from thread for holder pid %s" % self.holder.handle.pid)

+ 98 - 0
ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py

@@ -0,0 +1,98 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from PythonExecutor import PythonExecutor
+
+import imp
+import sys
+import os
+import pprint
+import logging
+import copy
+
+logger = logging.getLogger()
+
+class PythonReflectiveExecutor(PythonExecutor):
+  """
+  Some commands like STATUS, SECURITY_STATUS commands are run a lot, and need to be run really fast.
+  Otherwise agent will hang waiting for them to complete every X seconds.
+  
+  Running the commands not in new proccess, but reflectively makes this really fast.
+  """
+  
+  def __init__(self, tmpDir, config):
+    super(PythonReflectiveExecutor, self).__init__(tmpDir, config)
+    
+  def run_file(self, script, script_params, tmpoutfile, tmperrfile,
+               timeout, tmpstructedoutfile, callback, task_id,
+               override_output_files = True, handle = None, log_info_on_failure=True):   
+    pythonCommand = self.python_command(script, script_params)
+    logger.debug("Running command reflectively " + pprint.pformat(pythonCommand))
+    
+    script_dir = os.path.dirname(script)
+    self.open_subprocess_files(tmpoutfile, tmperrfile, override_output_files)
+    returncode = 1
+
+    try:
+      with PythonContext(script_dir, pythonCommand):
+        imp.load_source('__main__', script)
+    except SystemExit as e:
+      returncode = e.code
+      if returncode:
+        logger.debug("Reflective command failed with return_code=" + str(e))
+    except Exception: 
+      logger.debug("Reflective command failed with exception:", exc_info=1)
+    else: 
+      returncode = 0
+      
+    return self.prepare_process_result(returncode, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=timeout)
+  
+class PythonContext:
+  """
+  Sets and resets some context like imports, pythonpath, args.
+  Also it disable logging into ambari-agent.log for reflectively called scripts.
+  """
+  def __init__(self, script_dir, pythonCommand):
+    self.script_dir = script_dir
+    self.pythonCommand = pythonCommand
+    
+  def __enter__(self):
+    self.old_sys_path = copy.copy(sys.path)
+    self.old_agv = copy.copy(sys.argv)
+    self.old_sys_modules = copy.copy(sys.modules)
+    self.old_logging_disable = logging.root.manager.disable
+    
+    logging.disable(logging.ERROR)
+    sys.path.append(self.script_dir)
+    sys.argv = self.pythonCommand[1:]
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    sys.path = self.old_sys_path
+    sys.argv = self.old_agv
+    logging.disable(self.old_logging_disable)
+    self.revert_sys_modules(self.old_sys_modules)
+    return False
+  
+  def revert_sys_modules(self, value):
+    sys.modules.update(value)
+    
+    for k in copy.copy(sys.modules):
+      if not k in value:
+        del sys.modules[k]

+ 6 - 5
ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py

@@ -285,14 +285,15 @@ class TestCustomServiceOrchestrator(TestCase):
         'exitcode': 0,
         'exitcode': 0,
       }
       }
     ret = orchestrator.runCommand(command, "out.txt", "err.txt",
     ret = orchestrator.runCommand(command, "out.txt", "err.txt",
-              forced_command_name=CustomServiceOrchestrator.COMMAND_NAME_STATUS)
+              forced_command_name=CustomServiceOrchestrator.SCRIPT_TYPE_PYTHON)
     ## Check that override_output_files was true only during first call
     ## Check that override_output_files was true only during first call
-    self.assertEquals(run_file_mock.call_args_list[0][0][10], True)
-    self.assertEquals(run_file_mock.call_args_list[1][0][10], False)
-    self.assertEquals(run_file_mock.call_args_list[2][0][10], False)
+    print run_file_mock
+    self.assertEquals(run_file_mock.call_args_list[0][0][8], True)
+    self.assertEquals(run_file_mock.call_args_list[1][0][8], False)
+    self.assertEquals(run_file_mock.call_args_list[2][0][8], False)
     ## Check that forced_command_name was taken into account
     ## Check that forced_command_name was taken into account
     self.assertEqual(run_file_mock.call_args_list[0][0][1][0],
     self.assertEqual(run_file_mock.call_args_list[0][0][1][0],
-                                  CustomServiceOrchestrator.COMMAND_NAME_STATUS)
+                                  CustomServiceOrchestrator.SCRIPT_TYPE_PYTHON)
 
 
     run_file_mock.reset_mock()
     run_file_mock.reset_mock()
 
 

+ 8 - 11
ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py

@@ -37,6 +37,7 @@ if get_platform() != PLATFORM_WINDOWS:
 else:
 else:
   os_distro_value = ('win2012serverr2','6.3','WindowsServer')
   os_distro_value = ('win2012serverr2','6.3','WindowsServer')
 
 
+@patch.object(PythonExecutor, "open_subprocess_files", new=MagicMock(return_value =("", "")))
 class TestPythonExecutor(TestCase):
 class TestPythonExecutor(TestCase):
 
 
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
@@ -64,8 +65,8 @@ class TestPythonExecutor(TestCase):
     subproc_mock.returncode = None
     subproc_mock.returncode = None
     callback_method = MagicMock()
     callback_method = MagicMock()
     thread = Thread(target =  executor.run_file, args = ("fake_puppetFile",
     thread = Thread(target =  executor.run_file, args = ("fake_puppetFile",
-      ["arg1", "arg2"], "/fake_tmp_dir", tmpoutfile, tmperrfile,
-      PYTHON_TIMEOUT_SECONDS, tmpstrucout, "INFO", callback_method, '1'))
+      ["arg1", "arg2"], tmpoutfile, tmperrfile,
+      PYTHON_TIMEOUT_SECONDS, tmpstrucout, callback_method, '1'))
     thread.start()
     thread.start()
     time.sleep(0.1)
     time.sleep(0.1)
     subproc_mock.finished_event.wait()
     subproc_mock.finished_event.wait()
@@ -96,9 +97,9 @@ class TestPythonExecutor(TestCase):
     subproc_mock.returncode = 0
     subproc_mock.returncode = 0
     callback_method = MagicMock()
     callback_method = MagicMock()
     thread = Thread(target =  executor.run_file, args = ("fake_puppetFile", ["arg1", "arg2"],
     thread = Thread(target =  executor.run_file, args = ("fake_puppetFile", ["arg1", "arg2"],
-                                                      "/fake_tmp_dir", tmpoutfile, tmperrfile,
+                                                      tmpoutfile, tmperrfile,
                                                       PYTHON_TIMEOUT_SECONDS, tmpstrucout,
                                                       PYTHON_TIMEOUT_SECONDS, tmpstrucout,
-                                                      "INFO", callback_method, "1-1"))
+                                                      callback_method, "1-1"))
     thread.start()
     thread.start()
     time.sleep(0.1)
     time.sleep(0.1)
     subproc_mock.should_finish_event.set()
     subproc_mock.should_finish_event.set()
@@ -131,10 +132,10 @@ class TestPythonExecutor(TestCase):
     subproc_mock.returncode = 0
     subproc_mock.returncode = 0
     subproc_mock.should_finish_event.set()
     subproc_mock.should_finish_event.set()
     callback_method = MagicMock()
     callback_method = MagicMock()
-    result = executor.run_file("file", ["arg1", "arg2"], "/fake_tmp_dir",
+    result = executor.run_file("file", ["arg1", "arg2"],
                                tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS,
                                tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS,
-                               tmpstructuredoutfile, "INFO", callback_method, "1-1")
-    self.assertEquals(result, {'exitcode': 0, 'stderr': 'Dummy err', 'stdout': 'Dummy output',
+                               tmpstructuredoutfile, callback_method, "1-1")
+    self.assertEquals(result, {'exitcode': 0, 'stderr': '', 'stdout': '',
                                'structuredOut': {}})
                                'structuredOut': {}})
     self.assertTrue(callback_method.called)
     self.assertTrue(callback_method.called)
 
 
@@ -179,11 +180,7 @@ class TestPythonExecutor(TestCase):
 
 
     def communicate(self):
     def communicate(self):
       self.started_event.set()
       self.started_event.set()
-      self.tmpout.write("Dummy output")
-      self.tmpout.flush()
 
 
-      self.tmperr.write("Dummy err")
-      self.tmperr.flush()
       self.should_finish_event.wait()
       self.should_finish_event.wait()
       self.finished_event.set()
       self.finished_event.set()
       pass
       pass

+ 8 - 4
ambari-common/src/main/python/resource_management/libraries/functions/check_process_status.py

@@ -47,10 +47,14 @@ def check_process_status(pid_file):
     Logger.debug("Pid file {0} does not exist".format(pid_file))
     Logger.debug("Pid file {0} does not exist".format(pid_file))
     raise ComponentIsNotRunning()
     raise ComponentIsNotRunning()
 
 
-  code, out = shell.call(["ps","-p", str(pid)])
-  
-  if code:
+  try:
+    # Kill will not actually kill the process
+    # From the doc:
+    # If sig is 0, then no signal is sent, but error checking is still
+    # performed; this can be used to check for the existence of a
+    # process ID or process group ID.
+    os.kill(pid, 0)
+  except OSError:
     Logger.debug("Process with pid {0} is not running. Stale pid file"
     Logger.debug("Process with pid {0} is not running. Stale pid file"
               " at {1}".format(pid, pid_file))
               " at {1}".format(pid, pid_file))
     raise ComponentIsNotRunning()
     raise ComponentIsNotRunning()
-  pass

+ 0 - 7
ambari-common/src/main/python/resource_management/libraries/script/script.py

@@ -214,13 +214,6 @@ class Script(object):
         method(env)
         method(env)
         if command_name == "install":
         if command_name == "install":
           self.set_version()
           self.set_version()
-    except ClientComponentHasNoStatus or ComponentIsNotRunning:
-      # Support of component status checks.
-      # Non-zero exit code is interpreted as an INSTALLED status of a component
-      sys.exit(1)
-    except Fail:
-      logger.exception("Error while executing command '{0}':".format(command_name))
-      sys.exit(1)
     finally:
     finally:
       if self.should_expose_component_version(command_name):
       if self.should_expose_component_version(command_name):
         self.save_component_version_to_structured_out()
         self.save_component_version_to_structured_out()