ソースを参照

AMBARI-11759. Flume start with configured flume agents, and flume status fail in non-root agent mode (aonishuk)

Andrew Onishuk 10 年 前
コミット
0474ef6ec6

+ 17 - 15
ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume.py

@@ -24,6 +24,7 @@ from resource_management import *
 from resource_management.libraries.functions.flume_agent_helper import is_flume_process_live
 from resource_management.libraries.functions.flume_agent_helper import find_expected_agent_names
 from resource_management.libraries.functions.flume_agent_helper import await_flume_process_termination
+from resource_management.core import sudo
 from ambari_commons import OSConst
 from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
 
@@ -87,7 +88,9 @@ def flume(action = None):
   if action == 'config':
     # remove previously defined meta's
     for n in find_expected_agent_names(params.flume_conf_dir):
-      os.unlink(os.path.join(params.flume_conf_dir, n, 'ambari-meta.json'))
+      File(os.path.join(params.flume_conf_dir, n, 'ambari-meta.json'),
+        action = "delete",
+      )
 
     Directory(params.flume_conf_dir,
               recursive=True,
@@ -171,10 +174,9 @@ def flume(action = None):
           wait_for_finish=False,
           environment={'JAVA_HOME': params.java_home}
         )
-
         # sometimes startup spawns a couple of threads - so only the first line may count
-
-        pid_cmd = format('pgrep -o -u {flume_user} -f ^{java_home}.*{agent}.* > {flume_agent_pid_file}')
+        pid_cmd = as_sudo(('pgrep', '-o', '-u', params.flume_user, '-f', format('^{java_home}.*{agent}.*'))) + \
+        " | " + as_sudo(('tee', flume_agent_pid_file)) + "  && test ${PIPESTATUS[0]} -eq 0"
         Execute(pid_cmd,
                 logoutput=True,
                 tries=20,
@@ -195,11 +197,15 @@ def flume(action = None):
 
 
     for agent in agent_names:
-      pid_file = params.flume_run_dir + os.sep + agent + '.pid'
-      pid = format('`cat {pid_file}` > /dev/null 2>&1')
-      Execute(format('kill {pid}'), ignore_failures=True)
+      pid_file = format("{flume_run_dir}/{agent}.pid")
+      
+      if is_flume_process_live(pid_file):
+        pid = shell.checked_call(("cat", pid_file), sudo=True)[1].strip()
+        Execute(('kill', pid), sudo=True)
+      
       if not await_flume_process_termination(pid_file):
         raise Fail("Can't stop flume agent: {0}".format(agent))
+        
       File(pid_file, action = 'delete')
 
 
@@ -261,18 +267,14 @@ def cmd_target_names():
 
 def _set_desired_state(state):
   import params
-  filename = os.path.join(params.flume_run_dir, 'ambari-state.txt')
-  File(filename,
+  File(params.ambari_state_file,
     content = state,
   )
 
-
 def get_desired_state():
   import params
-
-  try:
-    with open(os.path.join(params.flume_run_dir, 'ambari-state.txt'), 'r') as fp:
-      return fp.read()
-  except:
+  if os.path.exists(params.ambari_state_file):
+    return sudo.read_file(params.ambari_state_file)
+  else:
     return 'INSTALLED'
   

+ 1 - 0
ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume_handler.py

@@ -103,6 +103,7 @@ class FlumeHandler(Script):
     elif len(expected_agents) == 0 and 'INSTALLED' == get_desired_state():
       raise ComponentIsNotRunning()
 
+
   @OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
   def status(self, env):
     import params

+ 1 - 0
ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py

@@ -58,6 +58,7 @@ if Script.is_hdp_stack_greater_or_equal("2.2"):
 java_home = config['hostLevelParams']['java_home']
 flume_log_dir = '/var/log/flume'
 flume_run_dir = '/var/run/flume'
+ambari_state_file = format("{flume_run_dir}/ambari-state.txt")
 
 if (('flume-conf' in config['configurations']) and('content' in config['configurations']['flume-conf'])):
   flume_conf_content = config['configurations']['flume-conf']['content']

+ 14 - 21
ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py

@@ -65,10 +65,11 @@ class TestFlumeHandler(RMFTestCase):
         environment = {'JAVA_HOME': u'/usr/jdk64/jdk1.7.0_45'},
         wait_for_finish = False,
     )
-    self.assertResourceCalled('Execute', 'pgrep -o -u flume -f ^/usr/jdk64/jdk1.7.0_45.*a1.* > /var/run/flume/a1.pid',
-      logoutput = True,
-      tries = 20,
-      try_sleep = 10)
+    self.assertResourceCalled('Execute', "ambari-sudo.sh [RMF_ENV_PLACEHOLDER] -H -E pgrep -o -u flume -f '^/usr/jdk64/jdk1.7.0_45.*a1.*' | ambari-sudo.sh [RMF_ENV_PLACEHOLDER] -H -E tee /var/run/flume/a1.pid  && test ${PIPESTATUS[0]} -eq 0",
+        logoutput = True,
+        tries = 20,
+        try_sleep = 10,
+    )
 
     self.assertNoMoreResources()
 
@@ -92,9 +93,6 @@ class TestFlumeHandler(RMFTestCase):
     self.assertTrue(set_desired_mock.called)
     self.assertTrue(set_desired_mock.call_args[0][0] == 'INSTALLED')
 
-    self.assertResourceCalled('Execute', 'kill `cat /var/run/flume/a1.pid` > /dev/null 2>&1',
-      ignore_failures = True)
-
     self.assertResourceCalled('File', '/var/run/flume/a1.pid', action = ['delete'])
 
     self.assertNoMoreResources()
@@ -182,7 +180,6 @@ class TestFlumeHandler(RMFTestCase):
     self.assertNoMoreResources()
 
   def assert_configure_default(self):
-
     self.assertResourceCalled('Directory',
                               '/etc/flume/conf',
                               owner='flume',
@@ -331,10 +328,11 @@ class TestFlumeHandler(RMFTestCase):
         wait_for_finish = False,
     )
 
-    self.assertResourceCalled('Execute', 'pgrep -o -u flume -f ^/usr/jdk64/jdk1.7.0_45.*b1.* > /var/run/flume/b1.pid',
-      logoutput = True,
-      tries = 20,
-      try_sleep = 10)
+    self.assertResourceCalled('Execute', "ambari-sudo.sh [RMF_ENV_PLACEHOLDER] -H -E pgrep -o -u flume -f '^/usr/jdk64/jdk1.7.0_45.*b1.*' | ambari-sudo.sh [RMF_ENV_PLACEHOLDER] -H -E tee /var/run/flume/b1.pid  && test ${PIPESTATUS[0]} -eq 0",
+        logoutput = True,
+        tries = 20,
+        try_sleep = 10,
+    )
 
     self.assertNoMoreResources()
 
@@ -354,16 +352,12 @@ class TestFlumeHandler(RMFTestCase):
     self.assertTrue(glob_mock.called)
     await_flume_process_termination_mock.assert_called_with('/var/run/flume/b1.pid')
 
-    self.assertResourceCalled('Execute', 'kill `cat /var/run/flume/b1.pid` > /dev/null 2>&1',
-      ignore_failures = True)
-
     self.assertResourceCalled('File', '/var/run/flume/b1.pid', action = ['delete'])
 
     self.assertNoMoreResources()
 
   @patch("flume.find_expected_agent_names")
-  @patch("os.unlink")
-  def test_configure_with_existing(self, os_unlink_mock, expected_names_mock):
+  def test_configure_with_existing(self, expected_names_mock):
     expected_names_mock.return_value = ["x1"]
 
     self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/flume_handler.py",
@@ -372,10 +366,9 @@ class TestFlumeHandler(RMFTestCase):
                        config_file="default.json",
                        hdp_stack_version = self.STACK_VERSION,
                        target = RMFTestCase.TARGET_COMMON_SERVICES)
-
-    self.assertTrue(os_unlink_mock.called)
-    os_unlink_mock.assert_called_with('/etc/flume/conf/x1/ambari-meta.json')
-
+    self.assertResourceCalled('File', '/etc/flume/conf/x1/ambari-meta.json',
+        action = ['delete'],
+    )
     self.assert_configure_default()
     self.assertNoMoreResources()