- #!/usr/bin/env python
- '''
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- '''
- from stacks.utils.RMFTestCase import *
- import os
- import socket
- import sys
- from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
- from ambari_agent.alerts.collector import AlertCollector
- from ambari_agent.alerts.metric_alert import MetricAlert
- from ambari_agent.alerts.port_alert import PortAlert
- from ambari_agent.alerts.script_alert import ScriptAlert
- from ambari_agent.alerts.web_alert import WebAlert
- from ambari_agent.apscheduler.scheduler import Scheduler
- from collections import namedtuple
- from mock.mock import patch
- from unittest import TestCase
- class TestAlerts(TestCase):
- def setUp(self):
- pass
- def tearDown(self):
- sys.stdout = sys.__stdout__
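-
- # note: stacked @patch decorators apply bottom-up, so the mock for "start"
- # is handed to the test before the mock for "add_interval_job"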
- @patch.object(Scheduler, "add_interval_job")
- @patch.object(Scheduler, "start")
- def test_start(self, aps_start_mock, aps_add_interval_job_mock):
- test_file_path = os.path.join('ambari_agent', 'dummy_files')
- test_stack_path = os.path.join('ambari_agent', 'dummy_files')
- test_host_scripts_path = os.path.join('ambari_agent', 'dummy_files')
- ash = AlertSchedulerHandler(test_file_path, test_stack_path, test_host_scripts_path)
- ash.start()
- self.assertTrue(aps_add_interval_job_mock.called)
- self.assertTrue(aps_start_mock.called)
- def test_port_alert(self):
- json = { "name": "namenode_process",
- "service": "HDFS",
- "component": "NAMENODE",
- "label": "NameNode process",
- "interval": 6,
- "scope": "host",
- "enabled": True,
- "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
- "source": {
- "type": "PORT",
- "uri": "{{hdfs-site/my-key}}",
- "default_port": 50070,
- "reporting": {
- "ok": {
- "text": "TCP OK - {0:.4f} response time on port {1}"
- },
- "critical": {
- "text": "Could not load process info: {0}"
- }
- }
- }
- }
- collector = AlertCollector()
- pa = PortAlert(json, json['source'])
- pa.set_helpers(collector, {'hdfs-site/my-key': 'value1'})
- self.assertEquals(6, pa.interval())
- pa.collect()
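- # 'value1' cannot be resolved to a reachable host:port, so the check is
- # expected to fail; test_alert_collector_purge below asserts CRITICAL for
- # this same setup
- self.assertEquals('CRITICAL', collector.alerts()[0]['state'])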
- @patch.object(socket.socket,"connect")
- def test_port_alert_complex_uri(self, socket_connect_mock):
- json = { "name": "namenode_process",
- "service": "HDFS",
- "component": "NAMENODE",
- "label": "NameNode process",
- "interval": 6,
- "scope": "host",
- "enabled": True,
- "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
- "source": {
- "type": "PORT",
- "uri": "{{hdfs-site/my-key}}",
- "default_port": 50070,
- "reporting": {
- "ok": {
- "text": "TCP OK - {0:.4f} response time on port {1}"
- },
- "critical": {
- "text": "Could not load process info: {0}"
- }
- }
- }
- }
- collector = AlertCollector()
- pa = PortAlert(json, json['source'])
- # use a URI that has commas to verify that we properly parse it
- pa.set_helpers(collector, {'hdfs-site/my-key': 'c6401.ambari.apache.org:2181,c6402.ambari.apache.org:2181,c6403.ambari.apache.org:2181'})
- pa.host_name = 'c6402.ambari.apache.org'
- self.assertEquals(6, pa.interval())
- pa.collect()
- self.assertEquals('OK', collector.alerts()[0]['state'])
- self.assertTrue('response time on port 2181' in collector.alerts()[0]['text'])
- def test_port_alert_no_sub(self):
- json = { "name": "namenode_process",
- "service": "HDFS",
- "component": "NAMENODE",
- "label": "NameNode process",
- "interval": 6,
- "scope": "host",
- "enabled": True,
- "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
- "source": {
- "type": "PORT",
- "uri": "http://c6401.ambari.apache.org",
- "default_port": 50070,
- "reporting": {
- "ok": {
- "text": "TCP OK - {0:.4f} response time on port {1}"
- },
- "critical": {
- "text": "Could not load process info: {0}"
- }
- }
- }
- }
- pa = PortAlert(json, json['source'])
- pa.set_helpers(AlertCollector(), '')
- self.assertEquals('http://c6401.ambari.apache.org', pa.uri)
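- # collect() is exercised only for its side effects here; the endpoint is
- # not mocked, so nothing is asserted about the resulting state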
- pa.collect()
- def test_script_alert(self):
- json = {
- "name": "namenode_process",
- "service": "HDFS",
- "component": "NAMENODE",
- "label": "NameNode process",
- "interval": 6,
- "scope": "host",
- "enabled": True,
- "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
- "source": {
- "type": "SCRIPT",
- "path": "test_script.py",
- }
- }
- # normally set by AlertSchedulerHandler
- json['source']['stacks_directory'] = os.path.join('ambari_agent', 'dummy_files')
- json['source']['host_scripts_directory'] = os.path.join('ambari_agent', 'host_scripts')
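- # test_script.py is resolved against the directories above; judging by the
- # assertions below, it renders the {{foo-site/bar}} and {{foo-site/baz}}
- # placeholders into a WARNING result (the script's exact entry point is not
- # verified by this test)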
- collector = AlertCollector()
- sa = ScriptAlert(json, json['source'])
- sa.set_helpers(collector, {'foo-site/bar': 'rendered-bar', 'foo-site/baz':'rendered-baz'} )
- self.assertEquals(json['source']['path'], sa.path)
- self.assertEquals(json['source']['stacks_directory'], sa.stacks_dir)
- self.assertEquals(json['source']['host_scripts_directory'], sa.host_scripts_dir)
- sa.collect()
- self.assertEquals('WARNING', collector.alerts()[0]['state'])
- self.assertEquals('bar is rendered-bar, baz is rendered-baz', collector.alerts()[0]['text'])
- @patch.object(MetricAlert, "_load_jmx")
- def test_metric_alert(self, ma_load_jmx_mock):
- json = {
- "name": "cpu_check",
- "service": "HDFS",
- "component": "NAMENODE",
- "label": "NameNode process",
- "interval": 6,
- "scope": "host",
- "enabled": True,
- "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
- "source": {
- "type": "METRIC",
- "uri": {
- "http": "{{hdfs-site/dfs.datanode.http.address}}"
- },
- "jmx": {
- "property_list": [
- "someJmxObject/value",
- "someOtherJmxObject/value"
- ],
- "value": "{0} * 100 + 123"
- },
- "reporting": {
- "ok": {
- "text": "ok_arr: {0} {1} {2}",
- },
- "warning": {
- "text": "",
- "value": 13
- },
- "critical": {
- "text": "crit_arr: {0} {1} {2}",
- "value": 72
- }
- }
- }
- }
- ma_load_jmx_mock.return_value = [1, 3]
- collector = AlertCollector()
- ma = MetricAlert(json, json['source'])
- ma.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'})
- ma.collect()
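- # with the mocked jmx values [1, 3], the 'value' template evaluates to
- # 1 * 100 + 123 = 223, which is past the critical threshold of 72; 223 is
- # also passed as the third reporting argument below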
- self.assertEquals('CRITICAL', collector.alerts()[0]['state'])
- self.assertEquals('crit_arr: 1 3 223', collector.alerts()[0]['text'])
- del json['source']['jmx']['value']
- collector = AlertCollector()
- ma = MetricAlert(json, json['source'])
- ma.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'})
- ma.collect()
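- # with no 'value' expression there is nothing to compare against the
- # thresholds, so the state falls back to OK and {2} renders as None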
- self.assertEquals('OK', collector.alerts()[0]['state'])
- self.assertEquals('ok_arr: 1 3 None', collector.alerts()[0]['text'])
- @patch.object(MetricAlert, "_load_jmx")
- def test_alert_uri_structure(self, ma_load_jmx_mock):
- json = {
- "name": "cpu_check",
- "service": "HDFS",
- "component": "NAMENODE",
- "label": "NameNode process",
- "interval": 6,
- "scope": "host",
- "enabled": True,
- "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
- "source": {
- "type": "METRIC",
- "uri": {
- "http": "{{hdfs-site/dfs.datanode.http.address}}",
- "https": "{{hdfs-site/dfs.datanode.https.address}}",
- "https_property": "{{hdfs-site/dfs.http.policy}}",
- "https_property_value": "HTTPS_ONLY"
- },
- "jmx": {
- "property_list": [
- "someJmxObject/value",
- "someOtherJmxObject/value"
- ],
- "value": "{0}"
- },
- "reporting": {
- "ok": {
- "text": "ok_arr: {0} {1} {2}",
- },
- "warning": {
- "text": "",
- "value": 10
- },
- "critical": {
- "text": "crit_arr: {0} {1} {2}",
- "value": 20
- }
- }
- }
- }
- ma_load_jmx_mock.return_value = [1,1]
-
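- # the scenarios below map out the URI resolution rules implied by the
- # definition above: no keys at all and the policy property alone both leave
- # the alert UNKNOWN, while either address key (or both) yields OK
-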
- # run the alert without specifying any keys; the missing URI should be
- # caught internally and the result reported as UNKNOWN
- collector = AlertCollector()
- ma = MetricAlert(json, json['source'])
- ma.set_helpers(collector, '')
- ma.collect()
- self.assertEquals('UNKNOWN', collector.alerts()[0]['state'])
- # set only the policy property, which makes no sense without the main URI properties
- collector = AlertCollector()
- ma = MetricAlert(json, json['source'])
- ma.set_helpers(collector, {'hdfs-site/dfs.http.policy': 'HTTP_ONLY'})
- ma.collect()
-
- self.assertEquals('UNKNOWN', collector.alerts()[0]['state'])
-
- # set an actual property key (http)
- collector = AlertCollector()
- ma = MetricAlert(json, json['source'])
- ma.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80',
- 'hdfs-site/dfs.http.policy': 'HTTP_ONLY'})
- ma.collect()
-
- self.assertEquals('OK', collector.alerts()[0]['state'])
-
- # set an actual property key (https)
- collector = AlertCollector()
- ma = MetricAlert(json, json['source'])
- ma.set_helpers(collector, {'hdfs-site/dfs.datanode.https.address': '1.2.3.4:443',
- 'hdfs-site/dfs.http.policy': 'HTTP_ONLY'})
- ma.collect()
-
- self.assertEquals('OK', collector.alerts()[0]['state'])
- # set both (http and https)
- collector = AlertCollector()
- ma = MetricAlert(json, json['source'])
- ma.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80',
- 'hdfs-site/dfs.datanode.https.address': '1.2.3.4:443',
- 'hdfs-site/dfs.http.policy': 'HTTP_ONLY'})
- ma.collect()
-
- self.assertEquals('OK', collector.alerts()[0]['state'])
- @patch.object(WebAlert, "_make_web_request")
- def test_web_alert(self, wa_make_web_request_mock):
- json = {
- "name": "webalert_test",
- "service": "HDFS",
- "component": "DATANODE",
- "label": "WebAlert Test",
- "interval": 1,
- "scope": "HOST",
- "enabled": True,
- "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
- "source": {
- "type": "WEB",
- "uri": {
- "http": "{{hdfs-site/dfs.datanode.http.address}}",
- "https": "{{hdfs-site/dfs.datanode.https.address}}",
- "https_property": "{{hdfs-site/dfs.http.policy}}",
- "https_property_value": "HTTPS_ONLY"
- },
- "reporting": {
- "ok": {
- "text": "ok: {0}",
- },
- "warning": {
- "text": "warning: {0}",
- },
- "critical": {
- "text": "critical: {1}",
- }
- }
- }
- }
- WebResponse = namedtuple('WebResponse', 'status_code time_millis')
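- # WebResponse stands in for whatever _make_web_request normally returns;
- # status_code and time_millis are presumably the only fields the alert reads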
- wa_make_web_request_mock.return_value = WebResponse(200,1.234)
- # run the alert and check HTTP 200
- collector = AlertCollector()
- alert = WebAlert(json, json['source'])
- alert.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'})
- alert.collect()
- self.assertEquals('OK', collector.alerts()[0]['state'])
- self.assertEquals('ok: 200', collector.alerts()[0]['text'])
- # run the alert and check HTTP 500
- wa_make_web_request_mock.return_value = WebResponse(500,1.234)
- collector = AlertCollector()
- alert = WebAlert(json, json['source'])
- alert.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'})
- alert.collect()
-
- self.assertEquals('WARNING', collector.alerts()[0]['state'])
- self.assertEquals('warning: 500', collector.alerts()[0]['text'])
- # run the alert and check critical
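- # (a status_code of 0 models a request that never completed)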
- wa_make_web_request_mock.return_value = WebResponse(0,0)
-
- collector = AlertCollector()
- alert = WebAlert(json, json['source'])
- alert.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'})
- alert.collect()
-
- # the http URL in the text confirms that the non-SSL scheme was chosen
- self.assertEquals('CRITICAL', collector.alerts()[0]['state'])
- self.assertEquals('critical: http://1.2.3.4:80', collector.alerts()[0]['text'])
-
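- # switch dfs.http.policy to HTTPS_ONLY; the alert should now build and
- # report an https URL instead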
- collector = AlertCollector()
- alert = WebAlert(json, json['source'])
- alert.set_helpers(collector, {
- 'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80',
- 'hdfs-site/dfs.datanode.https.address': '1.2.3.4:8443',
- 'hdfs-site/dfs.http.policy': 'HTTPS_ONLY'})
- alert.collect()
-
- # SSL assertion
- self.assertEquals('CRITICAL', collector.alerts()[0]['state'])
- self.assertEquals('critical: https://1.2.3.4:8443', collector.alerts()[0]['text'])
- def test_reschedule(self):
- test_file_path = os.path.join('ambari_agent', 'dummy_files')
- test_stack_path = os.path.join('ambari_agent', 'dummy_files')
- test_host_scripts_path = os.path.join('ambari_agent', 'dummy_files')
-
- ash = AlertSchedulerHandler(test_file_path, test_stack_path, test_host_scripts_path)
- ash.start()
- self.assertEquals(1, ash.get_job_count())
- ash.reschedule()
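- # rescheduling should rebuild the same single job from the dummy
- # definition, leaving the count unchanged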
- self.assertEquals(1, ash.get_job_count())
- def test_alert_collector_purge(self):
- json = { "name": "namenode_process",
- "service": "HDFS",
- "component": "NAMENODE",
- "label": "NameNode process",
- "interval": 6,
- "scope": "host",
- "enabled": True,
- "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
- "source": {
- "type": "PORT",
- "uri": "{{hdfs-site/my-key}}",
- "default_port": 50070,
- "reporting": {
- "ok": {
- "text": "TCP OK - {0:.4f} response time on port {1}"
- },
- "critical": {
- "text": "Could not load process info: {0}"
- }
- }
- }
- }
- collector = AlertCollector()
- pa = PortAlert(json, json['source'])
- pa.set_helpers(collector, {'hdfs-site/my-key': 'value1'})
- self.assertEquals(6, pa.interval())
- res = pa.collect()
- self.assertTrue(collector.alerts()[0] is not None)
- self.assertEquals('CRITICAL', collector.alerts()[0]['state'])
- collector.remove_by_uuid('c1f73191-4481-4435-8dae-fd380e4c0be1')
- self.assertEquals(0,len(collector.alerts()))
- def test_disabled_definitions(self):
- test_file_path = os.path.join('ambari_agent', 'dummy_files')
- test_stack_path = os.path.join('ambari_agent', 'dummy_files')
- test_host_scripts_path = os.path.join('ambari_agent', 'dummy_files')
- ash = AlertSchedulerHandler(test_file_path, test_stack_path, test_host_scripts_path)
- ash.start()
- self.assertEquals(1, ash.get_job_count())
- json = { "name": "namenode_process",
- "service": "HDFS",
- "component": "NAMENODE",
- "label": "NameNode process",
- "interval": 6,
- "scope": "host",
- "enabled": True,
- "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
- "source": {
- "type": "PORT",
- "uri": "{{hdfs-site/my-key}}",
- "default_port": 50070,
- "reporting": {
- "ok": {
- "text": "TCP OK - {0:.4f} response time on port {1}"
- },
- "critical": {
- "text": "Could not load process info: {0}"
- }
- }
- }
- }
- pa = PortAlert(json, json['source'])
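- # verify the enabled alert is scheduled alongside the existing dummy job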
- ash.schedule_definition(pa)
- self.assertEquals(2, ash.get_job_count())
- json['enabled'] = False
- pa = PortAlert(json, json['source'])
- ash.schedule_definition(pa)
- # verify disabled alert not scheduled
- self.assertEquals(2, ash.get_job_count())
- json['enabled'] = True
- pa = PortAlert(json, json['source'])
- ash.schedule_definition(pa)
- # verify enabled alert was scheduled
- self.assertEquals(3, ash.get_job_count())
- def test_immediate_alert(self):
- test_file_path = os.path.join('ambari_agent', 'dummy_files')
- test_stack_path = os.path.join('ambari_agent', 'dummy_files')
- test_host_scripts_path = os.path.join('ambari_agent', 'dummy_files')
- ash = AlertSchedulerHandler(test_file_path, test_stack_path, test_host_scripts_path)
- ash.start()
- self.assertEquals(1, ash.get_job_count())
- self.assertEquals(0, len(ash._collector.alerts()))
- execution_commands = [ {
- "clusterName": "c1",
- "hostName": "c6401.ambari.apache.org",
- "alertDefinition": {
- "name": "namenode_process",
- "service": "HDFS",
- "component": "NAMENODE",
- "label": "NameNode process",
- "interval": 6,
- "scope": "host",
- "enabled": True,
- "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
- "source": {
- "type": "PORT",
- "uri": "{{hdfs-site/my-key}}",
- "default_port": 50070,
- "reporting": {
- "ok": {
- "text": "TCP OK - {0:.4f} response time on port {1}"
- },
- "critical": {
- "text": "Could not load process info: {0}"
- }
- }
- }
- }
- } ]
- # execute the alert immediately and verify that the collector has the result
- ash.execute_alert(execution_commands)
- self.assertEquals(1, len(ash._collector.alerts()))
- def test_skipped_alert(self):
- json = {
- "name": "namenode_process",
- "service": "HDFS",
- "component": "NAMENODE",
- "label": "NameNode process",
- "interval": 6,
- "scope": "host",
- "enabled": True,
- "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1",
- "source": {
- "type": "SCRIPT",
- "path": "test_script.py",
- }
- }
- # normally set by AlertSchedulerHandler
- json['source']['stacks_directory'] = os.path.join('ambari_agent', 'dummy_files')
- json['source']['host_scripts_directory'] = os.path.join('ambari_agent', 'host_scripts')
- collector = AlertCollector()
- sa = ScriptAlert(json, json['source'])
- # instruct the test alert script to be skipped
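- # (presumably the dummy script inspects foo-site/skip and declines to
- # report a result when it is set to 'true')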
- sa.set_helpers(collector, {'foo-site/skip': 'true'} )
- self.assertEquals(json['source']['path'], sa.path)
- self.assertEquals(json['source']['stacks_directory'], sa.stacks_dir)
- self.assertEquals(json['source']['host_scripts_directory'], sa.host_scripts_dir)
- sa.collect()
-
- # ensure that the skipped alert produced nothing in the collector
- self.assertEquals(0, len(collector.alerts()))