Pārlūkot izejas kodu

AMBARI-7282. Add metric- and script-based alert types (ncole)

Nate Cole 10 gadi atpakaļ
vecāks
revīzija
853497f8ed

+ 15 - 6
ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py

@@ -29,7 +29,9 @@ import time
 import traceback
 from apscheduler.scheduler import Scheduler
 from alerts.collector import AlertCollector
+from alerts.metric_alert import MetricAlert
 from alerts.port_alert import PortAlert
+from alerts.script_alert import ScriptAlert
 
 
 logger = logging.getLogger()
@@ -48,8 +50,9 @@ class AlertSchedulerHandler():
     'standalone': False
   }
 
-  def __init__(self, cachedir, in_minutes=True):
+  def __init__(self, cachedir, stacks_dir, in_minutes=True):
     self.cachedir = cachedir
+    self.stacks_dir = stacks_dir
     
     if not os.path.exists(cachedir) and AlertSchedulerHandler.make_cachedir:
       try:
@@ -133,6 +136,7 @@ class AlertSchedulerHandler():
 
       for definition in command_json['alertDefinitions']:
         obj = self.__json_to_callable(definition)
+        
         if obj is None:
           continue
           
@@ -163,11 +167,12 @@ class AlertSchedulerHandler():
     alert = None
 
     if source_type == AlertSchedulerHandler.TYPE_METRIC:
-      pass
+      alert = MetricAlert(json_definition, source)
     elif source_type == AlertSchedulerHandler.TYPE_PORT:
       alert = PortAlert(json_definition, source)
-    elif type == AlertSchedulerHandler.TYPE_SCRIPT:
-      pass
+    elif source_type == AlertSchedulerHandler.TYPE_SCRIPT:
+      source['stacks_dir'] = self.stacks_dir
+      alert = ScriptAlert(json_definition, source)
 
     return alert
     
@@ -210,11 +215,15 @@ def main():
   del args[0]
 
   try:
-    logger.setLevel(logger.debug)
+    logger.setLevel(logging.DEBUG)
   except TypeError:
     logger.setLevel(12)
     
-  ash = AlertSchedulerHandler(args[0], False)
+  ch = logging.StreamHandler()
+  ch.setLevel(logger.level)
+  logger.addHandler(ch)
+    
+  ash = AlertSchedulerHandler(args[0], args[1], False)
   ash.start()
   
   i = 0

+ 6 - 4
ambari-agent/src/main/python/ambari_agent/Controller.py

@@ -31,13 +31,14 @@ import pprint
 from random import randint
 
 import hostname
+import security
+import ssl
 import AmbariConfig
 from Heartbeat import Heartbeat
 from Register import Register
 from ActionQueue import ActionQueue
-import security
+from FileCache import FileCache
 from NetUtil import NetUtil
-import ssl
 from LiveStatus import LiveStatus
 from AlertSchedulerHandler import AlertSchedulerHandler
 
@@ -78,9 +79,10 @@ class Controller(threading.Thread):
     cache_dir = config.get('agent', 'cache_dir')
     if cache_dir is None:
       cache_dir = '/var/lib/ambari-agent/cache'
-      
+
+    stacks_cache_dir = os.path.join(cache_dir, FileCache.STACKS_CACHE_DIRECTORY)
     alerts_cache_dir = os.path.join(cache_dir, 'alerts')
-    self.alert_scheduler_handler = AlertSchedulerHandler(alerts_cache_dir)
+    self.alert_scheduler_handler = AlertSchedulerHandler(alerts_cache_dir, stacks_cache_dir)
 
 
   def __del__(self):

+ 6 - 0
ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py

@@ -70,6 +70,9 @@ class BaseAlert(object):
       traceback.print_exc()
       res = (BaseAlert.RESULT_UNKNOWN, [str(e)])
       res_base_text = "Unknown {0}"
+    
+    if logger.isEnabledFor(logging.DEBUG):
+      logger.debug("debug alert result: {0}".format(str(res)))
       
     data = {}
     data['name'] = self._find_value('name')
@@ -80,6 +83,9 @@ class BaseAlert(object):
     data['service'] = self._find_value('serviceName')
     data['component'] = self._find_value('componentName')
     data['timestamp'] = long(time.time() * 1000)
+
+    if logger.isEnabledFor(logging.DEBUG):
+      logger.debug("debug alert text: {0}".format(data['text']))
     
     self.collector.put(self.cluster, data)
   

+ 174 - 0
ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py

@@ -0,0 +1,174 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import imp
+import json
+import logging
+import re
+import urllib2
+import uuid
+from alerts.base_alert import BaseAlert
+from resource_management.libraries.functions.get_port_from_url import get_port_from_url
+
+logger = logging.getLogger()
+
+class MetricAlert(BaseAlert):
+  
+  def __init__(self, alert_meta, alert_source_meta):
+    super(MetricAlert, self).__init__(alert_meta, alert_source_meta)
+
+    self.uri = self._find_lookup_property(alert_source_meta['uri'])
+    self.metric_info = None
+    
+    if 'jmx' in alert_source_meta:
+      self.metric_info = JmxMetric(alert_source_meta['jmx'])
+      
+  def _collect(self):
+    if self.metric_info is None:
+      raise Exception("Could not determine result.  Specific metric collector is not defined.")
+
+    uri = self._lookup_property_value(self.uri)
+    
+    host = BaseAlert.get_host_from_url(uri)
+    if host is None:
+      host = self.host_name
+
+    port = 80 # probably not very realistic
+    try:      
+      port = int(get_port_from_url(uri))
+    except:
+      pass
+
+    collect_result = None
+    check_value = None
+    value_list = []
+
+    if isinstance(self.metric_info, JmxMetric):
+      value_list.extend(self._load_jmx(False, host, port, self.metric_info))
+      check_value = self.metric_info.calculate(value_list)
+      value_list.append(check_value)
+      
+      collect_result = self.__get_result(value_list[0] if check_value is None else check_value)
+
+    logger.debug("Resolved value list is: {0}".format(str(value_list)))
+    
+    return ((collect_result, value_list))
+  
+  def __get_result(self, value):
+    ok_value = self.__find_threshold('ok')
+    warn_value = self.__find_threshold('warning')
+    crit_value = self.__find_threshold('critical')
+    
+    # critical values are higher
+    crit_direction_up = crit_value > warn_value
+    
+    if crit_direction_up: 
+      # critcal values are higher
+      if value > crit_value:
+        return self.RESULT_CRITICAL
+      elif value > warn_value:
+        return self.RESULT_WARNING
+      else:
+        if ok_value is not None:
+          if value > ok_value and value <= warn_value:
+            return self.RESULT_OK
+          else:
+            return self.RESULT_UNKNOWN
+        else:
+          return self.RESULT_OK
+    else:
+      # critical values are lower
+      if value < crit_value:
+        return self.RESULT_CRITICAL
+      elif value < warn_value:
+        return self.RESULT_WARNING
+      else:
+        if ok_value is not None:
+          if value < ok_value and value >= warn_value:
+            return self.RESULT_OK
+          else:
+            return self.RESULT_UNKNOWN
+        else:
+          return self.RESULT_OK
+
+    return None
+    
+  def __find_threshold(self, reporting_type):
+    ''' find the defined thresholds for alert values '''
+    
+    if not 'reporting' in self.alert_source_meta:
+      return None
+      
+    if not reporting_type in self.alert_source_meta['reporting']:
+      return None
+      
+    if not 'value' in self.alert_source_meta['reporting'][reporting_type]:
+      return None
+      
+    return self.alert_source_meta['reporting'][reporting_type]['value']
+    
+  def _load_jmx(self, ssl, host, port, jmx_metric):
+    ''' creates a JmxMetric object that holds info about jmx-based metrics '''
+    
+    logger.debug(str(jmx_metric.property_map))
+    
+    value_list = []
+
+    for k, v in jmx_metric.property_map.iteritems():
+      url = "{0}://{1}:{2}/jmx?qry={3}".format(
+        "https" if ssl else "http", host, str(port), k)
+        
+      response = urllib2.urlopen(url)
+      json_response = json.loads(response.read())
+      json_data = json_response['beans'][0]
+      
+      for attr in v:
+        value_list.append(json_data[attr])
+        
+    return value_list
+    
+class JmxMetric:
+  '''
+  Holds the JMX properties an alert reads, grouped by bean, together with an
+  optional function that calculates a single value from the loaded values.
+  '''
+  def __init__(self, jmx_info):
+    '''
+    jmx_info: the 'jmx' element of an alert source.  'property_list' is
+    required; 'value' is an optional expression in which {N} stands for the
+    N-th loaded value.
+    '''
+    self.custom_module = None
+    self.property_list = jmx_info['property_list']
+    self.property_map = {}
+    
+    if 'value' in jmx_info:
+      # rewrite each {N} placeholder into args[N] so the expression can be
+      # compiled as the body of a function taking the value list
+      realcode = re.sub('(\{(\d+)\})', 'args[\g<2>]', jmx_info['value'])
+      
+      # compile the expression as function f inside a throwaway module with a
+      # unique name
+      # NOTE(review): this executes text taken from the alert definition as
+      # python code - confirm that definitions only come from a trusted source
+      self.custom_module =  imp.new_module(str(uuid.uuid4()))
+      code = 'def f(args):\n  return ' + realcode
+      exec code in self.custom_module.__dict__
+    
+    # group attribute names by bean: the text before the first '/' is the map
+    # key and the next segment is the attribute; a property without a '/'
+    # raises IndexError on parts[1]
+    for p in self.property_list:
+      parts = p.split('/')
+      if not parts[0] in self.property_map:
+        self.property_map[parts[0]] = []
+      self.property_map[parts[0]].append(parts[1])
+      
+  def calculate(self, args):
+    ''' Returns the calculated value for args, or None when no expression was defined. '''
+    if self.custom_module is not None:
+      return self.custom_module.f(args)
+    return None
+    
+      
+    
+  
+    

+ 70 - 0
ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py

@@ -0,0 +1,70 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import imp
+import logging
+import os
+from alerts.base_alert import BaseAlert
+
+logger = logging.getLogger()
+
+class ScriptAlert(BaseAlert):
+  def __init__(self, alert_meta, alert_source_meta):
+    ''' ScriptAlert reporting structure is output from the script itself '''
+    
+    alert_source_meta['reporting'] = {
+      'ok': { 'text': '{0}' },
+      'warning': { 'text': '{0}' },
+      'critical': { 'text': '{0}' },
+      'unknown': { 'text': '{0}' }
+    }
+    
+    super(ScriptAlert, self).__init__(alert_meta, alert_source_meta)
+    
+    self.path = None
+    if 'path' in alert_source_meta:
+      self.path = alert_source_meta['path']
+      
+    if 'stacks_dir' in alert_source_meta:
+      self.stacks_dir = alert_source_meta['stacks_dir']
+    
+  def _collect(self):
+    if self.path is None and self.stack_path is None:
+      raise Exception("The attribute 'path' must be specified")
+
+    path_to_script = self.path
+    if not os.path.exists(self.path) and self.stacks_dir is not None:
+      paths = self.path.split('/')
+      path_to_script = os.path.join(self.stacks_dir, *paths)
+      
+    if not os.path.exists(path_to_script) or not os.path.isfile(path_to_script):
+      raise Exception(
+        "Resolved script '{0}' does not appear to be a script".format(
+          path_to_script))
+
+    if logger.isEnabledFor(logging.DEBUG):
+      logger.debug("Executing script check {0}".format(path_to_script))
+
+          
+    if (path_to_script.endswith('.py')):
+      cmd_module = imp.load_source(self._find_value('name'), path_to_script)
+      return cmd_module.execute()
+    else:
+      return ((self.RESULT_UNKNOWN, ["could not execute script {0}".format(path_to_script)]))

+ 96 - 6
ambari-agent/src/test/python/ambari_agent/TestAlerts.py

@@ -21,9 +21,11 @@ limitations under the License.
 import os
 import sys
 from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
-from ambari_agent.apscheduler.scheduler import Scheduler
-from ambari_agent.alerts.port_alert import PortAlert
 from ambari_agent.alerts.collector import AlertCollector
+from ambari_agent.alerts.metric_alert import MetricAlert
+from ambari_agent.alerts.port_alert import PortAlert
+from ambari_agent.alerts.script_alert import ScriptAlert
+from ambari_agent.apscheduler.scheduler import Scheduler
 from mock.mock import patch
 from unittest import TestCase
 
@@ -39,8 +41,9 @@ class TestAlerts(TestCase):
   @patch.object(Scheduler, "start")
   def test_start(self, aps_add_interval_job_mock, aps_start_mock):
     test_file_path = os.path.join('ambari_agent', 'dummy_files')
+    test_stack_path = os.path.join('ambari_agent', 'dummy_files')
 
-    ash = AlertSchedulerHandler(test_file_path)
+    ash = AlertSchedulerHandler(test_file_path, test_stack_path)
     ash.start()
 
     self.assertTrue(aps_add_interval_job_mock.called)
@@ -75,8 +78,6 @@ class TestAlerts(TestCase):
     self.assertEquals(6, pa.interval())
 
     res = pa.collect()
-    
-    pass
 
   def test_port_alert_no_sub(self):
     json = { "name": "namenode_process",
@@ -105,5 +106,94 @@ class TestAlerts(TestCase):
     self.assertEquals('http://c6401.ambari.apache.org', pa.uri)
 
     res = pa.collect()
+
+  def test_script_alert(self):
+    ''' A ScriptAlert runs the fixture script and reports the state and text it returns. '''
+    json = {
+      "name": "namenode_process",
+      "service": "HDFS",
+      "component": "NAMENODE",
+      "label": "NameNode process",
+      "interval": 6,
+      "scope": "host",
+      "source": {
+        "type": "SCRIPT",
+        "path": "test_script.py",
+        "reporting": {
+          "ok": {
+            "text": "TCP OK - {0:.4f} response time on port {1}"
+          },
+          "critical": {
+            "text": "Could not load process info: {0}"
+          }
+        }
+      }
+    }
+
+    # normally set by AlertSchedulerHandler
+    json['source']['stacks_dir'] = os.path.join('ambari_agent', 'dummy_files')
+
+    collector = AlertCollector()
+    # ScriptAlert replaces the 'reporting' element above with '{0}' templates
+    sa = ScriptAlert(json, json['source'])
+    sa.set_helpers(collector, '')
+    self.assertEquals(json['source']['path'], sa.path)
+    self.assertEquals(json['source']['stacks_dir'], sa.stacks_dir)
+
+    sa.collect()
+
+    # test_script.py always returns WARNING with 'all is not well' as its
+    # first text value
+    self.assertEquals('WARNING', collector.alerts()[0]['state'])
+    self.assertEquals('all is not well', collector.alerts()[0]['text'])
+   
+  @patch.object(MetricAlert, "_load_jmx")
+  def test_metric_alert(self, ma_load_jmx_mock):
+    '''
+    A MetricAlert checks the calculated value against the thresholds, and
+    falls back to the first loaded value when no calculation is defined.
+    _load_jmx is patched, so the values come from the mock.
+    '''
+    json = {
+      "name": "cpu_check",
+      "service": "HDFS",
+      "component": "NAMENODE",
+      "label": "NameNode process",
+      "interval": 6,
+      "scope": "host",
+      "source": {
+        "type": "METRIC",
+        "uri": "http://myurl:8633",
+        "jmx": {
+          "property_list": [
+            "someJmxObject/value",
+            "someOtherJmxObject/value"
+          ],
+          "value": "{0} * 100 + 123"
+        },
+        "reporting": {
+          "ok": {
+            "text": "ok_arr: {0} {1} {2}",
+          },
+          "warning": {
+            "text": "",
+            "value": 13
+          },
+          "critical": {
+            "text": "crit_arr: {0} {1} {2}",
+            "value": 72
+          }
+        }
+      }
+    }
+
+    ma_load_jmx_mock.return_value = [1, 3]
+
+    collector = AlertCollector()
+    ma = MetricAlert(json, json['source'])
+    ma.set_helpers(collector, '')
+    ma.collect()
+
+    # calculated value is 1 * 100 + 123 = 223, above the critical value of 72;
+    # it is appended to the loaded values, giving 1 3 223
+    self.assertEquals('CRITICAL', collector.alerts()[0]['state'])
+    self.assertEquals('crit_arr: 1 3 223', collector.alerts()[0]['text'])
+
+    # without a 'value' expression the first loaded value (1) is checked: it
+    # is not above the warning value of 13 and no ok value is defined, so the
+    # state is OK; the appended calculation result is None
+    del json['source']['jmx']['value']
+    collector = AlertCollector()
+    ma = MetricAlert(json, json['source'])
+    ma.set_helpers(collector, '')
+    ma.collect()
+
+    self.assertEquals('OK', collector.alerts()[0]['state'])
+    self.assertEquals('ok_arr: 1 3 None', collector.alerts()[0]['text'])
     
-    pass

+ 0 - 21
ambari-agent/src/test/python/ambari_agent/dummy_files/definitions.json

@@ -9,16 +9,6 @@
       }
     },
     "alertDefinitions": [
-      {
-        "name": "namenode_cpu",
-        "label": "NameNode host CPU Utilization",
-        "scope": "host",
-        "source": {
-          "type": "METRIC",
-          "jmx": "java.lang:type=OperatingSystem/SystemCpuLoad",
-          "host": "{{hdfs-site/dfs.namenode.secondary.http-address}}"
-        }
-      },
       {
         "name": "namenode_process",
         "service": "HDFS",
@@ -39,17 +29,6 @@
             }
           }
         }
-      },
-      {
-        "name": "hdfs_last_checkpoint",
-        "label": "Last Checkpoint Time",
-        "interval": 1,
-        "scope": "service",
-        "enabled": false,
-        "source": {
-          "type": "SCRIPT",
-          "path": "scripts/alerts/last_checkpoint.py"
-        }
       }
     ]
   }

+ 3 - 0
ambari-agent/src/test/python/ambari_agent/dummy_files/test_script.py

@@ -0,0 +1,3 @@
+
+def execute(params=None):
+  ''' Test fixture: always returns a WARNING result with a fixed message and str(params). '''
+  return (('WARNING', ['all is not well', str(params)]))

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertResourceProvider.java

@@ -167,12 +167,12 @@ public class AlertResourceProvider extends ReadOnlyResourceProvider {
     setResourceProperty(resource, ALERT_LATEST_TIMESTAMP, entity.getLatestTimestamp(), requestedIds);
     setResourceProperty(resource, ALERT_MAINTENANCE_STATE, entity.getMaintenanceState(), requestedIds);
     setResourceProperty(resource, ALERT_ORIGINAL_TIMESTAMP, entity.getOriginalTimestamp(), requestedIds);
+    setResourceProperty(resource, ALERT_TEXT, entity.getLatestText(), requestedIds);
     
     AlertHistoryEntity history = entity.getAlertHistory();
     setResourceProperty(resource, ALERT_INSTANCE, history.getAlertInstance(), requestedIds);
     setResourceProperty(resource, ALERT_LABEL, history.getAlertLabel(), requestedIds);
     setResourceProperty(resource, ALERT_STATE, history.getAlertState(), requestedIds);
-    setResourceProperty(resource, ALERT_TEXT, history.getAlertText(), requestedIds);
     setResourceProperty(resource, ALERT_COMPONENT, history.getComponentName(), requestedIds);
     setResourceProperty(resource, ALERT_HOST, history.getHostName(), requestedIds);
     setResourceProperty(resource, ALERT_SERVICE, history.getServiceName(), requestedIds);

+ 4 - 0
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertSummaryPropertyProvider.java

@@ -90,6 +90,10 @@ public class AlertSummaryPropertyProvider extends BaseProvider implements Proper
     AlertSummaryDTO summary = null;
 
     String clusterName = (String) resource.getPropertyValue(m_clusterPropertyId);
+    
+    if (null == clusterName)
+      return;
+    
     String typeId = null == m_typeIdPropertyId ? null : (String) resource.getPropertyValue(m_typeIdPropertyId);
     Cluster cluster = s_clusters.getCluster(clusterName);
     

+ 12 - 11
ambari-server/src/main/java/org/apache/ambari/server/state/alert/MetricSource.java

@@ -27,10 +27,11 @@ import com.google.gson.annotations.SerializedName;
  */
 public class MetricSource extends Source {
 
-  private String host = null;
-
+  @SerializedName("uri")
+  private String m_uri = null;
+  
   @SerializedName("jmx")
-  private String jmxInfo = null;
+  private Object jmxInfo = null;
 
   @SerializedName("ganglia")
   private String gangliaInfo = null;
@@ -38,7 +39,7 @@ public class MetricSource extends Source {
   /**
    * @return the jmx info, if this metric is jmx-based
    */
-  public String getJmxInfo() {
+  public Object getJmxInfo() {
     return jmxInfo;
   }
 
@@ -50,10 +51,10 @@ public class MetricSource extends Source {
   }
 
   /**
-   * @return the host info, which may include port information
+   * @return the uri info, which may include port information
    */
-  public String getHost() {
-    return host;
+  public String getUri() {
+    return m_uri;
   }
 
   /**
@@ -65,7 +66,7 @@ public class MetricSource extends Source {
     int result = super.hashCode();
     result = prime * result
         + ((gangliaInfo == null) ? 0 : gangliaInfo.hashCode());
-    result = prime * result + ((host == null) ? 0 : host.hashCode());
+    result = prime * result + ((m_uri == null) ? 0 : m_uri.hashCode());
     result = prime * result + ((jmxInfo == null) ? 0 : jmxInfo.hashCode());
 
     return result;
@@ -97,11 +98,11 @@ public class MetricSource extends Source {
       return false;
     }
 
-    if (host == null) {
-      if (other.host != null) {
+    if (m_uri == null) {
+      if (other.m_uri != null) {
         return false;
       }
-    } else if (!host.equals(other.host)) {
+    } else if (!m_uri.equals(other.m_uri)) {
       return false;
     }
 

+ 1 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/cluster/AlertDataManager.java

@@ -68,6 +68,7 @@ public class AlertDataManager {
       
     } else if (alert.getState() == current.getAlertHistory().getAlertState()) {
       current.setLatestTimestamp(Long.valueOf(alert.getTimestamp()));
+      current.setLatestText(alert.getText());
       
       m_alertsDao.merge(current);
     } else {

+ 31 - 1
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/alerts.json

@@ -19,6 +19,36 @@
           }
         }        
       }
+    },
+    {
+      "name": "check_cpu",
+      "label": "NameNode host CPU utilization",
+      "interval": 2,
+      "scope": "any",
+      "source": {
+        "type": "METRIC",
+        "uri": "{{hdfs-site/dfs.namenode.http-address}}",
+        "reporting": {
+          "ok": {
+            "text": "{1} CPU, load {0:.1%}"
+          },
+          "warning": {
+            "text": "{1} CPU, load {0:.1%}",
+            "value": 200
+          },
+          "critical": {
+            "text": "{1} CPU, load {0:.1%}",
+            "value": 250
+          }
+        },
+        "jmx": {
+          "property_list": [
+            "java.lang:type=OperatingSystem/SystemCpuLoad",
+            "java.lang:type=OperatingSystem/AvailableProcessors"
+          ],
+          "value": "{0} * 100"
+        }
+      }
     }
   ],
   "SECONDARY_NAMENODE": [
@@ -84,4 +114,4 @@
       }
     }    
   ]
-}
+}