Browse Source

AMBARI-6772. Flume: agent alerts should show a combination of host and agent name (ncole)

Nate Cole 10 years ago
parent
commit
a65ae123b1

+ 1 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java

@@ -311,6 +311,7 @@ public class HeartbeatMonitor implements Runnable {
     statusCmd.setComponentName(componentName);
     statusCmd.setConfigurations(configurations);
     statusCmd.setConfigurationAttributes(configurationAttributes);
+    statusCmd.setHostname(hostname);
 
     // Fill command params
     Map<String, String> commandParams = statusCmd.getCommandParams();

+ 15 - 2
ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java

@@ -18,12 +18,12 @@
 package org.apache.ambari.server.agent;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import com.google.gson.annotations.SerializedName;
 import org.codehaus.jackson.annotate.JsonProperty;
 
+import com.google.gson.annotations.SerializedName;
+
 /**
  * Command to report the status of a list of services in roles.
  */
@@ -41,6 +41,8 @@ public class StatusCommand extends AgentCommand {
   private Map<String, Map<String, Map<String, String>>> configurationAttributes;
   private Map<String, String> commandParams = new HashMap<String, String>();
   private Map<String, String> hostLevelParams = new HashMap<String, String>();
+  private String hostname = null;
+  
 
   @JsonProperty("clusterName")
   public String getClusterName() {
@@ -111,5 +113,16 @@ public class StatusCommand extends AgentCommand {
   public void setCommandParams(Map<String, String> commandParams) {
     this.commandParams = commandParams;
   }
+  
+  @JsonProperty("hostname")
+  public void setHostname(String hostname) {
+    this.hostname = hostname;
+  }
 
+  @JsonProperty("hostname")
+  public String getHostname() {
+    return hostname;
+  }
+  
+  
 }

+ 33 - 15
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py

@@ -63,29 +63,47 @@ class FlumeHandler(Script):
     json['processes'] = processes
     json['alerts'] = []
 
+    alert = {}
+    alert['name'] = 'flume_agent'
+    alert['label'] = 'Flume Agent process'
+
     if len(processes) == 0 and len(expected_agents) == 0:
-      alert = {}
-      alert['name'] = 'flume_agent'
-      alert['label'] = 'Flume Agent process'
       alert['state'] = 'WARNING'
-      alert['text'] = 'No agents defined'
-      json['alerts'].append(alert)
+
+      if not params.hostname is None:
+        alert['text'] = 'No agents defined on ' + params.hostname
+      else:
+        alert['text'] = 'No agents defined'
+
     else:
-      for proc in processes:
-        alert = {}
-        alert['name'] = 'flume_agent'
-        alert['instance'] = proc['name']
-        alert['label'] = 'Flume Agent process'
+      crit = []
+      ok = []
 
+      for proc in processes:
         if not proc.has_key('status') or proc['status'] == 'NOT_RUNNING':
-          alert['state'] = 'CRITICAL'
-          alert['text'] = 'Flume agent {0} not running'.format(proc['name'])
+          crit.append(proc['name'])
         else:
-          alert['state'] = 'OK'
-          alert['text'] = 'Flume agent {0} is running'.format(proc['name'])
+          ok.append(proc['name'])
+
+      text_arr = []
+
+      if len(crit) > 0:
+        text_arr.append("{0} {1} NOT running".format(", ".join(crit),
+          "is" if len(crit) == 1 else "are"))
+
+      if len(ok) > 0:
+        text_arr.append("{0} {1} running".format(", ".join(ok),
+          "is" if len(ok) == 1 else "are"))
+
+      plural = len(crit) > 1 or len(ok) > 1
+      alert['text'] = "Agent{0} {1} {2}".format(
+        "s" if plural else "",
+        " and ".join(text_arr),
+        "" if params.hostname is None else "on " + str(params.hostname))
 
-        json['alerts'].append(alert)
+      alert['state'] = 'CRITICAL' if len(crit) > 0 else 'OK'
 
+    json['alerts'].append(alert)
     self.put_structured_out(json)
 
     # only throw an exception if there are agents defined and there is a 

+ 4 - 0
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/params.py

@@ -54,3 +54,7 @@ ganglia_server_hosts = default('/clusterHostInfo/ganglia_server_host', [])
 ganglia_server_host = None
 if 0 != len(ganglia_server_hosts):
   ganglia_server_host = ganglia_server_hosts[0]
+
+hostname = None
+if config.has_key('hostname'):
+  hostname = config['hostname']

+ 2 - 0
ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatMonitor.java

@@ -295,6 +295,8 @@ public class TestHeartbeatMonitor {
     assertTrue("HeartbeatMonitor should generate StatusCommands for host2, " +
       "even if it has only client components", cmds.size() == 1);
     assertTrue(cmds.get(0).getComponentName().equals(Role.HDFS_CLIENT.name()));
+    assertEquals(hostname2, cmds.get(0).getHostname());
+
   }
 
   @Test

+ 124 - 1
ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py

@@ -98,7 +98,7 @@ class TestFlumeHandler(RMFTestCase):
     # test that the method was called with empty processes
     self.assertTrue(structured_out_mock.called)
     structured_out_mock.assert_called_with({'processes': [],
-      'alerts': [{'text': 'No agents defined', 'state': 'WARNING', 'name': 'flume_agent', 'label': 'Flume Agent process'}]})
+      'alerts': [{'text': 'No agents defined on c6401.ambari.apache.org', 'state': 'WARNING', 'name': 'flume_agent', 'label': 'Flume Agent process'}]})
     self.assertNoMoreResources()
 
   @patch("resource_management.libraries.script.Script.put_structured_out")
@@ -296,6 +296,129 @@ class TestFlumeHandler(RMFTestCase):
     self.assert_configure_default()
     self.assertNoMoreResources()
 
+  @patch("resource_management.libraries.script.Script.put_structured_out")
+  @patch("flume.find_expected_agent_names")
+  @patch("flume.flume_status")
+  def test_status_many_mixed(self, status_mock, expected_names_mock, structured_out_mock):
+    expected_names_mock.return_value = ["a1", "a2"]
+    status_mock.return_value = [{'name': 'a1', 'status': 'RUNNING'}, {'name': 'a2', 'status': 'NOT_RUNNING'}]
+
+    try:
+      self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
+                       classname = "FlumeHandler",
+                       command = "status",
+                       config_file="default.json")
+    except:
+      # expected since ComponentIsNotRunning gets raised
+      pass
+      
+    self.assertTrue(structured_out_mock.called)
+
+    # call_args[0] is the tuple of positional arguments; its first element is the dict passed to put_structured_out
+    struct_out = structured_out_mock.call_args[0][0]
+    self.assertTrue(struct_out.has_key('processes'))
+    self.assertTrue(struct_out.has_key('alerts'))
+    self.assertTrue('Agent a2 is NOT running and a1 is running on c6401.ambari.apache.org' == struct_out['alerts'][0]['text'])
+    self.assertTrue('CRITICAL' == struct_out['alerts'][0]['state'])
+    self.assertNoMoreResources()
+
+  @patch("resource_management.libraries.script.Script.put_structured_out")
+  @patch("flume.find_expected_agent_names")
+  @patch("flume.flume_status")
+  def test_status_many_ok(self, status_mock, expected_names_mock, structured_out_mock):
+    expected_names_mock.return_value = ["a1", "a2"]
+    status_mock.return_value = [{'name': 'a1', 'status': 'RUNNING'}, {'name': 'a2', 'status': 'RUNNING'}]
+
+    self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
+                       classname = "FlumeHandler",
+                       command = "status",
+                       config_file="default.json")
+      
+    self.assertTrue(structured_out_mock.called)
+
+    # call_args[0] is the tuple of positional arguments; its first element is the dict passed to put_structured_out
+    struct_out = structured_out_mock.call_args[0][0]
+    self.assertTrue(struct_out.has_key('processes'))
+    self.assertTrue(struct_out.has_key('alerts'))
+    self.assertTrue('Agents a1, a2 are running on c6401.ambari.apache.org' == struct_out['alerts'][0]['text'])
+    self.assertTrue('OK' == struct_out['alerts'][0]['state'])
+    self.assertNoMoreResources()
+
+  @patch("resource_management.libraries.script.Script.put_structured_out")
+  @patch("flume.find_expected_agent_names")
+  @patch("flume.flume_status")
+  def test_status_many_critical(self, status_mock, expected_names_mock, structured_out_mock):
+    expected_names_mock.return_value = ["a1", "a2"]
+    status_mock.return_value = [{'name': 'a1', 'status': 'NOT_RUNNING'}, {'name': 'a2', 'status': 'NOT_RUNNING'}]
+
+    try:
+      self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
+                       classname = "FlumeHandler",
+                       command = "status",
+                       config_file="default.json")
+    except:
+      # expected since ComponentIsNotRunning gets raised
+      pass
+      
+    self.assertTrue(structured_out_mock.called)
+
+    # call_args[0] is the tuple of positional arguments; its first element is the dict passed to put_structured_out
+    struct_out = structured_out_mock.call_args[0][0]
+    self.assertTrue(struct_out.has_key('processes'))
+    self.assertTrue(struct_out.has_key('alerts'))
+    self.assertTrue('Agents a1, a2 are NOT running on c6401.ambari.apache.org' == struct_out['alerts'][0]['text'])
+    self.assertTrue('CRITICAL' == struct_out['alerts'][0]['state'])
+    self.assertNoMoreResources()
+
+
+  @patch("resource_management.libraries.script.Script.put_structured_out")
+  @patch("flume.find_expected_agent_names")
+  @patch("flume.flume_status")
+  def test_status_single_ok(self, status_mock, expected_names_mock, structured_out_mock):
+    expected_names_mock.return_value = ["a1"]
+    status_mock.return_value = [{'name': 'a1', 'status': 'RUNNING'}]
+
+    self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
+                       classname = "FlumeHandler",
+                       command = "status",
+                       config_file="default.json")
+      
+    self.assertTrue(structured_out_mock.called)
+
+    # call_args[0] is the tuple of positional arguments; its first element is the dict passed to put_structured_out
+    struct_out = structured_out_mock.call_args[0][0]
+    self.assertTrue(struct_out.has_key('processes'))
+    self.assertTrue(struct_out.has_key('alerts'))
+    self.assertTrue('Agent a1 is running on c6401.ambari.apache.org' == struct_out['alerts'][0]['text'])
+    self.assertTrue('OK' == struct_out['alerts'][0]['state'])
+    self.assertNoMoreResources()
+
+  @patch("resource_management.libraries.script.Script.put_structured_out")
+  @patch("flume.find_expected_agent_names")
+  @patch("flume.flume_status")
+  def test_status_single_critical(self, status_mock, expected_names_mock, structured_out_mock):
+    expected_names_mock.return_value = ['a1']
+    status_mock.return_value = [{'name': 'a1', 'status': 'NOT_RUNNING'}]
+
+    try:
+      self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
+                       classname = "FlumeHandler",
+                       command = "status",
+                       config_file="default.json")
+    except:
+      # expected since ComponentIsNotRunning gets raised
+      pass
+      
+    self.assertTrue(structured_out_mock.called)
+
+    # call_args[0] is the tuple of positional arguments; its first element is the dict passed to put_structured_out
+    struct_out = structured_out_mock.call_args[0][0]
+    self.assertTrue(struct_out.has_key('processes'))
+    self.assertTrue(struct_out.has_key('alerts'))
+    self.assertTrue('Agent a1 is NOT running on c6401.ambari.apache.org' == struct_out['alerts'][0]['text'])
+    self.assertTrue('CRITICAL' == struct_out['alerts'][0]['state'])
+    self.assertNoMoreResources()
+
 def build_flume(content):
   result = {}
   agent_names = []