Quellcode durchsuchen

AMBARI-18728. During cluster install, Components get timed out icon while starting (aonishuk)

Andrew Onishuk vor 8 Jahren
Ursprung
Commit
0c7eac7e9c

+ 4 - 0
ambari-agent/src/main/python/ambari_agent/ActionQueue.py

@@ -206,6 +206,10 @@ class ActionQueue(threading.Thread):
         self.process_status_command_result(result)
       except Queue.Empty:
         pass
+      except IOError:
+        # A race condition in multiprocessing.Queue can occur if get/put and thread kill are executed at the same time.
+        # During queue.close an IOError will be thrown (this prevents get from being permanently dead-locked).
+        pass
 
   def createCommandHandle(self, command):
     if command.has_key('__handle'):

+ 7 - 3
ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py

@@ -47,7 +47,7 @@ class StatusCommandsExecutor(multiprocessing.Process):
       bind_debug_signal_handlers()
       while True:
         command = self.actionQueue.statusCommandQueue.get(True) # blocks until a status command appears
-        logger.info("Running status command for {0}".format(command['componentName'])) # TODO: change to logger.debug once fixed
+        logger.debug("Running status command for {0}".format(command['componentName']))
         
         timeout_timer = threading.Timer( self.status_command_timeout, self.respawn, [command])
         timeout_timer.start()
@@ -55,7 +55,7 @@ class StatusCommandsExecutor(multiprocessing.Process):
         self.process_status_command(command)
 
         timeout_timer.cancel()
-        logger.info("Completed status command for {0}".format(command['componentName']))  # TODO: change to logger.debug once fixed
+        logger.debug("Completed status command for {0}".format(command['componentName']))
     except:
       logger.exception("StatusCommandsExecutor process failed with exception:")
       raise
@@ -83,4 +83,8 @@ class StatusCommandsExecutor(multiprocessing.Process):
       raise
 
   def kill(self):
-    os.kill(self.pid, signal.SIGKILL)
+    os.kill(self.pid, signal.SIGKILL)
+
+    # Prevent the queue from ending up with non-freed semaphores or locks during put, which would result in a dead-lock in the process executing get.
+    self.actionQueue.statusCommandResultQueue.close()
+    self.actionQueue.statusCommandResultQueue.join_thread()