
AMBARI-13954 Enable auto-start with alerting for AMS (dsen)

Dmytro Sen, 9 years ago
parent commit d48a9b6851

+ 7 - 1
ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py

@@ -33,6 +33,7 @@ from alerts.metric_alert import MetricAlert
 from alerts.port_alert import PortAlert
 from alerts.script_alert import ScriptAlert
 from alerts.web_alert import WebAlert
+from alerts.recovery_alert import RecoveryAlert
 from ambari_agent.ExitHelper import ExitHelper
 logger = logging.getLogger(__name__)
 
@@ -42,9 +43,11 @@ class AlertSchedulerHandler():
   TYPE_METRIC = 'METRIC'
   TYPE_SCRIPT = 'SCRIPT'
   TYPE_WEB = 'WEB'
+  TYPE_RECOVERY = 'RECOVERY'
 
   def __init__(self, cachedir, stacks_dir, common_services_dir, host_scripts_dir,
-      alert_grace_period, cluster_configuration, config, in_minutes=True):
+      alert_grace_period, cluster_configuration, config, recovery_manager,
+      in_minutes=True):
 
     self.cachedir = cachedir
     self.stacks_dir = stacks_dir
@@ -70,6 +73,7 @@ class AlertSchedulerHandler():
     self.__scheduler = Scheduler(self.APS_CONFIG)
     self.__in_minutes = in_minutes
     self.config = config
+    self.recovery_manger = recovery_manager
 
     # register python exit handler
     ExitHelper().register(self.exit_handler)
@@ -282,6 +286,8 @@ class AlertSchedulerHandler():
         alert = ScriptAlert(json_definition, source, self.config)
       elif source_type == AlertSchedulerHandler.TYPE_WEB:
         alert = WebAlert(json_definition, source, self.config)
+      elif source_type == AlertSchedulerHandler.TYPE_RECOVERY:
+        alert = RecoveryAlert(json_definition, source, self.recovery_manger)
 
       if alert is not None:
         alert.set_cluster(clusterName, hostName)
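
The constructor change is why every AlertSchedulerHandler(...) call in the test files further down gains one extra positional argument. A minimal sketch of the new signature in use, mirroring the test setup (throwaway temp paths; cluster configuration and config passed as None exactly as the tests do):

    import tempfile
    from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
    from ambari_agent.RecoveryManager import RecoveryManager

    test_path = tempfile.mkdtemp()
    recovery_manager = RecoveryManager(tempfile.mktemp())

    # recovery_manager is the new eighth positional argument, ahead of in_minutes;
    # definitions whose source type is 'RECOVERY' are scheduled as RecoveryAlert
    # instances bound to this same manager rather than to the agent config
    handler = AlertSchedulerHandler(test_path, test_path, test_path, test_path,
                                    5, None, None, recovery_manager)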

+ 7 - 4
ambari-agent/src/main/python/ambari_agent/Controller.py

@@ -84,7 +84,6 @@ class Controller(threading.Thread):
     self.heartbeat_stop_callback = heartbeat_stop_callback
     # List of callbacks that are called at agent registration
     self.registration_listeners = []
-    self.recovery_manager = RecoveryManager()
 
     # pull config directory out of config
     cache_dir = config.get('agent', 'cache_dir')
@@ -94,8 +93,11 @@ class Controller(threading.Thread):
     stacks_cache_dir = os.path.join(cache_dir, FileCache.STACKS_CACHE_DIRECTORY)
     common_services_cache_dir = os.path.join(cache_dir, FileCache.COMMON_SERVICES_DIRECTORY)
     host_scripts_cache_dir = os.path.join(cache_dir, FileCache.HOST_SCRIPTS_CACHE_DIRECTORY)
-    alerts_cache_dir = os.path.join(cache_dir, 'alerts')
-    cluster_config_cache_dir = os.path.join(cache_dir, 'cluster_configuration')
+    alerts_cache_dir = os.path.join(cache_dir, FileCache.ALERTS_CACHE_DIRECTORY)
+    cluster_config_cache_dir = os.path.join(cache_dir, FileCache.CLUSTER_CONFIGURATION_CACHE_DIRECTORY)
+    recovery_cache_dir = os.path.join(cache_dir, FileCache.RECOVERY_CACHE_DIRECTORY)
+
+    self.recovery_manager = RecoveryManager(recovery_cache_dir)
 
     self.cluster_configuration = ClusterConfiguration(cluster_config_cache_dir)
 
@@ -105,7 +107,8 @@ class Controller(threading.Thread):
 
     self.alert_scheduler_handler = AlertSchedulerHandler(alerts_cache_dir, 
       stacks_cache_dir, common_services_cache_dir, host_scripts_cache_dir,
-      self.alert_grace_period, self.cluster_configuration, config)
+      self.alert_grace_period, self.cluster_configuration, config,
+      self.recovery_manager)
 
     self.alert_scheduler_handler.start()
 

+ 3 - 0
ambari-agent/src/main/python/ambari_agent/FileCache.py

@@ -39,6 +39,9 @@ class FileCache():
   downloads relevant files from the server.
   """
 
+  CLUSTER_CONFIGURATION_CACHE_DIRECTORY="cluster_configuration"
+  ALERTS_CACHE_DIRECTORY="alerts"
+  RECOVERY_CACHE_DIRECTORY="recovery"
   STACKS_CACHE_DIRECTORY="stacks"
   COMMON_SERVICES_DIRECTORY="common-services"
   CUSTOM_ACTIONS_CACHE_DIRECTORY="custom_actions"

+ 68 - 8
ambari-agent/src/main/python/ambari_agent/RecoveryManager.py

@@ -15,9 +15,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
+import json
 import logging
 import copy
+import os
 import time
 import threading
 import pprint
@@ -56,6 +57,8 @@ class RecoveryManager:
   COMPONENT_UPDATE_KEY_FORMAT = "{0}_UPDATE_TIME"
   COMMAND_REFRESH_DELAY_SEC = 600 #10 minutes
 
+  FILENAME = "recovery.json"
+
   default_action_counter = {
     "lastAttempt": 0,
     "count": 0,
@@ -72,8 +75,7 @@ class RecoveryManager:
     "stale_config": False
   }
 
-
-  def __init__(self, recovery_enabled=False, auto_start_only=False):
+  def __init__(self, cache_dir, recovery_enabled=False, auto_start_only=False):
     self.recovery_enabled = recovery_enabled
     self.auto_start_only = auto_start_only
     self.max_count = 6
@@ -87,14 +89,24 @@ class RecoveryManager:
     self.allowed_current_states = [self.INIT, self.INSTALLED]
     self.enabled_components = []
     self.disabled_components = []
-    self.actions = {}
     self.statuses = {}
     self.__status_lock = threading.RLock()
     self.__command_lock = threading.RLock()
     self.__active_command_lock = threading.RLock()
+    self.__cache_lock = threading.RLock()
     self.active_command_count = 0
     self.paused = False
 
+    if not os.path.exists(cache_dir):
+      try:
+        os.makedirs(cache_dir)
+      except:
+        logger.critical("[RecoveryManager] Could not create the cache directory {0}".format(cache_dir))
+
+    self.__actions_json_file = os.path.join(cache_dir, self.FILENAME)
+
+    self.actions = self._load_actions()
+
     self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, "", "")
 
     pass
@@ -366,6 +378,7 @@ class RecoveryManager:
     """
     action_counter = self.actions[action_name]
     now = self._now_()
+    executed = False
     seconds_since_last_attempt = now - action_counter["lastAttempt"]
     if action_counter["lifetimeCount"] < self.max_lifetime_count:
       if action_counter["count"] < self.max_count:
@@ -377,7 +390,7 @@ class RecoveryManager:
           action_counter["warnedLastAttempt"] = False
           if action_counter["count"] == 1:
             action_counter["lastReset"] = now
-          return True
+          executed = True
         else:
           if action_counter["warnedLastAttempt"] == False:
             action_counter["warnedLastAttempt"] = True
@@ -398,7 +411,7 @@ class RecoveryManager:
             action_counter["lastAttempt"] = now
           action_counter["lastReset"] = now
           action_counter["warnedLastReset"] = False
-          return True
+          executed = True
         else:
           if action_counter["warnedLastReset"] == False:
             action_counter["warnedLastReset"] = True
@@ -417,7 +430,54 @@ class RecoveryManager:
       else:
         logger.debug("%s occurrences in agent life time reached the limit for %s",
                      action_counter["lifetimeCount"], action_name)
-    return False
+    self._dump_actions()
+    return executed
+    pass
+
+
+  def _dump_actions(self):
+    """
+    Dump recovery actions to FS
+    """
+    self.__cache_lock.acquire()
+    try:
+      with open(self.__actions_json_file, 'w') as f:
+        json.dump(self.actions, f, indent=2)
+    except Exception, exception:
+      logger.exception("Unable to dump actions to {0}".format(self.__actions_json_file))
+      return False
+    finally:
+      self.__cache_lock.release()
+
+    return True
+    pass
+
+
+  def _load_actions(self):
+    """
+    Loads recovery actions from FS
+    """
+    self.__cache_lock.acquire()
+
+    try:
+      if os.path.isfile(self.__actions_json_file):
+        with open(self.__actions_json_file, 'r') as fp:
+          return json.load(fp)
+    except Exception, exception:
+      logger.warning("Unable to load recovery actions from {0}.".format(self.__actions_json_file))
+    finally:
+      self.__cache_lock.release()
+
+    return {}
+    pass
+
+
+  def get_actions_copy(self):
+    """
+    Loads recovery actions from FS
+    :return:  recovery actions copy
+    """
+    return self._load_actions()
     pass
 
 
@@ -775,7 +835,7 @@ class RecoveryManager:
 
 
 def main(argv=None):
-  cmd_mgr = RecoveryManager()
+  cmd_mgr = RecoveryManager('/tmp')
   pass
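
The persistence pieces (_dump_actions / _load_actions) are what let the recovery history survive an agent restart: the actions dictionary is serialized to recovery.json under the agent cache directory and re-read on startup and via get_actions_copy(). A small sketch of the round trip; the on-disk field names follow default_action_counter and the mock in TestAlerts below, and the values are illustrative:

    import tempfile
    from ambari_agent.RecoveryManager import RecoveryManager

    rm = RecoveryManager(tempfile.mkdtemp(), True)  # creates the cache dir, loads recovery.json if present
    print(rm.get_actions_copy())                    # {} on a fresh cache directory

    # after an auto-start attempt the counters are dumped, so recovery.json holds e.g.
    # {"METRICS_COLLECTOR": {"count": 1, "lastAttempt": 1447860184, "lastReset": 1447860184,
    #   "lifetimeCount": 1, "warnedLastAttempt": false, "warnedLastReset": false,
    #   "warnedThresholdReached": false}}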
 
 

+ 103 - 0
ambari-agent/src/main/python/ambari_agent/alerts/recovery_alert.py

@@ -0,0 +1,103 @@
+#!/usr/bin/env python
+
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import logging
+import datetime
+from alerts.base_alert import BaseAlert
+logger = logging.getLogger()
+
+# default recoveries counts
+DEFAULT_WARNING_RECOVERIES_COUNT = 1
+DEFAULT_CRITICAL_RECOVERIES_COUNT = 5
+
+UNKNOWN_COMPONENT = 'UNKNOWN_COMPONENT'
+class RecoveryAlert(BaseAlert):
+
+  def __init__(self, alert_meta, alert_source_meta, recovery_manager):
+    super(RecoveryAlert, self).__init__(alert_meta, alert_source_meta)
+
+    self.recovery_manager = recovery_manager
+    self.warning_count = DEFAULT_WARNING_RECOVERIES_COUNT
+    self.critical_count = DEFAULT_CRITICAL_RECOVERIES_COUNT
+
+    if 'reporting' in alert_source_meta:
+      reporting = alert_source_meta['reporting']
+      reporting_state_warning = self.RESULT_WARNING.lower()
+      reporting_state_critical = self.RESULT_CRITICAL.lower()
+
+      if reporting_state_warning in reporting and \
+          'count' in reporting[reporting_state_warning]:
+        self.warning_count = reporting[reporting_state_warning]['count']
+
+      if reporting_state_critical in reporting and \
+          'count' in reporting[reporting_state_critical]:
+        self.critical_count = reporting[reporting_state_critical]['count']
+    if self.critical_count <= self.warning_count:
+      if logger.isEnabledFor(logging.DEBUG):
+        logger.debug("[Alert][{0}] The CRITICAL value of {1} must be greater than the WARNING value of {2}".format(
+          self.get_name(), self.critical_count, self.warning_count))
+
+  def _collect(self):
+
+    component = UNKNOWN_COMPONENT
+    if 'componentName' in self.alert_meta:
+      component = self.alert_meta['componentName']
+
+    if logger.isEnabledFor(logging.DEBUG):
+      logger.debug("[Alert][{0}] Checking recovery operations for {1}".format(
+        self.get_name(), component))
+
+    recovery_action_info = {}
+    recovery_actions = self.recovery_manager.get_actions_copy()
+    if component in recovery_actions:
+      recovery_action_info = recovery_actions[component]
+
+    recovered_times = 0
+    if 'count' in recovery_action_info:
+      recovered_times = recovery_action_info['count']
+    lastResetText = ""
+    if 'lastReset' in recovery_action_info:
+      lastResetText = " since " + str(datetime.datetime.fromtimestamp(recovery_action_info['lastReset']))
+    warned_threshold_reached = False
+    if 'warnedThresholdReached' in recovery_action_info:
+      warned_threshold_reached = recovery_action_info['warnedThresholdReached']
+
+    if recovered_times >= self.critical_count or warned_threshold_reached:
+      result = self.RESULT_CRITICAL
+    elif recovered_times >= self.warning_count:
+      result = self.RESULT_WARNING
+    elif recovered_times < self.warning_count and \
+        recovered_times < self.critical_count:
+      result = self.RESULT_OK
+    else:
+      result = self.RESULT_UNKNOWN
+
+    return (result, [lastResetText, recovered_times, component])
+
+  def _get_reporting_text(self, state):
+    '''
+    Gets the default reporting text to use when the alert definition does not
+    contain any.
+    :param state: the state of the alert in uppercase (such as OK, WARNING, etc)
+    :return:  the parametrized text
+    '''
+    if state == self.RESULT_OK:
+      return 'No recovery operations executed for {2}{0}.'
+    return '{1} recovery operations executed for {2}{0}.'
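
Net effect of _collect: the persisted recovery counter is mapped onto an alert state, CRITICAL once the count reaches the critical threshold (or warnedThresholdReached is already set), WARNING at the warning threshold, OK below both; the defaults of 1 and 5 can be overridden via the 'count' keys under 'reporting'. A condensed restatement of that decision, not part of the commit:

    # condensed restatement of RecoveryAlert._collect's state mapping (illustrative only)
    def recovery_state(count, warned_threshold_reached, warning_count=1, critical_count=5):
        if count >= critical_count or warned_threshold_reached:
            return 'CRITICAL'
        if count >= warning_count:
            return 'WARNING'
        return 'OK'

    assert recovery_state(0, False) == 'OK'        # matches the three cases in TestAlerts below
    assert recovery_state(1, False) == 'WARNING'
    assert recovery_state(5, False) == 'CRITICAL'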

+ 3 - 3
ambari-agent/src/test/python/ambari_agent/TestActionQueue.py

@@ -337,7 +337,7 @@ class TestActionQueue(TestCase):
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
     dummy_controller = MagicMock()
-    dummy_controller.recovery_manager = RecoveryManager()
+    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
     dummy_controller.recovery_manager.update_config(5, 5, 1, 11, True, False, "", "")
 
     actionQueue = ActionQueue(config, dummy_controller)
@@ -665,7 +665,7 @@ class TestActionQueue(TestCase):
 
     build_mock.return_value = {'dummy report': '' }
 
-    dummy_controller.recovery_manager = RecoveryManager()
+    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
 
     requestComponentStatus_mock.reset_mock()
     requestComponentStatus_mock.return_value = {'exitcode': 0 }
@@ -725,7 +725,7 @@ class TestActionQueue(TestCase):
                                 get_mock, process_command_mock, gpeo_mock):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
-    dummy_controller.recovery_manager = RecoveryManager()
+    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
     config = MagicMock()
     gpeo_mock.return_value = 0
     config.get_parallel_exec_option = gpeo_mock

+ 11 - 11
ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py

@@ -42,7 +42,7 @@ class TestAlertSchedulerHandler(TestCase):
     self.assertEquals(len(definitions), 1)
 
   def test_json_to_callable_metric(self):
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     json_definition = {
       'source': {
         'type': 'METRIC'
@@ -63,7 +63,7 @@ class TestAlertSchedulerHandler(TestCase):
       }
     }
 
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', copy.deepcopy(json_definition))
 
     self.assertTrue(callable_result is not None)
@@ -79,7 +79,7 @@ class TestAlertSchedulerHandler(TestCase):
       }
     }
 
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', copy.deepcopy(json_definition))
 
     self.assertTrue(callable_result is not None)
@@ -94,7 +94,7 @@ class TestAlertSchedulerHandler(TestCase):
       }
     }
 
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', copy.deepcopy(json_definition))
 
     self.assertTrue(callable_result is None)
@@ -102,7 +102,7 @@ class TestAlertSchedulerHandler(TestCase):
   def test_execute_alert_noneScheduler(self):
     execution_commands = []
 
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     scheduler._AlertSchedulerHandler__scheduler = None
     alert_mock = Mock()
     scheduler._AlertSchedulerHandler__json_to_callable = Mock(return_value=alert_mock)
@@ -114,7 +114,7 @@ class TestAlertSchedulerHandler(TestCase):
   def test_execute_alert_noneCommands(self):
     execution_commands = None
 
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     alert_mock = Mock()
     scheduler._AlertSchedulerHandler__json_to_callable = Mock(return_value=alert_mock)
 
@@ -125,7 +125,7 @@ class TestAlertSchedulerHandler(TestCase):
   def test_execute_alert_emptyCommands(self):
     execution_commands = []
 
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     alert_mock = Mock()
     scheduler._AlertSchedulerHandler__json_to_callable = Mock(return_value=alert_mock)
 
@@ -144,7 +144,7 @@ class TestAlertSchedulerHandler(TestCase):
       }
     ]
 
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     alert_mock = MagicMock()
     alert_mock.collect = Mock()
     alert_mock.set_helpers = Mock()
@@ -159,7 +159,7 @@ class TestAlertSchedulerHandler(TestCase):
     self.assertTrue(alert_mock.collect.called)
 
   def test_load_definitions(self):
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     scheduler._AlertSchedulerHandler__config_maps = {
       'cluster': {}
     }
@@ -170,7 +170,7 @@ class TestAlertSchedulerHandler(TestCase):
     self.assertTrue(isinstance(alert_def, PortAlert))
 
   def test_load_definitions_noFile(self):
-    scheduler = AlertSchedulerHandler('wrong_path', 'wrong_path', 'wrong_path', 'wrong_path', 5, None, None)
+    scheduler = AlertSchedulerHandler('wrong_path', 'wrong_path', 'wrong_path', 'wrong_path', 5, None, None, None)
     scheduler._AlertSchedulerHandler__config_maps = {
       'cluster': {}
     }
@@ -190,7 +190,7 @@ class TestAlertSchedulerHandler(TestCase):
       }
     ]
 
-    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None)
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
     alert_mock = MagicMock()
     alert_mock.interval = Mock(return_value=5)
     alert_mock.collect = Mock()

+ 105 - 4
ambari-agent/src/test/python/ambari_agent/TestAlerts.py

@@ -22,14 +22,17 @@ import os
 import socket
 import sys
 import urllib2
+import tempfile
 
 from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
+from ambari_agent.RecoveryManager import RecoveryManager
 from ambari_agent.alerts.collector import AlertCollector
 from ambari_agent.alerts.base_alert import BaseAlert
 from ambari_agent.alerts.metric_alert import MetricAlert
 from ambari_agent.alerts.port_alert import PortAlert
 from ambari_agent.alerts.script_alert import ScriptAlert
 from ambari_agent.alerts.web_alert import WebAlert
+from ambari_agent.alerts.recovery_alert import RecoveryAlert
 from ambari_agent.apscheduler.scheduler import Scheduler
 from ambari_agent.ClusterConfiguration import ClusterConfiguration
 from ambari_commons.urllib_handlers import RefreshHeaderProcessor
@@ -60,7 +63,7 @@ class TestAlerts(TestCase):
 
     ash = AlertSchedulerHandler(test_file_path, test_stack_path,
       test_common_services_path, test_host_scripts_path, 5, cluster_configuration,
-      None)
+      None, None)
 
     ash.start()
 
@@ -109,6 +112,70 @@ class TestAlerts(TestCase):
     self.assertEquals(0, len(collector.alerts()))
     self.assertEquals('CRITICAL', alerts[0]['state'])
 
+  @patch.object(RecoveryManager, "get_actions_copy")
+  def test_recovery_alert(self, rm_get_actions_mock):
+    definition_json = self._get_recovery_alert_definition()
+    rm_get_actions_mock.return_value = {
+        "METRICS_COLLECTOR": {
+          "count": 0,
+          "lastAttempt": 1447860184,
+          "warnedLastReset": False,
+          "lastReset": 1447860184,
+          "warnedThresholdReached": False,
+          "lifetimeCount": 1,
+          "warnedLastAttempt": False
+        }
+      }
+
+    collector = AlertCollector()
+    cluster_configuration = self.__get_cluster_configuration()
+    self.__update_cluster_configuration(cluster_configuration, {})
+
+    rm = RecoveryManager(tempfile.mktemp(), True)
+    alert = RecoveryAlert(definition_json, definition_json['source'], rm)
+    alert.set_helpers(collector, cluster_configuration)
+    alert.set_cluster("c1", "c6401.ambari.apache.org")
+    self.assertEquals(1, alert.interval())
+
+    #  OK - "count": 0
+    alert.collect()
+    alerts = collector.alerts()
+    self.assertEquals(0, len(collector.alerts()))
+    self.assertEquals('OK', alerts[0]['state'])
+
+    #  WARN - "count": 1
+    rm_get_actions_mock.return_value = {
+      "METRICS_COLLECTOR": {
+        "count": 1,
+        "lastAttempt": 1447860184,
+        "warnedLastReset": False,
+        "lastReset": 1447860184,
+        "warnedThresholdReached": False,
+        "lifetimeCount": 1,
+        "warnedLastAttempt": False
+      }
+    }
+    alert.collect()
+    alerts = collector.alerts()
+    self.assertEquals(0, len(collector.alerts()))
+    self.assertEquals('WARNING', alerts[0]['state'])
+
+    #  CRIT - "count": 5
+    rm_get_actions_mock.return_value = {
+      "METRICS_COLLECTOR": {
+        "count": 5,
+        "lastAttempt": 1447860184,
+        "warnedLastReset": False,
+        "lastReset": 1447860184,
+        "warnedThresholdReached": False,
+        "lifetimeCount": 1,
+        "warnedLastAttempt": False
+      }
+    }
+    alert.collect()
+    alerts = collector.alerts()
+    self.assertEquals(0, len(collector.alerts()))
+    self.assertEquals('CRITICAL', alerts[0]['state'])
 
   @patch.object(socket.socket,"connect")
   def test_port_alert_complex_uri(self, socket_connect_mock):
@@ -475,7 +542,7 @@ class TestAlerts(TestCase):
 
     ash = AlertSchedulerHandler(test_file_path, test_stack_path,
       test_common_services_path, test_host_scripts_path, 5, cluster_configuration,
-      None)
+      None, None)
 
     ash.start()
 
@@ -522,7 +589,7 @@ class TestAlerts(TestCase):
 
     ash = AlertSchedulerHandler(test_file_path, test_stack_path,
       test_common_services_path, test_host_scripts_path, 5, cluster_configuration,
-      None)
+      None, None)
 
     ash.start()
 
@@ -558,7 +625,7 @@ class TestAlerts(TestCase):
     cluster_configuration = self.__get_cluster_configuration()
     ash = AlertSchedulerHandler(test_file_path, test_stack_path,
       test_common_services_path, test_host_scripts_path, 5, cluster_configuration,
-      None)
+      None, None)
 
     ash.start()
 
@@ -633,6 +700,13 @@ class TestAlerts(TestCase):
     self.assertEquals(alert._get_reporting_text(alert.RESULT_WARNING), '{0}')
     self.assertEquals(alert._get_reporting_text(alert.RESULT_CRITICAL), '{0}')
 
+    rm = RecoveryManager(tempfile.mktemp())
+    definition_json['source']['type'] = 'RECOVERY'
+    alert = RecoveryAlert(definition_json, definition_json['source'], rm)
+    self.assertEquals(alert._get_reporting_text(alert.RESULT_OK), 'No recovery operations executed for {2}{0}.')
+    self.assertEquals(alert._get_reporting_text(alert.RESULT_WARNING), '{1} recovery operations executed for {2}{0}.')
+    self.assertEquals(alert._get_reporting_text(alert.RESULT_CRITICAL), '{1} recovery operations executed for {2}{0}.')
+
 
   def test_configuration_updates(self):
     definition_json = self._get_script_alert_definition()
@@ -1212,6 +1286,33 @@ class TestAlerts(TestCase):
     }
 
 
+  def _get_recovery_alert_definition(self):
+    return {
+      "componentName": "METRICS_COLLECTOR",
+      "name": "ams_metrics_collector_autostart",
+      "label": "Metrics Collector Recovery",
+      "description": "This alert is triggered if the Metrics Collector has been auto-started for number of times equal to threshold.",
+      "interval": 1,
+      "scope": "HOST",
+      "enabled": True,
+      "source": {
+        "type": "RECOVERY",
+        "reporting": {
+          "ok": {
+            "text": "Metrics Collector hasn't been auto-started since {0}."
+          },
+          "warning": {
+            "text": "Metrics Collector has been auto-started {1} times since {0}.",
+            "count": 1
+          },
+          "critical": {
+            "text": "Metrics Collector has been auto-started {1} times since {0}.",
+            "count": 5
+          }
+        }
+      }
+    }
+
   def _get_metric_alert_definition(self):
     return {
       "name": "DataNode CPU Check",

+ 3 - 4
ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py

@@ -20,9 +20,7 @@ limitations under the License.
 
 from unittest import TestCase
 import unittest
-import socket
-import os
-import time
+import tempfile
 from mock.mock import patch, MagicMock, call
 import StringIO
 import sys
@@ -115,7 +113,7 @@ class TestHeartbeat(TestCase):
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
     dummy_controller = MagicMock()
-    dummy_controller.recovery_manager = RecoveryManager()
+    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
     actionQueue = ActionQueue(config, dummy_controller)
     result_mock.return_value = {
       'reports': [{'status': 'IN_PROGRESS',
@@ -209,6 +207,7 @@ class TestHeartbeat(TestCase):
     config.set('agent', 'prefix', 'tmp')
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
+
     dummy_controller = MagicMock()
     actionQueue = ActionQueue(config, dummy_controller)
     statusCommand = {

+ 16 - 16
ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py

@@ -20,7 +20,7 @@ limitations under the License.
 
 from unittest import TestCase
 import copy
-
+import tempfile
 from ambari_agent.RecoveryManager import RecoveryManager
 from mock.mock import patch, MagicMock, call
 
@@ -124,7 +124,7 @@ class TestRecoveryManager(TestCase):
 
   @patch.object(RecoveryManager, "update_desired_status")
   def test_process_commands(self, mock_uds):
-    rm = RecoveryManager(True)
+    rm = RecoveryManager(tempfile.mktemp(), True)
     rm.process_status_commands(None)
     self.assertFalse(mock_uds.called)
 
@@ -154,7 +154,7 @@ class TestRecoveryManager(TestCase):
     pass
 
   def test_defaults(self):
-    rm = RecoveryManager()
+    rm = RecoveryManager(tempfile.mktemp())
     self.assertFalse(rm.enabled())
     self.assertEqual(None, rm.get_install_command("NODEMANAGER"))
     self.assertEqual(None, rm.get_start_command("NODEMANAGER"))
@@ -170,7 +170,7 @@ class TestRecoveryManager(TestCase):
       [1000, 1001, 1002, 1003, 1004, 1071, 1150, 1151, 1152, 1153, 1400, 1401,
        1500, 1571, 1572, 1653, 1900, 1971, 2300, 2301]
 
-    rm = RecoveryManager(True, False)
+    rm = RecoveryManager(tempfile.mktemp(), True, False)
     self.assertTrue(rm.enabled())
 
     rm.update_config(0, 60, 5, 12, True, False, "", "")
@@ -243,7 +243,7 @@ class TestRecoveryManager(TestCase):
     pass
 
   def test_recovery_required(self):
-    rm = RecoveryManager(True, False)
+    rm = RecoveryManager(tempfile.mktemp(), True, False)
 
     rm.update_current_status("NODEMANAGER", "INSTALLED")
     rm.update_desired_status("NODEMANAGER", "INSTALLED")
@@ -273,7 +273,7 @@ class TestRecoveryManager(TestCase):
     rm.update_desired_status("NODEMANAGER", "STARTED")
     self.assertTrue(rm.requires_recovery("NODEMANAGER"))
 
-    rm = RecoveryManager(True, True)
+    rm = RecoveryManager(tempfile.mktemp(), True, True)
 
     rm.update_current_status("NODEMANAGER", "INIT")
     rm.update_desired_status("NODEMANAGER", "INSTALLED")
@@ -291,13 +291,13 @@ class TestRecoveryManager(TestCase):
 
   def test_recovery_required2(self):
 
-    rm = RecoveryManager(True, True)
+    rm = RecoveryManager(tempfile.mktemp(), True, True)
     rm.update_config(15, 5, 1, 16, True, False, "", "")
     rm.update_current_status("NODEMANAGER", "INSTALLED")
     rm.update_desired_status("NODEMANAGER", "STARTED")
     self.assertTrue(rm.requires_recovery("NODEMANAGER"))
 
-    rm = RecoveryManager(True, True)
+    rm = RecoveryManager(tempfile.mktemp(), True, True)
     rm.update_config(15, 5, 1, 16, True, False, "NODEMANAGER", "")
     rm.update_current_status("NODEMANAGER", "INSTALLED")
     rm.update_desired_status("NODEMANAGER", "STARTED")
@@ -307,7 +307,7 @@ class TestRecoveryManager(TestCase):
     rm.update_desired_status("DATANODE", "STARTED")
     self.assertFalse(rm.requires_recovery("DATANODE"))
 
-    rm = RecoveryManager(True, True)
+    rm = RecoveryManager(tempfile.mktemp(), True, True)
     rm.update_config(15, 5, 1, 16, True, False, "", "NODEMANAGER")
     rm.update_current_status("NODEMANAGER", "INSTALLED")
     rm.update_desired_status("NODEMANAGER", "STARTED")
@@ -339,7 +339,7 @@ class TestRecoveryManager(TestCase):
 
   @patch('time.time', MagicMock(side_effects=[1]))
   def test_store_from_status_and_use(self):
-    rm = RecoveryManager(True)
+    rm = RecoveryManager(tempfile.mktemp(), True)
 
     command1 = copy.deepcopy(self.command)
 
@@ -391,7 +391,7 @@ class TestRecoveryManager(TestCase):
        4100, 4101, 4102, 4103,
        4200, 4201, 4202,
        4300, 4301, 4302]
-    rm = RecoveryManager(True)
+    rm = RecoveryManager(tempfile.mktemp(), True)
     rm.update_config(15, 5, 1, 16, True, False, "", "")
 
     command1 = copy.deepcopy(self.command)
@@ -469,7 +469,7 @@ class TestRecoveryManager(TestCase):
 
   @patch.object(RecoveryManager, "update_config")
   def test_update_rm_config(self, mock_uc):
-    rm = RecoveryManager()
+    rm = RecoveryManager(tempfile.mktemp())
     rm.update_configuration_from_registration(None)
     mock_uc.assert_has_calls([call(6, 60, 5, 12, False, True, "", "")])
 
@@ -518,7 +518,7 @@ class TestRecoveryManager(TestCase):
     time_mock.side_effect = \
       [1000, 1071, 1072, 1470, 1471, 1472, 1543, 1644, 1715]
 
-    rm = RecoveryManager()
+    rm = RecoveryManager(tempfile.mktemp())
     rec_st = rm.get_recovery_status()
     self.assertEquals(rec_st, {"summary": "DISABLED"})
 
@@ -565,7 +565,7 @@ class TestRecoveryManager(TestCase):
     time_mock.side_effect = \
       [1000, 1001, 1002, 1003, 1104, 1105, 1106, 1807, 1808, 1809, 1810, 1811, 1812]
 
-    rm = RecoveryManager(True)
+    rm = RecoveryManager(tempfile.mktemp(), True)
     rm.update_config(5, 5, 1, 11, True, False, "", "")
 
     command1 = copy.deepcopy(self.command)
@@ -594,7 +594,7 @@ class TestRecoveryManager(TestCase):
     pass
 
   def test_command_count(self):
-    rm = RecoveryManager(True)
+    rm = RecoveryManager(tempfile.mktemp(), True)
     self.assertFalse(rm.has_active_command())
     rm.start_execution_command()
     self.assertTrue(rm.has_active_command())
@@ -606,7 +606,7 @@ class TestRecoveryManager(TestCase):
     self.assertFalse(rm.has_active_command())
 
   def test_configured_for_recovery(self):
-    rm = RecoveryManager(True)
+    rm = RecoveryManager(tempfile.mktemp(), True)
     self.assertTrue(rm.configured_for_recovery("A"))
     self.assertTrue(rm.configured_for_recovery("B"))
 

+ 4 - 0
ambari-server/conf/unix/ambari.properties

@@ -107,3 +107,7 @@ http.x-frame-options=DENY
 views.http.strict-transport-security=max-age=31536000
 views.http.x-xss-protection=1; mode=block
 views.http.x-frame-options=SAMEORIGIN
+
+# Enable Metrics Collector auto-restart
+recovery.type=AUTO_START
+recovery.enabled_components=METRICS_COLLECTOR
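
These two properties are the out-of-the-box switch for this feature on the server side: per the comment they enable auto-restart for the Metrics Collector only. Judging from test_recovery_required2 below, the agent receives the same settings through RecoveryManager.update_config, whose seventh argument names the components eligible for recovery; a hedged sketch with the component name swapped to METRICS_COLLECTOR for illustration:

    import tempfile
    from ambari_agent.RecoveryManager import RecoveryManager

    rm = RecoveryManager(tempfile.mktemp(), True, True)
    # positional arguments copied from TestRecoveryManager, component name swapped for illustration
    rm.update_config(15, 5, 1, 16, True, False, "METRICS_COLLECTOR", "")
    rm.update_current_status("METRICS_COLLECTOR", "INSTALLED")
    rm.update_desired_status("METRICS_COLLECTOR", "STARTED")
    print(rm.requires_recovery("METRICS_COLLECTOR"))   # True; components not listed report False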

+ 4 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java

@@ -328,6 +328,10 @@ public class AlertDefinitionFactory {
           clazz = WebSource.class;
           break;
         }
+        case RECOVERY: {
+          clazz = RecoverySource.class;
+          break;
+        }
         case SERVER:{
           clazz = ServerSource.class;
           break;

+ 32 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/alert/RecoverySource.java

@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state.alert;
+
+/**
+ * Alert when the source type is defined as {@link org.apache.ambari.server.state.alert.SourceType#RECOVERY}
+ * <p/>
+ * Equality checking for instances of this class should be executed on every
+ * member to ensure that reconciling stack differences is correct.
+ */
+public class RecoverySource extends Source {
+
+  public RecoverySource() {
+
+  }
+
+}

+ 5 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/alert/SourceType.java

@@ -51,6 +51,11 @@ public enum SourceType {
    */
   WEB,
 
+  /**
+   * Source is a component state recovery results
+   */
+  RECOVERY,
+
   /**
    * A server-side alert.
    */

+ 24 - 0
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/alerts.json

@@ -28,6 +28,30 @@
       }
     ],
     "METRICS_COLLECTOR": [
+      {
+        "name": "ams_metrics_collector_autostart",
+        "label": "Metrics Collector Auto-Start",
+        "description": "This alert is triggered if the Metrics Collector has been auto-started for number of times equal to threshold.",
+        "interval": 1,
+        "scope": "ANY",
+        "enabled": true,
+        "source": {
+          "type": "RECOVERY",
+          "reporting": {
+            "ok": {
+              "text": "Metrics Collector hasn't been auto-started{0}."
+            },
+            "warning": {
+              "text": "Metrics Collector has been auto-started {1} times{0}.",
+              "count": 1
+            },
+            "critical": {
+              "text": "Metrics Collector has been auto-started {1} times{0}.",
+              "count": 5
+            }
+          }
+        }
+      },
       {
         "name": "ams_metrics_collector_process",
         "label": "Metrics Collector Process",