Browse Source

AMBARI-6942. Alerts: save definitions and send alert data to the server (ncole)

Nate Cole 11 years ago
parent
commit
f588928175

+ 66 - 28
ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py

@@ -21,19 +21,30 @@ limitations under the License.
 '''
 http://apscheduler.readthedocs.org/en/v2.1.2
 '''
-from apscheduler.scheduler import Scheduler
-from alerts.port_alert import PortAlert
+import glob
 import json
 import logging
+import os
 import sys
 import time
+from apscheduler.scheduler import Scheduler
+from alerts.collector import AlertCollector
+from alerts.port_alert import PortAlert
+
 
 logger = logging.getLogger()
 
 class AlertSchedulerHandler():
+  make_cachedir = True
 
-  def __init__(self, filename, in_minutes=True):
-    self.filename = filename
+  def __init__(self, cachedir, in_minutes=True):
+    self.cachedir = cachedir
+    
+    if not os.path.exists(cachedir) and AlertSchedulerHandler.make_cachedir:
+      try:
+        os.makedirs(cachedir)
+      except:
+        pass
     
     config = {
       'threadpool.core_threads': 3,
@@ -41,41 +52,65 @@ class AlertSchedulerHandler():
       'standalone': False
     }
 
-    self.scheduler = Scheduler(config)
-
-    alert_callables = self.__load_alerts()
-
-    for _callable in alert_callables:
-      if in_minutes:
-        self.scheduler.add_interval_job(self.__make_function(_callable),
-          minutes=_callable.interval())
-      else:
-        self.scheduler.add_interval_job(self.__make_function(_callable),
-          seconds=_callable.interval())
+    self.__scheduler = Scheduler(config)
+    self.__in_minutes = in_minutes
+    self.__loaded = False
+    self.__collector = AlertCollector()
+          
+  def update_definitions(self, alert_commands, refresh_jobs=False):
+    for command in alert_commands:
+      with open(os.path.join(self.cachedir, command['clusterName'] + '.def'), 'w') as f:
+        json.dump(command, f, indent=2)
+    
+    if refresh_jobs:
+      self.__scheduler.shutdown(wait=False)
+      self.__loaded = False
+      self.start()
       
   def __make_function(self, alert_def):
     return lambda: alert_def.collect()
-
+    
   def start(self):
-    if not self.scheduler is None:
-      self.scheduler.start()
+    if not self.__loaded:
+      alert_callables = self.__load_definitions()
+      
+      for _callable in alert_callables:
+        if self.__in_minutes:
+          self.__scheduler.add_interval_job(self.__make_function(_callable),
+            minutes=_callable.interval())
+        else:
+          self.__scheduler.add_interval_job(self.__make_function(_callable),
+            seconds=_callable.interval())
+      self.__loaded = True
+      
+    if not self.__scheduler is None:
+      self.__scheduler.start()
     
   def stop(self):
-    if not self.scheduler is None:
-      self.scheduler.shutdown(wait=False)
-      self.scheduler = None
+    if not self.__scheduler is None:
+      self.__scheduler.shutdown(wait=False)
+      self.__scheduler = None
       
-  def __load_alerts(self):
+  def collector(self):
+    return self.__collector
+      
+  def __load_definitions(self):
     definitions = []
     try:
-      # FIXME make location configurable
-      with open(self.filename) as fp: 
-        cluster_defs = json.load(fp)
-        for deflist in cluster_defs.values():
-          for definition in deflist:
+      for deffile in glob.glob(os.path.join(self.cachedir, '*.def')):
+        with open(deffile, 'r') as f:
+          command_json = json.load(f)
+
+          for definition in command_json['alertDefinitions']:
             obj = self.__json_to_callable(definition)
+              
             if obj is not None:
+              obj.set_cluster(
+                '' if not 'clusterName' in command_json else command_json['clusterName'],
+                '' if not 'hostName' in command_json else command_json['hostName'])
+
               definitions.append(obj)
+      
     except:
       import traceback
       traceback.print_exc()
@@ -91,7 +126,7 @@ class AlertSchedulerHandler():
     if source_type == 'METRIC':
       pass
     elif source_type == 'PORT':
-      alert = PortAlert(json_definition, source)
+      alert = PortAlert(self.__collector, json_definition, source)
     elif type == 'SCRIPT':
       pass
 
@@ -119,6 +154,9 @@ def main():
       i += 1
   except KeyboardInterrupt:
     pass
+    
+  print str(ash.collector().alerts())
+      
   ash.stop()
     
 if __name__ == "__main__":

+ 13 - 3
ambari-agent/src/main/python/ambari_agent/Controller.py

@@ -79,8 +79,8 @@ class Controller(threading.Thread):
     if cache_dir is None:
       cache_dir = '/var/lib/ambari-agent/cache'
       
-    self.alert_scheduler_handler = AlertSchedulerHandler(
-     os.path.join(cache_dir, 'alerts', 'alert_definitions.json'))
+    alerts_cache_dir = os.path.join(cache_dir, 'alerts')
+    self.alert_scheduler_handler = AlertSchedulerHandler(alerts_cache_dir)
 
 
   def __del__(self):
@@ -135,6 +135,12 @@ class Controller(threading.Thread):
           pass
         else:
           self.hasMappedComponents = False
+
+        if 'alertDefinitionCommands' in ret.keys():
+          logger.info("Got alert definition update on registration " + pprint.pformat(ret['alertDefinitionCommands']))
+          self.alert_scheduler_handler.update_definitions(ret['alertDefinitionCommands'])
+          pass
+
         pass
       except ssl.SSLError:
         self.repeatRegistration=False
@@ -247,6 +253,10 @@ class Controller(threading.Thread):
         if 'statusCommands' in response.keys():
           self.addToStatusQueue(response['statusCommands'])
           pass
+          
+        if 'alertDefinitionCommands' in response.keys():
+          self.alert_scheduler_handler.update_definitions(response['alertDefinitionCommands'], True)
+          pass
 
         if "true" == response['restartAgent']:
           logger.error("Received the restartAgent command")
@@ -307,7 +317,7 @@ class Controller(threading.Thread):
     self.actionQueue = ActionQueue(self.config, controller=self)
     self.actionQueue.start()
     self.register = Register(self.config)
-    self.heartbeat = Heartbeat(self.actionQueue, self.config)
+    self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())
 
     opener = urllib2.build_opener()
     urllib2.install_opener(opener)

+ 6 - 2
ambari-agent/src/main/python/ambari_agent/Heartbeat.py

@@ -35,10 +35,11 @@ logger = logging.getLogger()
 firstContact = True
 class Heartbeat:
 
-  def __init__(self, actionQueue, config=None):
+  def __init__(self, actionQueue, config=None, alert_collector=None):
     self.actionQueue = actionQueue
     self.config = config
     self.reports = []
+    self.collector = alert_collector
 
   def build(self, id='-1', state_interval=-1, componentsMapped=False):
     global clusterId, clusterDefinitionRevision, firstContact
@@ -51,7 +52,6 @@ class Heartbeat:
     nodeStatus["alerts"] = []
 
 
-
     heartbeat = { 'responseId'        : int(id),
                   'timestamp'         : timestamp,
                   'hostname'          : hostname.hostname(self.config),
@@ -95,6 +95,10 @@ class Heartbeat:
         logger.debug("mounts: %s", str(mounts))
 
     nodeStatus["alerts"] = hostInfo.createAlerts(nodeStatus["alerts"])
+    
+    if self.collector is not None:
+      nodeStatus['alerts'].extend(self.collector.alerts())
+    
     return heartbeat
 
 def main(argv=None):

+ 11 - 3
ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py

@@ -28,15 +28,22 @@ class BaseAlert(object):
   RESULT_CRITICAL = 'CRITICAL'
   RESULT_UNKNOWN = 'UNKNOWN'
   
-  def __init__(self, alert_meta, alert_source_meta):
+  def __init__(self, collector, alert_meta, alert_source_meta):
+    self.collector = collector
     self.alert_meta = alert_meta
     self.alert_source_meta = alert_source_meta
+    self.cluster = ''
+    self.hostname = ''
     
   def interval(self):
     if not self.alert_meta.has_key('interval'):
       return 1
     else:
       return self.alert_meta['interval']
+      
+  def set_cluster(self, cluster, host):
+    self.cluster = cluster
+    self.hostname = host
   
   def collect(self):
     res = (BaseAlert.RESULT_UNKNOWN, [])
@@ -49,13 +56,14 @@ class BaseAlert(object):
       
     data = {}
     data['name'] = self._find_value('name')
+    data['label'] = self._find_value('label')
     data['state'] = res[0]
     data['text'] = res_base_text.format(*res[1])
-    # data['cluster'] = self._find_value('cluster') # not sure how to get this yet
+    data['cluster'] = self.cluster
     data['service'] = self._find_value('service')
     data['component'] = self._find_value('component')
     
-    print str(data)
+    self.collector.put(self.cluster, data)
   
   def _find_value(self, meta_key):
     if self.alert_meta.has_key(meta_key):

+ 47 - 0
ambari-agent/src/main/python/ambari_agent/alerts/collector.py

@@ -0,0 +1,47 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+
+logger = logging.getLogger()
+
+class AlertCollector():
+  '''
+  cluster -> name -> alert dict
+  '''  
+  def __init__(self):
+    self.__buckets = {}
+    
+  def put(self, cluster, alert):
+    if not cluster in self.__buckets:
+      self.__buckets[cluster] = {}
+      
+    self.__buckets[cluster][alert['name']] = alert 
+    
+  def alerts(self):
+    alerts = []
+    for clustermap in self.__buckets.values()[:]:
+      alerts.extend(clustermap.values())
+      
+    return alerts
+      
+    
+
+    

+ 3 - 3
ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py

@@ -30,8 +30,8 @@ logger = logging.getLogger()
 
 class PortAlert(BaseAlert):
 
-  def __init__(self, alert_meta, alert_source_meta):
-    super(PortAlert, self).__init__(alert_meta, alert_source_meta)
+  def __init__(self, collector, alert_meta, alert_source_meta):
+    super(PortAlert, self).__init__(collector, alert_meta, alert_source_meta)
     
     default_port = alert_source_meta['default_port']
     uri = alert_source_meta['uri']
@@ -42,7 +42,7 @@ class PortAlert(BaseAlert):
     try:
       self.port = int(get_port_from_url(uri))
     except:
-      traceback.print_exc()
+      # only when port parsing fails
       pass
 
     

+ 10 - 4
ambari-agent/src/test/python/ambari_agent/TestAlerts.py

@@ -23,6 +23,7 @@ import sys
 from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
 from ambari_agent.apscheduler.scheduler import Scheduler
 from ambari_agent.alerts.port_alert import PortAlert
+from ambari_agent.alerts.collector import AlertCollector
 from mock.mock import patch
 from unittest import TestCase
 
@@ -35,12 +36,15 @@ class TestAlerts(TestCase):
     sys.stdout == sys.__stdout__
 
   @patch.object(Scheduler, "add_interval_job")
-  def test_build(self, aps_add_interval_job_mock):
-    test_file_path = os.path.join('ambari_agent', 'dummy_files', 'alert_definitions.json')
+  @patch.object(Scheduler, "start")
+  def test_start(self, aps_add_interval_job_mock, aps_start_mock):
+    test_file_path = os.path.join('ambari_agent', 'dummy_files')
 
     ash = AlertSchedulerHandler(test_file_path)
+    ash.start()
 
     self.assertTrue(aps_add_interval_job_mock.called)
+    self.assertTrue(aps_start_mock.called)
 
   def test_port_alert(self):
     json = { "name": "namenode_process",
@@ -51,7 +55,7 @@ class TestAlerts(TestCase):
       "scope": "host",
       "source": {
         "type": "PORT",
-        "uri": "http://c6401.ambari.apache.org:50070",
+        "uri": "http://c6409.ambari.apache.org:50070",
         "default_port": 50070,
         "reporting": {
           "ok": {
@@ -64,7 +68,9 @@ class TestAlerts(TestCase):
       }
     }
 
-    pa = PortAlert(json, json['source'])
+    collector = AlertCollector()
+
+    pa = PortAlert(collector, json, json['source'])
     self.assertEquals(6, pa.interval())
 
     res = pa.collect()

+ 4 - 1
ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json → ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.def

@@ -1,5 +1,8 @@
 {
-  "c1": [
+  "clusterName": "c1",
+  "hostName": "c6401.ambari.apache.org",
+  "hash": "12341234134412341243124",
+  "alertDefinitions": [
     {
       "name": "namenode_cpu",
       "label": "NameNode host CPU Utilization",