Browse Source

AMBARI-15558. ambari-agent upstart script broken in RHEL6 (aonishuk)

Andrew Onishuk 9 years ago
parent
commit
22c3dcd5b3

+ 0 - 1
ambari-agent/etc/init/ambari-agent.conf

@@ -17,7 +17,6 @@ description     "ambari agent"
 
 stop on runlevel [06]
 
-kill signal SIGKILL
 respawn
 
 script

+ 1 - 0
ambari-agent/src/main/python/ambari_agent/ExitHelper.py

@@ -64,6 +64,7 @@ class ExitHelper(object):
 
   def exit(self, code):
     self.execute_cleanup()
+    logger.info("Cleanup finished, exiting with code:" + str(code))
     os._exit(code)
 
 

+ 7 - 20
ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py

@@ -96,18 +96,9 @@ def debug(sig, frame):
 
 @OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
 class HeartbeatStopHandlersLinux(HeartbeatStopHandlers):
-  def __init__(self, stopEvent=None):
-    # Event is used for synchronizing heartbeat iterations (to make possible
-    # manual wait() interruption between heartbeats )
+  def __init__(self):
     self.heartbeat_wait_event = threading.Event()
-
-    # Event is used to stop the Agent process
-    if stopEvent is None:
-      # Allow standalone testing
-      self.stop_event = threading.Event()
-    else:
-      # Allow one unique event per process
-      self.stop_event = stopEvent
+    self._stop = False
 
   def set_heartbeat(self):
     self.heartbeat_wait_event.set()
@@ -116,19 +107,15 @@ class HeartbeatStopHandlersLinux(HeartbeatStopHandlers):
     self.heartbeat_wait_event.clear()
 
   def set_stop(self):
-    self.stop_event.set()
+    self._stop = True
 
   def wait(self, timeout1, timeout2=0):
-    if self.heartbeat_wait_event.wait(timeout=timeout1):
-      # Event signaled, exit
-      return 1
-    # Stop loop when stop event received
-    # Otherwise sleep a bit more to allow STATUS_COMMAND results to be collected
-    # and sent in one heartbeat. Also avoid server overload with heartbeats
-    if self.stop_event.wait(timeout=timeout2):
+    if self._stop:
       logger.info("Stop event received")
       return 0
-    # Timeout
+
+    if self.heartbeat_wait_event.wait(timeout=timeout1):
+      return 1
     return -1
 
 

+ 23 - 15
ambari-agent/src/main/python/ambari_agent/main.py

@@ -69,6 +69,9 @@ def setup_logging(logger, filename, logging_level):
   logger.setLevel(logging_level)
   logger.info("loglevel=logging.{0}".format(logging._levelNames[logging_level]))
 
+GRACEFUL_STOP_TRIES = 10
+GRACEFUL_STOP_TRIES_SLEEP = 3
+
 
 def add_syslog_handler(logger):
     
@@ -161,22 +164,26 @@ def daemonize():
   pid = str(os.getpid())
   file(ProcessHelper.pidfile, 'w').write(pid)
 
-
 def stop_agent():
 # stop existing Ambari agent
   pid = -1
   runner = shellRunner()
   try:
-    f = open(ProcessHelper.pidfile, 'r')
-    pid = f.read()
+    with open(ProcessHelper.pidfile, 'r') as f:
+      pid = f.read()
     pid = int(pid)
-    f.close()
+    
     runner.run([AMBARI_SUDO_BINARY, 'kill', '-15', str(pid)])
-    time.sleep(5)
-    if os.path.exists(ProcessHelper.pidfile):
-      raise Exception("PID file still exists.")
-    sys.exit(0)
+    for i in range(GRACEFUL_STOP_TRIES):
+      result = runner.run([AMBARI_SUDO_BINARY, 'kill', '-0', str(pid)])
+      if result['exitCode'] != 0:
+        logger.info("Agent died gracefully, exiting.")
+        sys.exit(0)
+      time.sleep(GRACEFUL_STOP_TRIES_SLEEP)
+    logger.info("Agent not going to die gracefully, going to execute kill -9")
+    raise Exception("Agent is running")
   except Exception, err:
+    #raise
     if pid == -1:
       print ("Agent process is not running")
     else:
@@ -306,7 +313,8 @@ def main(heartbeat_stop_callback=None):
         # Launch Controller communication
         controller = Controller(config, server_hostname, heartbeat_stop_callback)
         controller.start()
-        controller.join()
+        while controller.is_alive():
+          time.sleep(0.1)
 
       #
       # If Ambari Agent connected to the server or
@@ -314,9 +322,7 @@ def main(heartbeat_stop_callback=None):
       # Clean up if not Windows OS
       #
       if connected or stopped:
-        if not OSCheck.get_os_family() == OSConst.WINSRV_FAMILY:
-          ExitHelper().execute_cleanup()
-          stop_agent()
+        ExitHelper().exit(0)
         logger.info("finished")
         break
     pass # for server_hostname in server_hostnames
@@ -330,7 +336,9 @@ if __name__ == "__main__":
     heartbeat_stop_callback = bind_signal_handlers(agentPid)
   
     main(heartbeat_stop_callback)
-  except:
+  except SystemExit as e:
+    raise e
+  except BaseException as e:
     if is_logger_setup:
-      logger.exception("Fatal exception occurred:")
-    raise
+      logger.exception("Exiting with exception:" + e)
+  raise

+ 0 - 5
ambari-agent/src/test/python/ambari_agent/TestController.py

@@ -548,8 +548,6 @@ class TestController(unittest.TestCase):
     response["restartAgent"] = "false"
     self.controller.heartbeatWithServer()
 
-    event_mock.assert_any_call(timeout=
-      self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
 
     # Check that server continues to heartbeat after connection errors
     self.controller.responseId = 1
@@ -569,9 +567,6 @@ class TestController(unittest.TestCase):
     self.controller.heartbeatWithServer()
     self.assertTrue(sendRequest.call_count > 5)
 
-    event_mock.assert_called_with(timeout=
-      self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
-
     sys.stdout = sys.__stdout__
     self.controller.sendRequest = Controller.Controller.sendRequest
     self.controller.sendRequest = Controller.Controller.addToQueue

+ 4 - 2
ambari-agent/src/test/python/ambari_agent/TestMain.py

@@ -44,6 +44,7 @@ with patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_
   from ambari_commons.os_check import OSConst, OSCheck
   from ambari_agent.ExitHelper import ExitHelper
 
+
 class TestMain(unittest.TestCase):
 
   def setUp(self):
@@ -304,8 +305,8 @@ class TestMain(unittest.TestCase):
   @patch.object(main, "update_log_level")
   @patch.object(NetUtil.NetUtil, "try_to_connect")
   @patch.object(Controller, "__init__")
+  @patch.object(Controller, "is_alive")
   @patch.object(Controller, "start")
-  @patch.object(Controller, "join")
   @patch("optparse.OptionParser.parse_args")
   @patch.object(DataCleaner,"start")
   @patch.object(DataCleaner,"__init__")
@@ -313,13 +314,14 @@ class TestMain(unittest.TestCase):
   @patch.object(PingPortListener,"__init__")
   @patch.object(ExitHelper,"execute_cleanup")
   def test_main(self, cleanup_mock, ping_port_init_mock, ping_port_start_mock, data_clean_init_mock,data_clean_start_mock,
-                parse_args_mock, join_mock, start_mock, Controller_init_mock, try_to_connect_mock,
+                parse_args_mock, start_mock, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock,
                 update_log_level_mock, daemonize_mock, perform_prestart_checks_mock,
                 ambari_config_mock,
                 stop_mock, bind_signal_handlers_mock,
                 setup_logging_mock, socket_mock):
     data_clean_init_mock.return_value = None
     Controller_init_mock.return_value = None
+    Controller_is_alive_mock.return_value = False
     ping_port_init_mock.return_value = None
     options = MagicMock()
     parse_args_mock.return_value = (options, MagicMock)