Browse source code

AMBARI-20632. With multi-process StatusCommandsExecutor, Status commands are taking too long to report back (echekanskiy)

Eugene Chekanskiy — committed 8 years ago
parent commit: bb8b73e844

+ 1 - 1
ambari-agent/src/main/python/ambari_agent/AmbariConfig.py

@@ -293,7 +293,7 @@ class AmbariConfig:
     return int(self.get('agent', 'parallel_execution', 0))
     return int(self.get('agent', 'parallel_execution', 0))
 
 
   def get_multiprocess_status_commands_executor_enabled(self):
   def get_multiprocess_status_commands_executor_enabled(self):
-    return bool(int(self.get('agent', 'multiprocess_status_commands_executor_enabled', 0)))
+    return bool(int(self.get('agent', 'multiprocess_status_commands_executor_enabled', 1)))
 
 
   def update_configuration_from_registration(self, reg_resp):
   def update_configuration_from_registration(self, reg_resp):
     if reg_resp and AmbariConfig.AMBARI_PROPERTIES_CATEGORY in reg_resp:
     if reg_resp and AmbariConfig.AMBARI_PROPERTIES_CATEGORY in reg_resp:

+ 100 - 116
ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py

@@ -49,11 +49,14 @@ class SingleProcessStatusCommandsExecutor(StatusCommandsExecutor):
     self.config = config
     self.config = config
     self.actionQueue = actionQueue
     self.actionQueue = actionQueue
     self.statusCommandQueue = Queue.Queue()
     self.statusCommandQueue = Queue.Queue()
-    self.need_relaunch = False
+    self.need_relaunch = (False, None) #  tuple (bool, str|None) with flag to relaunch and reason of relaunch
 
 
   def put_commands(self, commands):
   def put_commands(self, commands):
-    while not self.statusCommandQueue.empty():
-      self.statusCommandQueue.get()
+    with self.statusCommandQueue.mutex:
+      qlen = len(self.statusCommandQueue.queue)
+      if qlen:
+        logger.info("Removing %s stale status commands from queue", qlen)
+      self.statusCommandQueue.queue.clear()
 
 
     for command in commands:
     for command in commands:
       logger.info("Adding " + command['commandType'] + " for component " + \
       logger.info("Adding " + command['commandType'] + " for component " + \
@@ -85,12 +88,13 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
     self.config = config
     self.config = config
     self.actionQueue = actionQueue
     self.actionQueue = actionQueue
 
 
-    self._can_relaunch_lock = threading.RLock()
-    self._can_relaunch = True
+    self.can_relaunch = True
 
 
     # used to prevent queues from been used during creation of new one to prevent threads messing up with combination of
     # used to prevent queues from been used during creation of new one to prevent threads messing up with combination of
     # old and new queues
     # old and new queues
     self.usage_lock = threading.RLock()
     self.usage_lock = threading.RLock()
+    # protects against simultaneous killing/creating from different threads.
+    self.kill_lock = threading.RLock()
 
 
     self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5))
     self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5))
     self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
     self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
@@ -104,42 +108,32 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
     self.mp_result_logs = multiprocessing.Queue()
     self.mp_result_logs = multiprocessing.Queue()
     self.mp_task_queue = multiprocessing.Queue()
     self.mp_task_queue = multiprocessing.Queue()
 
 
-  @property
-  def can_relaunch(self):
-    with self._can_relaunch_lock:
-      return self._can_relaunch
-
-  @can_relaunch.setter
-  def can_relaunch(self, value):
-    with self._can_relaunch_lock:
-      self._can_relaunch = value
-
-  def _log_message(self, level, message, exception=None):
-    """
-    Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target).
-
-    :param level:
-    :param message:
-    :param exception:
-    :return:
-    """
-    result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message
-    self.mp_result_logs.put((level, result_message, exception))
-
-  def _get_log_messages(self):
+  def _drain_queue(self, target_queue, max_time=5, max_empty_count=15, read_break=.001):
     """
     """
-    Returns list of (level, message, exception) log messages.
-
-    :return: list of (level, message, exception)
+    Read everything that available in queue. Using not reliable multiprocessing.Queue methods(qsize, empty), so contains
+    extremely dumb protection against blocking too much at this method: will try to get all possible items for not more
+    than ``max_time`` seconds; will return after ``max_empty_count`` calls of ``target_queue.get(False)`` that raised
+    ``Queue.Empty`` exception. Notice ``read_break`` argument, with default values this method will be able to read
+    ~4500 ``range(1,10000)`` objects for 5 seconds. So don't fill queue too fast.
+
+    :param target_queue: queue to read from
+    :param max_time: maximum time to spend in this method call
+    :param max_empty_count: maximum allowed ``Queue.Empty`` in a row
+    :param read_break: time to wait before next read cycle iteration
+    :return: list of resulting objects
     """
     """
     results = []
     results = []
+    _empty = 0
+    _start = time.time()
     with self.usage_lock:
     with self.usage_lock:
       try:
       try:
-        while not self.mp_result_logs.empty():
+        while (not target_queue.empty() or target_queue.qsize() > 0) and time.time() - _start < max_time and _empty < max_empty_count:
           try:
           try:
-            results.append(self.mp_result_logs.get(False))
+            results.append(target_queue.get(False))
+            _empty = 0
+            time.sleep(read_break) # sleep a little to get more accurate empty and qsize results
           except Queue.Empty:
           except Queue.Empty:
-            pass
+            _empty += 1
           except IOError:
           except IOError:
             pass
             pass
           except UnicodeDecodeError:
           except UnicodeDecodeError:
@@ -148,11 +142,23 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
         pass
         pass
     return results
     return results
 
 
+  def _log_message(self, level, message, exception=None):
+    """
+    Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target).
+
+    :param level:
+    :param message:
+    :param exception:
+    :return:
+    """
+    result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message
+    self.mp_result_logs.put((level, result_message, exception))
+
   def _process_logs(self):
   def _process_logs(self):
     """
     """
     Get all available at this moment logs and prints them to logger.
     Get all available at this moment logs and prints them to logger.
     """
     """
-    for level, message, exception in self._get_log_messages():
+    for level, message, exception in self._drain_queue(self.mp_result_logs):
       if level == logging.ERROR:
       if level == logging.ERROR:
         logger.debug(message, exc_info=exception)
         logger.debug(message, exc_info=exception)
       if level == logging.WARN:
       if level == logging.WARN:
@@ -253,16 +259,6 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
     :return:
     :return:
     """
     """
     with self.usage_lock:
     with self.usage_lock:
-      if not self.mp_task_queue.empty():
-        status_command_queue_size = 0
-        try:
-          while not self.mp_task_queue.empty():
-            self.mp_task_queue.get(False)
-            status_command_queue_size += 1
-        except Queue.Empty:
-          pass
-
-        logger.info("Number of status commands removed from queue : " + str(status_command_queue_size))
       for command in commands:
       for command in commands:
         logger.info("Adding " + command['commandType'] + " for component " + \
         logger.info("Adding " + command['commandType'] + " for component " + \
                     command['componentName'] + " of service " + \
                     command['componentName'] + " of service " + \
@@ -273,43 +269,29 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
 
 
   def process_results(self):
   def process_results(self):
     """
     """
-    Process all the results from the internal worker
+    Process all the results from the SCE worker process.
     """
     """
     self._process_logs()
     self._process_logs()
-    for result in self._get_results():
+    results = self._drain_queue(self.mp_result_queue)
+    logger.debug("Drained %s status commands results, ~%s remains in queue", len(results), self.mp_result_queue.qsize())
+    for result in results:
       try:
       try:
         self.actionQueue.process_status_command_result(result)
         self.actionQueue.process_status_command_result(result)
       except UnicodeDecodeError:
       except UnicodeDecodeError:
         pass
         pass
 
 
-  def _get_results(self):
-    """
-    Get all available results for status commands.
-
-    :return: list of results
-    """
-    results = []
-    with self.usage_lock:
-      try:
-        while not self.mp_result_queue.empty():
-          try:
-            results.append(self.mp_result_queue.get(False))
-          except Queue.Empty:
-            pass
-          except IOError:
-            pass
-          except UnicodeDecodeError:
-            pass
-      except IOError:
-        pass
-    return results
-
   @property
   @property
   def need_relaunch(self):
   def need_relaunch(self):
     """
     """
     Indicates if process need to be relaunched due to timeout or it is dead or even was not created.
     Indicates if process need to be relaunched due to timeout or it is dead or even was not created.
+
+    :return: tuple (bool, str|None) with flag to relaunch and reason of relaunch
     """
     """
-    return self.timedOutEvent.is_set() or not self.worker_process or not self.worker_process.is_alive()
+    if not self.worker_process or not self.worker_process.is_alive():
+      return True, "WORKER_DEAD"
+    elif self.timedOutEvent.is_set():
+      return True, "COMMAND_TIMEOUT"
+    return False, None
 
 
   def relaunch(self, reason=None):
   def relaunch(self, reason=None):
     """
     """
@@ -318,13 +300,15 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
     :param reason: reason of restart
     :param reason: reason of restart
     :return:
     :return:
     """
     """
-    if self.can_relaunch:
-      self.kill(reason)
-      self.worker_process = multiprocessing.Process(target=self._worker_process_target)
-      self.worker_process.start()
-      logger.info("Started process with pid {0}".format(self.worker_process.pid))
-    else:
-      logger.debug("Relaunch does not allowed, can not relaunch")
+    with self.kill_lock:
+      logger.info("Relaunching child process reason:" + str(reason))
+      if self.can_relaunch:
+        self.kill(reason)
+        self.worker_process = multiprocessing.Process(target=self._worker_process_target)
+        self.worker_process.start()
+        logger.info("Started process with pid {0}".format(self.worker_process.pid))
+      else:
+        logger.debug("Relaunch does not allowed, can not relaunch")
 
 
   def kill(self, reason=None, can_relaunch=True):
   def kill(self, reason=None, can_relaunch=True):
     """
     """
@@ -336,43 +320,43 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
     :param reason: reason of killing
     :param reason: reason of killing
     :return:
     :return:
     """
     """
-    logger.info("Killing child process reason:" + str(reason))
-    self.can_relaunch = can_relaunch
-
-    if not self.can_relaunch:
-      logger.info("Killing without possibility to relaunch...")
-
-    # try graceful stop, otherwise hard-kill
-    if self.worker_process and self.worker_process.is_alive():
-      self.mustDieEvent.set()
-      self.worker_process.join(timeout=3)
-      if self.worker_process.is_alive():
-        os.kill(self.worker_process.pid, signal.SIGKILL)
-        logger.info("Child process killed by -9")
+    with self.kill_lock:
+      self.can_relaunch = can_relaunch
+
+      if not self.can_relaunch:
+          logger.info("Killing without possibility to relaunch...")
+
+      # try graceful stop, otherwise hard-kill
+      if self.worker_process and self.worker_process.is_alive():
+        self.mustDieEvent.set()
+        self.worker_process.join(timeout=3)
+        if self.worker_process.is_alive():
+          os.kill(self.worker_process.pid, signal.SIGKILL)
+          logger.info("Child process killed by -9")
+        else:
+          # get log messages only if we died gracefully, otherwise we will have chance to block here forever, in most cases
+          # this call will do nothing, as all logs will be processed in ActionQueue loop
+          self._process_logs()
+          logger.info("Child process died gracefully")
       else:
       else:
-        # get log messages only if we died gracefully, otherwise we will have chance to block here forever, in most cases
-        # this call will do nothing, as all logs will be processed in ActionQueue loop
-        self._process_logs()
-        logger.info("Child process died gracefully")
-    else:
-      logger.info("Child process already dead")
-
-    # close queues and acquire usage lock
-    # closing both sides of pipes here, we need this hack in case of blocking on recv() call
-    self.mp_result_queue.close()
-    self.mp_result_queue._writer.close()
-    self.mp_result_logs.close()
-    self.mp_result_logs._writer.close()
-    self.mp_task_queue.close()
-    self.mp_task_queue._writer.close()
-
-    with self.usage_lock:
-      self.mp_result_queue.join_thread()
-      self.mp_result_queue = multiprocessing.Queue()
-      self.mp_task_queue.join_thread()
-      self.mp_task_queue = multiprocessing.Queue()
-      self.mp_result_logs.join_thread()
-      self.mp_result_logs = multiprocessing.Queue()
-      self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
-      self.mustDieEvent.clear()
-      self.timedOutEvent.clear()
+        logger.info("Child process already dead")
+
+      # close queues and acquire usage lock
+      # closing both sides of pipes here, we need this hack in case of blocking on recv() call
+      self.mp_result_queue.close()
+      self.mp_result_queue._writer.close()
+      self.mp_result_logs.close()
+      self.mp_result_logs._writer.close()
+      self.mp_task_queue.close()
+      self.mp_task_queue._writer.close()
+
+      with self.usage_lock:
+        self.mp_result_queue.join_thread()
+        self.mp_result_queue = multiprocessing.Queue()
+        self.mp_task_queue.join_thread()
+        self.mp_task_queue = multiprocessing.Queue()
+        self.mp_result_logs.join_thread()
+        self.mp_result_logs = multiprocessing.Queue()
+        self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
+        self.mustDieEvent.clear()
+        self.timedOutEvent.clear()

+ 3 - 2
ambari-agent/src/main/python/ambari_agent/main.py

@@ -332,8 +332,9 @@ def run_threads(server_hostname, heartbeat_stop_callback):
   while controller.is_alive():
   while controller.is_alive():
     time.sleep(0.1)
     time.sleep(0.1)
 
 
-    if controller.get_status_commands_executor().need_relaunch:
-      controller.get_status_commands_executor().relaunch("COMMAND_TIMEOUT_OR_KILLED")
+    need_relaunch, reason = controller.get_status_commands_executor().need_relaunch
+    if need_relaunch:
+      controller.get_status_commands_executor().relaunch(reason)
 
 
   controller.get_status_commands_executor().kill("AGENT_STOPPED", can_relaunch=False)
   controller.get_status_commands_executor().kill("AGENT_STOPPED", can_relaunch=False)