Преглед изворни кода

AMBARI-10657. Ambari restart/stop operation loses control of Flume agents (aonishuk)

Andrew Onishuk пре 10 година
родитељ
комит
6ea66b63eb

+ 18 - 1
ambari-common/src/main/python/resource_management/libraries/functions/flume_agent_helper.py

@@ -21,6 +21,7 @@ limitations under the License.
 import json
 import json
 import glob
 import glob
 import os
 import os
+import time
 
 
 from resource_management.core.exceptions import ComponentIsNotRunning
 from resource_management.core.exceptions import ComponentIsNotRunning
 from resource_management.libraries.functions import check_process_status
 from resource_management.libraries.functions import check_process_status
@@ -113,4 +114,20 @@ def get_live_status(pid_file, flume_conf_directory):
   except:
   except:
     pass
     pass
 
 
-  return res
+  return res
+
+
+def await_flume_process_termination(pid_file, try_count=20, retry_delay=2):
+  """
+  Waits while the flume agent represented by the specified file is being stopped.
+  :param pid_file: the PID file of the agent to check
+  :param try_count: the count of checks
+  :param retry_delay: time between checks in seconds
+  :return: True if the agent was stopped, False otherwise
+  """
+  for i in range(0, try_count):
+    if not is_flume_process_live(pid_file):
+      return True
+    else:
+      time.sleep(retry_delay)
+  return not is_flume_process_live(pid_file)

+ 3 - 0
ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py

@@ -23,6 +23,7 @@ import os
 from resource_management import *
 from resource_management import *
 from resource_management.libraries.functions.flume_agent_helper import is_flume_process_live
 from resource_management.libraries.functions.flume_agent_helper import is_flume_process_live
 from resource_management.libraries.functions.flume_agent_helper import find_expected_agent_names
 from resource_management.libraries.functions.flume_agent_helper import find_expected_agent_names
+from resource_management.libraries.functions.flume_agent_helper import await_flume_process_termination
 from ambari_commons import OSConst
 from ambari_commons import OSConst
 from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
 from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
 
 
@@ -178,6 +179,8 @@ def flume(action = None):
       pid_file = params.flume_run_dir + os.sep + agent + '.pid'
       pid_file = params.flume_run_dir + os.sep + agent + '.pid'
       pid = format('`cat {pid_file}` > /dev/null 2>&1')
       pid = format('`cat {pid_file}` > /dev/null 2>&1')
       Execute(format('kill {pid}'), ignore_failures=True)
       Execute(format('kill {pid}'), ignore_failures=True)
+      if not await_flume_process_termination(pid_file):
+        raise Fail("Can't stop flume agent: {0}".format(agent))
       File(pid_file, action = 'delete')
       File(pid_file, action = 'delete')
 
 
 
 

+ 8 - 2
ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py

@@ -74,8 +74,10 @@ class TestFlumeHandler(RMFTestCase):
 
 
   @patch("glob.glob")
   @patch("glob.glob")
   @patch("flume._set_desired_state")
   @patch("flume._set_desired_state")
-  def test_stop_default(self, set_desired_mock, glob_mock):
+  @patch("flume.await_flume_process_termination")
+  def test_stop_default(self, await_flume_process_termination_mock, set_desired_mock, glob_mock):
     glob_mock.side_effect = [['/var/run/flume/a1/pid'], ['/etc/flume/conf/a1/ambari-meta.json']]
     glob_mock.side_effect = [['/var/run/flume/a1/pid'], ['/etc/flume/conf/a1/ambari-meta.json']]
+    await_flume_process_termination_mock.return_value = True
 
 
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/flume_handler.py",
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/flume_handler.py",
                        classname = "FlumeHandler",
                        classname = "FlumeHandler",
@@ -85,6 +87,7 @@ class TestFlumeHandler(RMFTestCase):
                        target = RMFTestCase.TARGET_COMMON_SERVICES)
                        target = RMFTestCase.TARGET_COMMON_SERVICES)
 
 
     self.assertTrue(glob_mock.called)
     self.assertTrue(glob_mock.called)
+    await_flume_process_termination_mock.assert_called_with('/var/run/flume/a1.pid')
 
 
     self.assertTrue(set_desired_mock.called)
     self.assertTrue(set_desired_mock.called)
     self.assertTrue(set_desired_mock.call_args[0][0] == 'INSTALLED')
     self.assertTrue(set_desired_mock.call_args[0][0] == 'INSTALLED')
@@ -311,8 +314,10 @@ class TestFlumeHandler(RMFTestCase):
     self.assertNoMoreResources()
     self.assertNoMoreResources()
 
 
   @patch("glob.glob")
   @patch("glob.glob")
-  def test_stop_single(self, glob_mock):
+  @patch("flume.await_flume_process_termination")
+  def test_stop_single(self, await_flume_process_termination_mock, glob_mock):
     glob_mock.return_value = ['/var/run/flume/b1.pid']
     glob_mock.return_value = ['/var/run/flume/b1.pid']
+    await_flume_process_termination_mock.return_value = True
 
 
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/flume_handler.py",
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/flume_handler.py",
                        classname = "FlumeHandler",
                        classname = "FlumeHandler",
@@ -322,6 +327,7 @@ class TestFlumeHandler(RMFTestCase):
                        target = RMFTestCase.TARGET_COMMON_SERVICES)
                        target = RMFTestCase.TARGET_COMMON_SERVICES)
 
 
     self.assertTrue(glob_mock.called)
     self.assertTrue(glob_mock.called)
+    await_flume_process_termination_mock.assert_called_with('/var/run/flume/b1.pid')
 
 
     self.assertResourceCalled('Execute', 'kill `cat /var/run/flume/b1.pid` > /dev/null 2>&1',
     self.assertResourceCalled('Execute', 'kill `cat /var/run/flume/b1.pid` > /dev/null 2>&1',
       ignore_failures = True)
       ignore_failures = True)