Browse Source

AMBARI-6055. Flume: API returns agents info even after it was removed (ncole)

Nate Cole 11 years ago
parent
commit
5e17fed311

+ 37 - 17
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py

@@ -25,23 +25,23 @@ from resource_management import *
 def flume(action = None):
   import params
 
-  flume_agents = {}
-  if params.flume_conf_content is not None:
-    flume_agents = build_flume_topology(params.flume_conf_content)
-
-  agent_names = flume_agents.keys()
-  if len(params.flume_command_targets) > 0:
-    agent_names = params.flume_command_targets
-
   if action == 'config':
+    # remove previously defined meta's
+    for n in find_expected_agent_names():
+      os.unlink(os.path.join(params.flume_conf_dir, n, 'ambari-meta.json'))
+
     Directory(params.flume_conf_dir)
     Directory(params.flume_log_dir, owner=params.flume_user)
 
+    flume_agents = {}
+    if params.flume_conf_content is not None:
+      flume_agents = build_flume_topology(params.flume_conf_content)
+
     for agent in flume_agents.keys():
-      flume_agent_conf_dir = params.flume_conf_dir + os.sep + agent
-      flume_agent_conf_file = flume_agent_conf_dir + os.sep + 'flume.conf'
-      flume_agent_meta_file = flume_agent_conf_dir + os.sep + 'ambari-meta.json'
-      flume_agent_log4j_file = flume_agent_conf_dir + os.sep + 'log4j.properties'
+      flume_agent_conf_dir = os.path.join(params.flume_conf_dir, agent)
+      flume_agent_conf_file = os.path.join(flume_agent_conf_dir, 'flume.conf')
+      flume_agent_meta_file = os.path.join(flume_agent_conf_dir, 'ambari-meta.json')
+      flume_agent_log4j_file = os.path.join(flume_agent_conf_dir, 'log4j.properties')
 
       Directory(flume_agent_conf_dir)
 
@@ -64,7 +64,7 @@ def flume(action = None):
       '--conf-file {{2}} '
       '{{3}}')
 
-    for agent in agent_names:
+    for agent in cmd_target_names():
       flume_agent_conf_dir = params.flume_conf_dir + os.sep + agent
       flume_agent_conf_file = flume_agent_conf_dir + os.sep + "flume.conf"
       flume_agent_pid_file = params.flume_run_dir + os.sep + agent + ".pid"
@@ -96,6 +96,8 @@ def flume(action = None):
     if 0 == len(pid_files):
       return
 
+    agent_names = cmd_target_names()
+
     if len(agent_names) > 0:
       for agent in agent_names:
         pid_file = params.flume_run_dir + os.sep + agent + '.pid'
@@ -199,11 +201,9 @@ def live_status(pid_file):
 def flume_status():
   import params
 
-  # these are what Ambari believes should be running
-  meta_files = glob.glob(params.flume_conf_dir + os.sep + "*/ambari-meta.json")
+  meta_files = find_expected_agent_names()
   pid_files = []
-  for meta_file in meta_files:
-    agent_name = os.path.dirname(meta_file).split(os.sep).pop()
+  for agent_name in meta_files:
     pid_files.append(os.path.join(params.flume_run_dir, agent_name + '.pid'))
 
   procs = []
@@ -212,3 +212,23 @@ def flume_status():
 
   return procs
 
+# these are what Ambari believes should be running
+def find_expected_agent_names():
+  import params
+
+  files = glob.glob(params.flume_conf_dir + os.sep + "*/ambari-meta.json")
+  expected = []
+
+  for f in files:
+    expected.append(os.path.dirname(f).split(os.sep).pop())
+
+  return expected
+
+def cmd_target_names():
+  import params
+
+  if len(params.flume_command_targets) > 0:
+    return params.flume_command_targets
+  else:
+    return find_expected_agent_names()
+

+ 20 - 2
ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py

@@ -35,10 +35,12 @@ class TestFlumeHandler(RMFTestCase):
     self.assertNoMoreResources()
 
   @patch("os.path.isfile")
-  def test_start_default(self, os_path_isfile_mock):
+  @patch("flume.cmd_target_names")
+  def test_start_default(self, cmd_target_names_mock, os_path_isfile_mock):
     # 1st call is to check if the conf file is there - that should be True
     # 2nd call is to check if the process is live - that should be False
     os_path_isfile_mock.side_effect = [True, False]
+    cmd_target_names_mock.return_value = ["a1"]
 
     self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
                        classname = "FlumeHandler",
@@ -64,7 +66,7 @@ class TestFlumeHandler(RMFTestCase):
 
   @patch("glob.glob")
   def test_stop_default(self, glob_mock):
-    glob_mock.return_value = ['/var/run/flume/a1.pid']
+    glob_mock.side_effect = [['/var/run/flume/a1/pid'], ['/etc/flume/conf/a1/ambari-meta.json']]
 
     self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
                        classname = "FlumeHandler",
@@ -255,6 +257,22 @@ class TestFlumeHandler(RMFTestCase):
 
     self.assertNoMoreResources()
 
+  @patch("flume.find_expected_agent_names")
+  @patch("os.unlink")
+  def test_configure_with_existing(self, os_unlink_mock, find_expected_mock):
+    find_expected_mock.return_value = ["x1"]
+
+    self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
+                       classname = "FlumeHandler",
+                       command = "configure",
+                       config_file="default.json")
+
+    self.assertTrue(os_unlink_mock.called)
+    os_unlink_mock.assert_called_with('/etc/flume/conf/x1/ambari-meta.json')
+
+    self.assert_configure_default()
+    self.assertNoMoreResources()
+
 def build_flume(content):
   result = {}
   agent_names = []