Browse Source

AMBARI-6048. Ambari Agent script should check for running processes before starting (dlysnichenko)

Lisnichenko Dmitro 11 năm trước cách đây
mục cha
commit
7d875fbac7

+ 21 - 11
ambari-agent/src/main/python/ambari_agent/PingPortListener.py

@@ -23,12 +23,15 @@ import logging
 import AmbariConfig
 import threading
 import socket
+import subprocess
 
 logger = logging.getLogger()
+FUSER_CMD = "fuser {0}/tcp 2>&1 | awk '{1}'"
+PSPF_CMD = "ps -fp {0}"
+PORT_IN_USE_MESSAGE = "Could not open port {0} because port already used by another process:\n{1}"
 
 class PingPortListener(threading.Thread):
 
-
   def __init__(self, config):
     threading.Thread.__init__(self)
     self.daemon = True
@@ -36,17 +39,24 @@ class PingPortListener(threading.Thread):
     self.config = config
     self.host = '0.0.0.0'
     self.port = int(self.config.get('agent','ping_port'))
-    try:
-      self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-      self.socket.bind((self.host, self.port))
-      self.socket.listen(1)
-    except Exception as ex:
-      logger.error("Failed to start ping port listener of:" + str(ex));
-      sys.exit(1)
-    else:
-      config.set('agent','current_ping_port',str(self.socket.getsockname()[1]))
-      logger.info("Ping port listener started on port: " + str(self.socket.getsockname()[1]))
+    if not self.port == None and not self.port == 0:
+      (stdoutdata, stderrdata) = self.run_os_command_in_shell(FUSER_CMD.format(str(self.port), "{print $2}"))
+      if stdoutdata.strip():
+        (stdoutdata, stderrdata) = self.run_os_command_in_shell(PSPF_CMD.format(stdoutdata.strip()))
+        raise Exception(PORT_IN_USE_MESSAGE.format(str(self.port), stdoutdata))      
+    self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    self.socket.bind((self.host, self.port))
+    self.socket.listen(1)
+    config.set('agent','current_ping_port',str(self.socket.getsockname()[1]))
+    logger.info("Ping port listener started on port: " + str(self.socket.getsockname()[1]))
+
 
+  def run_os_command_in_shell(self, command):
+    process = subprocess.Popen(command, stdout=subprocess.PIPE,
+              stdin=subprocess.PIPE,
+              stderr=subprocess.PIPE,
+              shell=True)
+    return process.communicate()
 
   def __del__(self):
     logger.info("Ping port listener killed")

+ 7 - 1
ambari-agent/src/main/python/ambari_agent/main.py

@@ -211,7 +211,13 @@ def main():
   daemonize()
 
   # Starting ping port listener
-  ping_port_listener = PingPortListener(config)
+  try:
+    ping_port_listener = PingPortListener(config)
+  except Exception as ex:
+    err_message = "Failed to start ping port listener of: " + str(ex)
+    logger.error(err_message);
+    sys.stderr.write(err_message)
+    sys.exit(1)
   ping_port_listener.start()
 
   update_log_level(config)

+ 21 - 7
ambari-agent/src/test/python/ambari_agent/TestPingPortListener.py

@@ -31,10 +31,17 @@ class TestPingPortListener(unittest.TestCase):
     self.config.get.return_value = 55000
     PingPortListener.logger = MagicMock()
 
+  @patch("subprocess.Popen")
   @patch("socket.socket")
-  def test_init_success(self,socketMock):
+  def test_init_success(self,socketMock,popen_mock):
+    procObj = MagicMock()
+    procObj.communicate = MagicMock()
+    procObj.communicate.return_value = {"": 0, "log": "log"}
+    popen_mock.return_value = procObj
     PingPortListener.logger.reset_mock()
+    popen_mock.reset_mock()
     allive_daemon = PingPortListener.PingPortListener(self.config)
+    self.assertTrue(popen_mock.called)
     self.assertFalse(PingPortListener.logger.warn.called)
     self.assertTrue(socketMock.call_args_list[0][0][0] == socket.AF_INET)
     self.assertTrue(socketMock.call_args_list[0][0][1] == socket.SOCK_STREAM)
@@ -45,15 +52,22 @@ class TestPingPortListener(unittest.TestCase):
 
 
 
+  @patch("subprocess.Popen")
   @patch.object(socket.socket,"bind")
   @patch.object(socket.socket,"listen")
-  @patch.object(socket.socket,"__init__")
-  @patch.object(sys, "exit")
-  def test_init_warn(self, sys_exit_mock, socketInitMock,socketListenMock,socketBindMock):
+  def test_init_warn(self,socketListenMock,socketBindMock,popen_mock):
+    procObj = MagicMock()
+    procObj.communicate = MagicMock()
+    procObj.communicate.return_value = {"mine.py": 0, "log": "log"}
+    popen_mock.return_value = procObj
     PingPortListener.logger.reset_mock()
-    allive_daemon = PingPortListener.PingPortListener(self.config)
-    self.assertTrue(socketInitMock.called)
-    self.assertTrue(sys_exit_mock.called)
+    try:
+      PingPortListener.PingPortListener(self.config)
+      self.fail("Should throw exception")
+    except Exception as fe:
+      # Expected
+      self.assertTrue("port already used" in str(fe))
+      pass
 
 if __name__ == "__main__":
   suite = unittest.TestLoader().loadTestsFromTestCase(PingPortListener)