瀏覽代碼

AMBARI-10029. Node auto-recovery (phase-I)

Sumit Mohanty 10 年之前
父節點
當前提交
dcbf12ef92
共有 32 個文件被更改,包括 2166 次插入64 次删除
  1. 28 10
      ambari-agent/src/main/python/ambari_agent/ActionQueue.py
  2. 20 3
      ambari-agent/src/main/python/ambari_agent/Controller.py
  3. 6 2
      ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
  4. 4 1
      ambari-agent/src/main/python/ambari_agent/DataCleaner.py
  5. 3 0
      ambari-agent/src/main/python/ambari_agent/Heartbeat.py
  6. 563 0
      ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
  7. 3 0
      ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
  8. 106 0
      ambari-agent/src/test/python/ambari_agent/TestController.py
  9. 430 0
      ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
  10. 3 3
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java
  11. 85 0
      ambari-server/src/main/java/org/apache/ambari/server/agent/AgentRequests.java
  12. 67 0
      ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentRecoveryReport.java
  13. 9 0
      ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatus.java
  14. 10 0
      ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java
  15. 22 2
      ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
  16. 27 6
      ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
  17. 113 0
      ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
  18. 67 0
      ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryReport.java
  19. 12 1
      ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java
  20. 45 2
      ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java
  21. 63 5
      ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
  22. 10 0
      ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
  23. 83 23
      ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
  24. 46 1
      ambari-server/src/main/java/org/apache/ambari/server/controller/HostResponse.java
  25. 9 1
      ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java
  26. 2 2
      ambari-server/src/main/java/org/apache/ambari/server/controller/internal/URLStreamProvider.java
  27. 12 0
      ambari-server/src/main/java/org/apache/ambari/server/state/Host.java
  28. 24 0
      ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java
  29. 2 0
      ambari-server/src/main/resources/properties.json
  30. 136 1
      ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
  31. 45 0
      ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
  32. 111 1
      ambari-server/src/test/java/org/apache/ambari/server/controller/internal/HostResourceProviderTest.java

+ 28 - 10
ambari-agent/src/main/python/ambari_agent/ActionQueue.py

@@ -53,6 +53,7 @@ class ActionQueue(threading.Thread):
 
   STATUS_COMMAND = 'STATUS_COMMAND'
   EXECUTION_COMMAND = 'EXECUTION_COMMAND'
+  AUTO_EXECUTION_COMMAND = 'AUTO_EXECUTION_COMMAND'
   BACKGROUND_EXECUTION_COMMAND = 'BACKGROUND_EXECUTION_COMMAND'
   ROLE_COMMAND_INSTALL = 'INSTALL'
   ROLE_COMMAND_START = 'START'
@@ -91,8 +92,8 @@ class ActionQueue(threading.Thread):
 
     for command in commands:
       logger.info("Adding " + command['commandType'] + " for service " + \
-                    command['serviceName'] + " of cluster " + \
-                    command['clusterName'] + " to the queue.")
+                  command['serviceName'] + " of cluster " + \
+                  command['clusterName'] + " to the queue.")
       self.statusCommandQueue.put(command)
 
   def put(self, commands):
@@ -102,7 +103,8 @@ class ActionQueue(threading.Thread):
       if not command.has_key('clusterName'):
         command['clusterName'] = 'null'
 
-      logger.info("Adding " + command['commandType'] + " for service " + \
+      logger.info("Adding " + command['commandType'] + " for role " + \
+                  command['role'] + " for service " + \
                   command['serviceName'] + " of cluster " + \
                   command['clusterName'] + " to the queue.")
       if command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND :
@@ -174,7 +176,7 @@ class ActionQueue(threading.Thread):
     commandType = command['commandType']
     logger.debug("Took an element of Queue (command type = %s)." % commandType)
     try:
-      if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND]:
+      if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND, self.AUTO_EXECUTION_COMMAND]:
         self.execute_command(command)
       elif commandType == self.STATUS_COMMAND:
         self.execute_status_command(command)
@@ -192,6 +194,7 @@ class ActionQueue(threading.Thread):
     clusterName = command['clusterName']
     commandId = command['commandId']
     isCommandBackground = command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND
+    isAutoExecuteCommand = command['commandType'] == self.AUTO_EXECUTION_COMMAND
     message = "Executing command with id = {commandId} for role = {role} of " \
               "cluster {cluster}.".format(
               commandId = str(commandId), role=command['role'],
@@ -203,12 +206,21 @@ class ActionQueue(threading.Thread):
     in_progress_status = self.commandStatuses.generate_report_template(command)
     # The path of the files that contain the output log and error log use a prefix that the agent advertises to the
     # server. The prefix is defined in agent-config.ini
-    in_progress_status.update({
-      'tmpout': self.tmpdir + os.sep + 'output-' + str(taskId) + '.txt',
-      'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt',
-      'structuredOut' : self.tmpdir + os.sep + 'structured-out-' + str(taskId) + '.json',
-      'status': self.IN_PROGRESS_STATUS
-    })
+    if not isAutoExecuteCommand:
+      in_progress_status.update({
+        'tmpout': self.tmpdir + os.sep + 'output-' + str(taskId) + '.txt',
+        'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt',
+        'structuredOut' : self.tmpdir + os.sep + 'structured-out-' + str(taskId) + '.json',
+        'status': self.IN_PROGRESS_STATUS
+      })
+    else:
+      in_progress_status.update({
+        'tmpout': self.tmpdir + os.sep + 'auto_output-' + str(taskId) + '.txt',
+        'tmperr': self.tmpdir + os.sep + 'auto_errors-' + str(taskId) + '.txt',
+        'structuredOut' : self.tmpdir + os.sep + 'auto_structured-out-' + str(taskId) + '.json',
+        'status': self.IN_PROGRESS_STATUS
+      })
+
     self.commandStatuses.put_command_status(command, in_progress_status)
 
     # running command
@@ -322,6 +334,7 @@ class ActionQueue(threading.Thread):
                               globalConfig, self.config, self.configTags)
 
       component_extra = None
+      request_execution_cmd = False
 
       # For custom services, responsibility to determine service status is
       # delegated to python scripts
@@ -330,13 +343,18 @@ class ActionQueue(threading.Thread):
 
       if component_status_result['exitcode'] == 0:
         component_status = LiveStatus.LIVE_STATUS
+        self.controller.recovery_manager.update_current_status(component, component_status)
       else:
         component_status = LiveStatus.DEAD_STATUS
+        self.controller.recovery_manager.update_current_status(component, component_status)
+        request_execution_cmd = self.controller.recovery_manager.requires_recovery(component)
 
       if component_status_result.has_key('structuredOut'):
         component_extra = component_status_result['structuredOut']
 
       result = livestatus.build(forsed_component_status= component_status)
+      if self.controller.recovery_manager.enabled():
+        result['sendExecCmdDet'] = str(request_execution_cmd)
 
       # Add security state to the result
       result['securityState'] = component_security_status_result

+ 20 - 3
ambari-agent/src/main/python/ambari_agent/Controller.py

@@ -44,6 +44,7 @@ from ambari_agent.NetUtil import NetUtil
 from ambari_agent.LiveStatus import LiveStatus
 from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
 from ambari_agent.ClusterConfiguration import  ClusterConfiguration
+from ambari_agent.RecoveryManager import  RecoveryManager
 from ambari_agent.HeartbeatHandlers import HeartbeatStopHandlers, bind_signal_handlers
 
 logger = logging.getLogger()
@@ -81,6 +82,7 @@ class Controller(threading.Thread):
     self.heartbeat_stop_callback = heartbeat_stop_callback
     # List of callbacks that are called at agent registration
     self.registration_listeners = []
+    self.recovery_manager = RecoveryManager()
 
     # pull config directory out of config
     cache_dir = config.get('agent', 'cache_dir')
@@ -128,8 +130,10 @@ class Controller(threading.Thread):
                       self.hostname, prettyData)
 
         ret = self.sendRequest(self.registerUrl, data)
+        prettyData = pprint.pformat(ret)
+        logger.debug("Registration response is %s", prettyData)
 
-        # exitstatus is a code of error which was rised on server side.
+        # exitstatus is a code of error which was raised on server side.
         # exitstatus = 0 (OK - Default)
         # exitstatus = 1 (Registration failed because different version of agent and server)
         exitstatus = 0
@@ -158,17 +162,20 @@ class Controller(threading.Thread):
         # always update cached cluster configurations on registration
         self.cluster_configuration.update_configurations_from_heartbeat(ret)
 
+        self.recovery_manager.update_configuration_from_registration(ret)
+
         # always update alert definitions on registration
         self.alert_scheduler_handler.update_definitions(ret)
       except ssl.SSLError:
         self.repeatRegistration = False
         self.isRegistered = False
         return
-      except Exception:
+      except Exception, ex:
         # try a reconnect only after a certain amount of random time
         delay = randint(0, self.range)
         logger.error("Unable to connect to: " + self.registerUrl, exc_info=True)
-        """ Sleeping for {0} seconds and then retrying again """.format(delay)
+        logger.error("Error:" + str(ex))
+        logger.warn(""" Sleeping for {0} seconds and then trying again """.format(delay,))
         time.sleep(delay)
 
     return ret
@@ -265,11 +272,21 @@ class Controller(threading.Thread):
 
         if 'executionCommands' in response_keys:
           execution_commands = response['executionCommands']
+          self.recovery_manager.process_execution_commands(execution_commands)
           self.addToQueue(execution_commands)
 
         if 'statusCommands' in response_keys:
+          # try storing execution command details and desired state
+          self.recovery_manager.process_status_commands(response['statusCommands'])
           self.addToStatusQueue(response['statusCommands'])
 
+        if self.actionQueue.commandQueue.empty():
+          recovery_commands = self.recovery_manager.get_recovery_commands()
+          for recovery_command in recovery_commands:
+            logger.info("Adding recovery command %s for component %s",
+                        recovery_command['roleCommand'], recovery_command['role'])
+            self.addToQueue([recovery_command])
+
         if 'alertDefinitionCommands' in response_keys:
           self.alert_scheduler_handler.update_definitions(response)
 

+ 6 - 2
ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py

@@ -230,6 +230,7 @@ class CustomServiceOrchestrator():
     override_output_files=True # by default, we override status command output
     if logger.level == logging.DEBUG:
       override_output_files = False
+
     res = self.runCommand(command, self.status_commands_stdout,
                           self.status_commands_stderr, self.COMMAND_NAME_STATUS,
                           override_output_files=override_output_files)
@@ -267,7 +268,7 @@ class CustomServiceOrchestrator():
 
   def resolve_script_path(self, base_dir, script):
     """
-    Incapsulates logic of script location determination.
+    Encapsulates logic of script location determination.
     """
     path = os.path.join(base_dir, script)
     if not os.path.exists(path):
@@ -305,7 +306,7 @@ class CustomServiceOrchestrator():
     command_type = command['commandType']
     from ActionQueue import ActionQueue  # To avoid cyclic dependency
     if command_type == ActionQueue.STATUS_COMMAND:
-      # These files are frequently created, thats why we don't
+      # These files are frequently created, that's why we don't
       # store them all, but only the latest one
       file_path = os.path.join(self.tmp_dir, "status_command.json")
     else:
@@ -313,6 +314,9 @@ class CustomServiceOrchestrator():
       if 'clusterHostInfo' in command and command['clusterHostInfo']:
         command['clusterHostInfo'] = self.decompressClusterHostInfo(command['clusterHostInfo'])
       file_path = os.path.join(self.tmp_dir, "command-{0}.json".format(task_id))
+      if command_type == ActionQueue.AUTO_EXECUTION_COMMAND:
+        file_path = os.path.join(self.tmp_dir, "auto_command-{0}.json".format(task_id))
+
     # Json may contain passwords, that's why we need proper permissions
     if os.path.isfile(file_path):
       os.unlink(file_path)

+ 4 - 1
ambari-agent/src/main/python/ambari_agent/DataCleaner.py

@@ -28,7 +28,10 @@ import logging
 logger = logging.getLogger()
 
 class DataCleaner(threading.Thread):
-  FILE_NAME_PATTERN = 'errors-\d+.txt|output-\d+.txt|site-\d+.pp|structured-out-\d+.json|command-\d+.json'
+  COMMAND_FILE_NAMES_PATTERN = 'errors-\d+.txt|output-\d+.txt|site-\d+.pp|structured-out-\d+.json|command-\d+.json'
+  AUTO_COMMAND_FILE_NAMES_PATTERN = \
+    'auto_command-\d+.json|auto_errors-\d+.txt|auto_output-\d+.txt|auto_structured-out-\d+.json'
+  FILE_NAME_PATTERN = AUTO_COMMAND_FILE_NAMES_PATTERN + "|" + COMMAND_FILE_NAMES_PATTERN
 
   def __init__(self, config):
     threading.Thread.__init__(self)

+ 3 - 0
ambari-agent/src/main/python/ambari_agent/Heartbeat.py

@@ -55,6 +55,9 @@ class Heartbeat:
                   'nodeStatus'        : nodeStatus
                 }
 
+    rec_status = self.actionQueue.controller.recovery_manager.get_recovery_status()
+    heartbeat['recoveryReport'] = rec_status
+
     commandsInProgress = False
     if not self.actionQueue.commandQueue.empty():
       commandsInProgress = True

+ 563 - 0
ambari-agent/src/main/python/ambari_agent/RecoveryManager.py

@@ -0,0 +1,563 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import copy
+import time
+import threading
+import pprint
+
+from ambari_agent.ActionQueue import ActionQueue
+from ambari_agent.LiveStatus import LiveStatus
+
+
+logger = logging.getLogger()
+
+"""
+RecoveryManager has the following capabilities:
+* Store data needed for execution commands extracted from STATUS command
+* Generate INSTALL command
+* Generate START command
+"""
+
+
+class RecoveryManager:
+  COMMAND_TYPE = "commandType"
+  PAYLOAD_LEVEL = "payloadLevel"
+  COMPONENT_NAME = "componentName"
+  ROLE = "role"
+  TASK_ID = "taskId"
+  DESIRED_STATE = "desiredState"
+  EXECUTION_COMMAND_DETAILS = "executionCommandDetails"
+  ROLE_COMMAND = "roleCommand"
+  PAYLOAD_LEVEL_DEFAULT = "DEFAULT"
+  PAYLOAD_LEVEL_MINIMAL = "MINIMAL"
+  PAYLOAD_LEVEL_EXECUTION_COMMAND = "EXECUTION_COMMAND"
+  STARTED = "STARTED"
+  INSTALLED = "INSTALLED"
+  INIT = "INIT"  # TODO: What is the state when machine is reset
+
+  default_action_counter = {
+    "lastAttempt": 0,
+    "count": 0,
+    "lastReset": 0,
+    "lifetimeCount" : 0,
+    "warnedLastAttempt": False,
+    "warnedLastReset": False,
+    "warnedThresholdReached": False
+  }
+
+
+  def __init__(self, recovery_enabled=False, auto_start_only=False):
+    self.recovery_enabled = recovery_enabled
+    self.auto_start_only = auto_start_only
+    self.max_count = 6
+    self.window_in_min = 60
+    self.retry_gap = 5
+    self.max_lifetime_count = 12
+
+    self.stored_exec_commands = {}
+    self.id = int(time.time())
+    self.allowed_desired_states = [self.STARTED, self.INSTALLED]
+    self.allowed_current_states = [self.INIT, self.INSTALLED]
+    self.actions = {}
+    self.statuses = {}
+    self.__status_lock = threading.RLock()
+
+    self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only)
+
+    pass
+
+
+  def enabled(self):
+    return self.recovery_enabled
+
+
+  def update_current_status(self, component, state):
+    """
+    Updates the current status of a host component managed by the agent
+    """
+    if component not in self.statuses:
+      self.__status_lock.acquire()
+      try:
+        if component not in self.statuses:
+          self.statuses[component] = {
+            "current": "",
+            "desired": ""
+          }
+      finally:
+        self.__status_lock.release()
+      pass
+
+    self.statuses[component]["current"] = state
+    pass
+
+
+  def update_desired_status(self, component, state):
+    """
+    Updates the desired status of a host component managed by the agent
+    """
+    if component not in self.statuses:
+      self.__status_lock.acquire()
+      try:
+        if component not in self.statuses:
+          self.statuses[component] = {
+            "current": "",
+            "desired": ""
+          }
+      finally:
+        self.__status_lock.release()
+      pass
+
+    self.statuses[component]["desired"] = state
+    pass
+
+
+  def requires_recovery(self, component):
+    """
+    Recovery is allowed for:
+    INSTALLED --> STARTED
+    INIT --> INSTALLED --> STARTED
+    CLIENTs may be RE-INSTALLED (TODO)
+    """
+    if not self.enabled():
+      return False
+
+    if component not in self.statuses:
+      return False
+
+    status = self.statuses[component]
+    if status["current"] == status["desired"]:
+      return False
+
+    if status["desired"] not in self.allowed_desired_states or status["current"] not in self.allowed_current_states:
+      return False
+
+    ### No recovery from STARTED down to INSTALLED or INIT states
+    if status["current"] == self.STARTED:
+      return False
+
+    logger.info("%s needs recovery.", component)
+    return True
+    pass
+
+
+
+  def get_recovery_status(self):
+    """
+    Creates a status in the form
+    {
+      "summary" : "RECOVERABLE|DISABLED|PARTIALLY_RECOVERABLE|UNRECOVERABLE",
+      "componentReports" : [
+        {
+          "name": "component_name",
+          "numAttempts" : "x",
+          "limitReached" : "true|false",
+          "status" : "REQUIRES_RECOVERY|RECOVERY_COMMAND_REQUESTED|RECOVERY_COMMAND_ISSUED|NO_RECOVERY_NEEDED"
+        }
+      ]
+    }
+    """
+    report = {}
+    report["summary"] = "DISABLED"
+    if self.enabled():
+      report["summary"] = "RECOVERABLE"
+      num_limits_reached = 0
+      recovery_states = []
+      report["componentReports"] = recovery_states
+      self.__status_lock.acquire()
+      try:
+        for component in self.actions.keys():
+          action = self.actions[component]
+          recovery_state = {}
+          recovery_state["name"] = component
+          recovery_state["numAttempts"] = action["lifetimeCount"]
+          recovery_state["limitReached"] = self.max_lifetime_count <= action["lifetimeCount"]
+          recovery_states.append(recovery_state)
+          if recovery_state["limitReached"] == True:
+            num_limits_reached += 1
+          pass
+      finally:
+        self.__status_lock.release()
+
+      if num_limits_reached > 0:
+        report["summary"] = "PARTIALLY_RECOVERABLE"
+        if num_limits_reached == len(recovery_states):
+          report["summary"] = "UNRECOVERABLE"
+
+    return report
+    pass
+
+  def get_recovery_commands(self):
+    """
+    This method computes the recovery commands for the following transitions
+    INSTALLED --> STARTED
+    INIT --> INSTALLED
+    """
+    commands = []
+    for component in self.statuses.keys():
+      if self.requires_recovery(component) and self.may_execute(component):
+        status = copy.deepcopy(self.statuses[component])
+        if status["desired"] == self.STARTED:
+          if status["current"] == self.INSTALLED:
+            command = self.get_start_command(component)
+          elif status["current"] == self.INIT:
+            command = self.get_install_command(component)
+        elif status["desired"] == self.INSTALLED:
+          if status["current"] == self.INIT:
+            command = self.get_install_command(component)
+        if command:
+          self.execute(component)
+          commands.append(command)
+    return commands
+    pass
+
+
+  def may_execute(self, action):
+    """
+    Check if an action can be executed
+    """
+    if not action or action.strip() == "":
+      return False
+
+    if action not in self.actions:
+      self.__status_lock.acquire()
+      try:
+        self.actions[action] = copy.deepcopy(self.default_action_counter)
+      finally:
+        self.__status_lock.release()
+    return self._execute_action_chk_only(action)
+    pass
+
+
+  def execute(self, action):
+    """
+    Executes an action
+    """
+    if not action or action.strip() == "":
+      return False
+
+    if action not in self.actions:
+      self.__status_lock.acquire()
+      try:
+        self.actions[action] = copy.deepcopy(self.default_action_counter)
+      finally:
+        self.__status_lock.release()
+    return self._execute_action_(action)
+    pass
+
+
+  def _execute_action_(self, action_name):
+    """
+    _private_ implementation of [may] execute
+    """
+    action_counter = self.actions[action_name]
+    now = self._now_()
+    seconds_since_last_attempt = now - action_counter["lastAttempt"]
+    if action_counter["lifetimeCount"] < self.max_lifetime_count:
+      if action_counter["count"] < self.max_count:
+        if seconds_since_last_attempt > self.retry_gap_in_sec:
+          action_counter["count"] += 1
+          action_counter["lifetimeCount"] +=1
+          if self.retry_gap > 0:
+            action_counter["lastAttempt"] = now
+          action_counter["warnedLastAttempt"] = False
+          if action_counter["count"] == 1:
+            action_counter["lastReset"] = now
+          return True
+        else:
+          if action_counter["warnedLastAttempt"] == False:
+            action_counter["warnedLastAttempt"] = True
+            logger.warn(
+              "%s seconds has not passed since last occurrence %s seconds back for %s. " +
+              "Will silently skip execution without warning till retry gap is passed",
+              self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
+          else:
+            logger.debug(
+              "%s seconds has not passed since last occurrence %s seconds back for %s",
+              self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
+      else:
+        sec_since_last_reset = now - action_counter["lastReset"]
+        if sec_since_last_reset > self.window_in_sec:
+          action_counter["count"] = 1
+          action_counter["lifetimeCount"] +=1
+          if self.retry_gap > 0:
+            action_counter["lastAttempt"] = now
+          action_counter["lastReset"] = now
+          action_counter["warnedLastReset"] = False
+          return True
+        else:
+          if action_counter["warnedLastReset"] == False:
+            action_counter["warnedLastReset"] = True
+            logger.warn("%s occurrences in %s minutes reached the limit for %s. " +
+                        "Will silently skip execution without warning till window is reset",
+                        action_counter["count"], self.window_in_min, action_name)
+          else:
+            logger.debug("%s occurrences in %s minutes reached the limit for %s",
+                         action_counter["count"], self.window_in_min, action_name)
+    else:
+      if action_counter["warnedThresholdReached"] == False:
+        action_counter["warnedThresholdReached"] = True
+        logger.warn("%s occurrences in agent life time reached the limit for %s. " +
+                    "Will silently skip execution without warning till window is reset",
+                    action_counter["lifetimeCount"], action_name)
+      else:
+        logger.debug("%s occurrences in agent life time reached the limit for %s",
+                     action_counter["lifetimeCount"], action_name)
+    return False
+    pass
+
+
+  def _execute_action_chk_only(self, action_name):
+    """
+    _private_ implementation of [may] execute check only
+    """
+    action_counter = self.actions[action_name]
+    now = self._now_()
+    seconds_since_last_attempt = now - action_counter["lastAttempt"]
+
+    if action_counter["lifetimeCount"] < self.max_lifetime_count:
+      if action_counter["count"] < self.max_count:
+        if seconds_since_last_attempt > self.retry_gap_in_sec:
+          return True
+      else:
+        sec_since_last_reset = now - action_counter["lastReset"]
+        if sec_since_last_reset > self.window_in_sec:
+          return True
+
+    return False
+    pass
+
+  def _now_(self):
+    return int(time.time())
+    pass
+
+
+  def update_configuration_from_registration(self, reg_resp):
+    """
+    TODO: Server sends the recovery configuration - call update_config after parsing
+    "recoveryConfig": {
+      "type" : "DEFAULT|AUTO_START|FULL",
+      "maxCount" : 10,
+      "windowInMinutes" : 60,
+      "retryGap" : 0 }
+    """
+
+    recovery_enabled = False
+    auto_start_only = True
+    max_count = 6
+    window_in_min = 60
+    retry_gap = 5
+    max_lifetime_count = 12
+
+    if reg_resp and "recoveryConfig" in reg_resp:
+      config = reg_resp["recoveryConfig"]
+      if "type" in config:
+        if config["type"] in ["AUTO_START", "FULL"]:
+          recovery_enabled = True
+          if config["type"] == "FULL":
+            auto_start_only = False
+      if "maxCount" in config:
+        max_count = self._read_int_(config["maxCount"], max_count)
+      if "windowInMinutes" in config:
+        window_in_min = self._read_int_(config["windowInMinutes"], window_in_min)
+      if "retryGap" in config:
+        retry_gap = self._read_int_(config["retryGap"], retry_gap)
+      if 'maxLifetimeCount' in config:
+        max_lifetime_count = self._read_int_(config['maxLifetimeCount'], max_lifetime_count)
+    self.update_config(max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only)
+    pass
+
+
+  def update_config(self, max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only):
+    """
+    Update recovery configuration, recovery is disabled if configuration values
+    are not correct
+    """
+    self.recovery_enabled = False;
+    if max_count <= 0:
+      logger.warn("Recovery disabled: max_count must be a non-negative number")
+      return
+
+    if window_in_min <= 0:
+      logger.warn("Recovery disabled: window_in_min must be a non-negative number")
+      return
+
+    if retry_gap < 1:
+      logger.warn("Recovery disabled: retry_gap must be a positive number and at least 1")
+      return
+    if retry_gap >= window_in_min:
+      logger.warn("Recovery disabled: retry_gap must be smaller than window_in_min")
+      return
+    if max_lifetime_count < 0 or max_lifetime_count < max_count:
+      logger.warn("Recovery disabled: max_lifetime_count must more than 0 and >= max_count")
+      return
+
+    self.max_count = max_count
+    self.window_in_min = window_in_min
+    self.retry_gap = retry_gap
+    self.window_in_sec = window_in_min * 60
+    self.retry_gap_in_sec = retry_gap * 60
+    self.auto_start_only = auto_start_only
+    self.max_lifetime_count = max_lifetime_count
+
+    self.allowed_desired_states = [self.STARTED, self.INSTALLED]
+    self.allowed_current_states = [self.INIT, self.INSTALLED]
+
+    if self.auto_start_only:
+      self.allowed_desired_states = [self.STARTED]
+      self.allowed_current_states = [self.INSTALLED]
+
+    self.recovery_enabled = recovery_enabled
+    if self.recovery_enabled:
+      logger.info(
+        "==> Auto recovery is enabled with maximum %s in %s minutes with gap of %s minutes between and lifetime max being %s.",
+        self.max_count, self.window_in_min, self.retry_gap, self.max_lifetime_count)
+    pass
+
+
+  def get_unique_task_id(self):
+    self.id += 1
+    return self.id
+    pass
+
+
+  def process_status_commands(self, commands):
+    if not self.enabled():
+      return
+
+    if commands and len(commands) > 0:
+      for command in commands:
+        self.store_or_update_command(command)
+        if self.EXECUTION_COMMAND_DETAILS in command:
+          logger.debug("Details to construct exec commands: " + pprint.pformat(command[self.EXECUTION_COMMAND_DETAILS]))
+
+    pass
+
+
+  def process_execution_commands(self, commands):
+    if not self.enabled():
+      return
+
+    if commands and len(commands) > 0:
+      for command in commands:
+        if self.COMMAND_TYPE in command and command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND:
+          if self.ROLE in command:
+            if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL:
+              self.update_desired_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
+            if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START:
+              self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
+    pass
+
+
+  def store_or_update_command(self, command):
+    """
+    Stores command details by reading them from the STATUS_COMMAND
+    Updates the desired state as well
+    """
+    if not self.enabled():
+      return
+
+    logger.debug("Inspecting command to store/update details")
+    if self.COMMAND_TYPE in command and command[self.COMMAND_TYPE] == ActionQueue.STATUS_COMMAND:
+      payloadLevel = self.PAYLOAD_LEVEL_DEFAULT
+      if self.PAYLOAD_LEVEL in command:
+        payloadLevel = command[self.PAYLOAD_LEVEL]
+
+      component = command[self.COMPONENT_NAME]
+      self.update_desired_status(component, command[self.DESIRED_STATE])
+
+      if payloadLevel == self.PAYLOAD_LEVEL_EXECUTION_COMMAND:
+        if self.EXECUTION_COMMAND_DETAILS in command:
+          # Store the execution command details
+          self.remove_command(component)
+          self.stored_exec_commands[component] = command[self.EXECUTION_COMMAND_DETAILS]
+          logger.debug("Stored command details for " + component)
+        else:
+          logger.warn("Expected field " + self.EXECUTION_COMMAND_DETAILS + " unavailable.")
+        pass
+    pass
+
+
+  def get_install_command(self, component):
+    if self.enabled():
+      logger.debug("Using stored INSTALL command for %s", component)
+      if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
+        command = copy.deepcopy(self.stored_exec_commands[component])
+        command[self.ROLE_COMMAND] = "INSTALL"
+        command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
+        command[self.TASK_ID] = self.get_unique_task_id()
+        return command
+      else:
+        logger.info("INSTALL command cannot be computed.")
+    else:
+      logger.info("Recovery is not enabled. INSTALL command will not be computed.")
+    return None
+    pass
+
+
+  def get_start_command(self, component):
+    if self.enabled():
+      logger.debug("Using stored START command for %s", component)
+      if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
+        command = copy.deepcopy(self.stored_exec_commands[component])
+        command[self.ROLE_COMMAND] = "START"
+        command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
+        command[self.TASK_ID] = self.get_unique_task_id()
+        return command
+      else:
+        logger.info("START command cannot be computed.")
+    else:
+      logger.info("Recovery is not enabled. START command will not be computed.")
+
+    return None
+    pass
+
+
+  def command_exists(self, component, command_type):
+    if command_type == ActionQueue.EXECUTION_COMMAND:
+      if component in self.stored_exec_commands:
+        return True
+
+    return False
+    pass
+
+
+  def remove_command(self, component):
+    if component in self.stored_exec_commands:
+      del self.stored_exec_commands[component]
+      return True
+    return False
+
+
+  def _read_int_(self, value, default_value=0):
+    int_value = default_value
+    try:
+      int_value = int(value)
+    except ValueError:
+      pass
+    return int_value
+
+
+def main(argv=None):
+  cmd_mgr = RecoveryManager()
+  pass
+
+
+if __name__ == '__main__':
+  main()

+ 3 - 0
ambari-agent/src/test/python/ambari_agent/TestActionQueue.py

@@ -35,6 +35,7 @@ from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
 from ambari_agent.PythonExecutor import PythonExecutor
 from ambari_agent.CommandStatusDict import CommandStatusDict
 from ambari_agent.ActualConfigHandler import ActualConfigHandler
+from ambari_agent.RecoveryManager import RecoveryManager
 from FileCache import FileCache
 from ambari_commons import OSCheck
 from only_for_platform import only_for_platform, get_platform, not_for_platform, PLATFORM_LINUX, PLATFORM_WINDOWS
@@ -544,6 +545,8 @@ class TestActionQueue(TestCase):
 
     build_mock.return_value = {'dummy report': '' }
 
+    dummy_controller.recovery_manager = RecoveryManager()
+
     requestComponentStatus_mock.reset_mock()
     requestComponentStatus_mock.return_value = {'exitcode': 0 }
 

+ 106 - 0
ambari-agent/src/test/python/ambari_agent/TestController.py

@@ -598,6 +598,112 @@ class TestController(unittest.TestCase):
     self.assertEquals(LiveStatus_mock.CLIENT_COMPONENTS, client_components_expected)
     self.assertEquals(LiveStatus_mock.COMPONENTS, components_expected)
 
+  @patch("socket.gethostbyname")
+  @patch("json.dumps")
+  @patch("time.sleep")
+  @patch("pprint.pformat")
+  @patch.object(Controller, "randint")
+  @patch.object(Controller, "LiveStatus")
+  def test_recoveryRegConfig(self, LiveStatus_mock, randintMock, pformatMock, sleepMock,
+                    dumpsMock, socketGhbnMock):
+    # Checks that the "recoveryConfig" section of a registration response is
+    # applied to the controller's RecoveryManager.
+
+    # Values expected on the RecoveryManager before any registration happened.
+    self.assertEquals(self.controller.recovery_manager.recovery_enabled, False)
+    self.assertEquals(self.controller.recovery_manager.auto_start_only, False)
+    self.assertEquals(self.controller.recovery_manager.max_count, 6)
+    self.assertEquals(self.controller.recovery_manager.window_in_min, 60)
+    self.assertEquals(self.controller.recovery_manager.retry_gap, 5)
+
+    # Capture stdout for the duration of the test; restored near the end.
+    out = StringIO.StringIO()
+    sys.stdout = out
+
+
+    dumpsMock.return_value = '{"valid_object": true}'
+    socketGhbnMock.return_value = "host1"
+
+    # Replace the network call so registration gets the canned response below.
+    sendRequest = MagicMock(name="sendRequest")
+    self.controller.sendRequest = sendRequest
+
+    register = MagicMock(name="register")
+    self.controller.register = register
+
+    # Registration response carrying a full recovery configuration.
+    sendRequest.return_value = {
+      "responseId": 1,
+      "recoveryConfig": {
+        "type": "FULL",
+        "maxCount": 5,
+        "windowInMinutes": 50,
+        "retryGap": 3,
+        "maxLifetimeCount": 7},
+      "log": "", "exitstatus": "0"}
+
+    self.controller.isRegistered = False
+    self.controller.registerWithServer()
+
+    # Type "FULL" is expected to enable recovery without the auto-start-only
+    # restriction; the numeric limits mirror the response values.
+    self.assertEquals(self.controller.recovery_manager.recovery_enabled, True)
+    self.assertEquals(self.controller.recovery_manager.auto_start_only, False)
+    self.assertEquals(self.controller.recovery_manager.max_count, 5)
+    self.assertEquals(self.controller.recovery_manager.window_in_min, 50)
+    self.assertEquals(self.controller.recovery_manager.retry_gap, 3)
+    self.assertEquals(self.controller.recovery_manager.max_lifetime_count, 7)
+
+    sys.stdout = sys.__stdout__
+
+    # NOTE(review): addToStatusQueue is reset although this test never replaced
+    # it, and the "register" mock is not reset - confirm this is intended.
+    self.controller.sendRequest = Controller.Controller.sendRequest
+    self.controller.addToStatusQueue = Controller.Controller.addToStatusQueue
+    pass
+
+  @patch.object(threading._Event, "wait")
+  @patch("time.sleep")
+  @patch("json.dumps")
+  def test_recoveryHbCmd(self, dumpsMock, sleepMock, event_mock):
+
+    out = StringIO.StringIO()
+    sys.stdout = out
+
+    hearbeat = MagicMock()
+    self.controller.heartbeat = hearbeat
+    event_mock.return_value = False
+    dumpsMock.return_value = "data"
+
+    sendRequest = MagicMock(name="sendRequest")
+    self.controller.sendRequest = sendRequest
+    addToQueue = MagicMock(name="addToQueue")
+    addToStatusQueue = MagicMock(name="addToStatusQueue")
+    self.addToQueue = addToQueue
+    self.addToStatusQueue = addToStatusQueue
+
+    process_execution_commands = MagicMock(name="process_execution_commands")
+    self.controller.recovery_manager.process_execution_commands = process_execution_commands
+    process_status_commands = MagicMock(name="process_status_commands")
+    self.controller.recovery_manager.process_status_commands = process_status_commands
+
+    self.controller.responseId = 0
+    response = {"responseId":1, "statusCommands": "commands2", "executionCommands" : "commands1", "log":"", "exitstatus":"0"}
+    sendRequest.return_value = response
+
+    def one_heartbeat(*args, **kwargs):
+      self.controller.DEBUG_STOP_HEARTBEATING = True
+      return response
+
+    sendRequest.side_effect = one_heartbeat
+
+    actionQueue = MagicMock()
+    actionQueue.isIdle.return_value = True
+
+    # one successful request, after stop
+    self.controller.actionQueue = actionQueue
+    self.controller.heartbeatWithServer()
+    self.assertTrue(sendRequest.called)
+    self.assertTrue(process_execution_commands.called)
+    self.assertTrue(process_status_commands.called)
+    process_execution_commands.assert_called_with("commands1")
+    process_status_commands.assert_called_with("commands2")
+
+    self.controller.heartbeatWithServer()
+    sys.stdout = sys.__stdout__
+    self.controller.sendRequest = Controller.Controller.sendRequest
+    self.controller.sendRequest = Controller.Controller.addToQueue
+    self.controller.sendRequest = Controller.Controller.addToStatusQueue
+    pass
 
 if __name__ == "__main__":
   unittest.main(verbosity=2)

+ 430 - 0
ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py

@@ -0,0 +1,430 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+import copy
+
+from ambari_agent.RecoveryManager import RecoveryManager
+from mock.mock import patch, MagicMock, call
+
+
+class TestRecoveryManager(TestCase):
+  command = {
+    "commandType": "STATUS_COMMAND",
+    "payloadLevel": "EXECUTION_COMMAND",
+    "componentName": "NODEMANAGER",
+    "desiredState": "STARTED",
+    "executionCommandDetails": {
+      "commandType": "EXECUTION_COMMAND",
+      "roleCommand": "INSTALL",
+      "role": "NODEMANAGER",
+      "configurations": {
+        "capacity-scheduler": {
+          "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+        "capacity-calculator": {
+          "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+        "commandParams": {
+          "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
+        }
+      }
+    }
+  }
+
+  exec_command1 = {
+    "commandType": "EXECUTION_COMMAND",
+    "roleCommand": "INSTALL",
+    "role": "NODEMANAGER",
+    "configurations": {
+      "capacity-scheduler": {
+        "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+      "capacity-calculator": {
+        "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+      "commandParams": {
+        "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
+      }
+    }
+  }
+
+  exec_command2 = {
+    "commandType": "EXECUTION_COMMAND",
+    "roleCommand": "START",
+    "role": "NODEMANAGER",
+    "configurations": {
+      "capacity-scheduler": {
+        "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+      "capacity-calculator": {
+        "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+      "commandParams": {
+        "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
+      }
+    }
+  }
+
+  exec_command3 = {
+    "commandType": "EXECUTION_COMMAND",
+    "roleCommand": "SERVICE_CHECK",
+    "role": "NODEMANAGER",
+    "configurations": {
+      "capacity-scheduler": {
+        "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+      "capacity-calculator": {
+        "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+      "commandParams": {
+        "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
+      }
+    }
+  }
+
+  def setUp(self):
+    pass
+
+  def tearDown(self):
+    pass
+
+  @patch.object(RecoveryManager, "update_desired_status")
+  def test_process_commands(self, mock_uds):
+    rm = RecoveryManager(True)
+    rm.process_status_commands(None)
+    self.assertFalse(mock_uds.called)
+
+    rm.process_status_commands([])
+    self.assertFalse(mock_uds.called)
+
+    rm.process_status_commands([self.command])
+    mock_uds.assert_has_calls([call("NODEMANAGER", "STARTED")])
+
+    mock_uds.reset_mock()
+
+    rm.process_status_commands([self.command, self.exec_command1, self.command])
+    mock_uds.assert_has_calls([call("NODEMANAGER", "STARTED")], [call("NODEMANAGER", "STARTED")])
+
+    mock_uds.reset_mock()
+
+    rm.process_execution_commands([self.exec_command1, self.exec_command2, self.exec_command3])
+    mock_uds.assert_has_calls([call("NODEMANAGER", "INSTALLED")], [call("NODEMANAGER", "STARTED")])
+
+    mock_uds.reset_mock()
+
+    rm.process_execution_commands([self.exec_command1, self.command])
+    mock_uds.assert_has_calls([call("NODEMANAGER", "INSTALLED")])
+    pass
+
+  def test_defaults(self):
+    rm = RecoveryManager()
+    self.assertFalse(rm.enabled())
+    self.assertEqual(None, rm.get_install_command("NODEMANAGER"))
+    self.assertEqual(None, rm.get_start_command("NODEMANAGER"))
+
+    rm.update_current_status("NODEMANAGER", "INSTALLED")
+    rm.update_desired_status("NODEMANAGER", "STARTED")
+    self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+    pass
+
+  @patch.object(RecoveryManager, "_now_")
+  def test_sliding_window(self, time_mock):
+    # Exercises configuration validation and the attempt limits enforced by
+    # may_execute()/execute(). _now_ is mocked; the "T = ..." comments below
+    # name the mocked clock values the following calls are expected to consume.
+    # NOTE(review): update_config arguments are presumed to be (max_count,
+    # window_in_min, retry_gap, max_lifetime_count, recovery_enabled,
+    # auto_start_only), inferred from test_update_rm_config - confirm.
+    time_mock.side_effect = \
+      [1000, 1001, 1002, 1003, 1004, 1071, 1150, 1151, 1152, 1153, 1400, 1401,
+       1500, 1571, 1572, 1653, 1900, 1971, 2300, 2301]
+
+    rm = RecoveryManager(True, False)
+    self.assertTrue(rm.enabled())
+
+    # First argument 0: expected to disable recovery.
+    rm.update_config(0, 60, 5, 12, True, False)
+    self.assertFalse(rm.enabled())
+
+    rm.update_config(6, 60, 5, 12, True, False)
+    self.assertTrue(rm.enabled())
+
+    # Second argument 0: expected to disable recovery.
+    rm.update_config(6, 0, 5, 12, True, False)
+    self.assertFalse(rm.enabled())
+
+    # Third argument 0: expected to disable recovery.
+    rm.update_config(6, 60, 0, 12, True, False)
+    self.assertFalse(rm.enabled())
+
+    rm.update_config(6, 60, 1, 12, True, False)
+    self.assertTrue(rm.enabled())
+
+    # Third argument larger than the second: expected to disable recovery.
+    rm.update_config(6, 60, 61, 12, True, False)
+    self.assertFalse(rm.enabled())
+
+    # Fourth argument 0: expected to disable recovery.
+    rm.update_config(6, 60, 5, 0, True, False)
+    self.assertFalse(rm.enabled())
+
+    # Fourth argument smaller than the first: expected to disable recovery.
+    rm.update_config(6, 60, 5, 4, True, False)
+    self.assertFalse(rm.enabled())
+
+    # presumably: at most 2 attempts per 5-minute window, at least 1 minute
+    # between attempts, lifetime cap of 4
+    rm.update_config(2, 5, 1, 4, True, False)
+    self.assertTrue(rm.enabled())
+
+    # T = 1000-2
+    self.assertTrue(rm.may_execute("NODEMANAGER"))
+    self.assertTrue(rm.may_execute("NODEMANAGER"))
+    self.assertTrue(rm.may_execute("NODEMANAGER"))
+
+    # T = 1003-4
+    self.assertTrue(rm.execute("NODEMANAGER"))
+    self.assertFalse(rm.execute("NODEMANAGER"))  # too soon
+
+    # T = 1071
+    self.assertTrue(rm.execute("NODEMANAGER"))  # 60+ seconds passed
+
+    # T = 1150-3
+    self.assertFalse(rm.execute("NODEMANAGER"))  # limit 2 exceeded
+    self.assertFalse(rm.may_execute("NODEMANAGER"))
+    # other components are not affected by NODEMANAGER reaching its limit
+    self.assertTrue(rm.execute("DATANODE"))
+    self.assertTrue(rm.may_execute("NAMENODE"))
+
+    # T = 1400-1
+    self.assertTrue(rm.execute("NODEMANAGER"))  # windows reset
+    self.assertFalse(rm.may_execute("NODEMANAGER"))  # too soon
+
+    # presumably: same window and gap limits, lifetime cap raised to 5,
+    # last flag (auto-start only) switched on
+    rm.update_config(2, 5, 1, 5, True, True)
+
+    # T = 1500-3
+    self.assertTrue(rm.execute("NODEMANAGER2"))
+    self.assertTrue(rm.may_execute("NODEMANAGER2"))
+    self.assertTrue(rm.execute("NODEMANAGER2"))
+    self.assertFalse(rm.execute("NODEMANAGER2"))  # max limit
+
+    # T = 1900-2
+    self.assertTrue(rm.execute("NODEMANAGER2"))
+    self.assertTrue(rm.execute("NODEMANAGER2"))
+
+    # T = 2300-2
+    # lifetime max reached
+    self.assertTrue(rm.execute("NODEMANAGER2"))
+    self.assertFalse(rm.execute("NODEMANAGER2"))
+    pass
+
+  def test_recovery_required(self):
+    # Walks requires_recovery() through combinations of current and desired
+    # state, first with RecoveryManager(True, False), then with
+    # RecoveryManager(True, True).
+    rm = RecoveryManager(True, False)
+
+    # current == desired: nothing to recover
+    rm.update_current_status("NODEMANAGER", "INSTALLED")
+    rm.update_desired_status("NODEMANAGER", "INSTALLED")
+    self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+    # INSTALLED but should be STARTED: recovery expected
+    rm.update_desired_status("NODEMANAGER", "STARTED")
+    self.assertTrue(rm.requires_recovery("NODEMANAGER"))
+
+    # STARTED but should be INSTALLED: no recovery expected
+    rm.update_current_status("NODEMANAGER", "STARTED")
+    rm.update_desired_status("NODEMANAGER", "INSTALLED")
+    self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+    rm.update_desired_status("NODEMANAGER", "STARTED")
+    self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+    # unknown or empty desired states: no recovery expected
+    rm.update_current_status("NODEMANAGER", "INSTALLED")
+    rm.update_desired_status("NODEMANAGER", "XYS")
+    self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+    rm.update_desired_status("NODEMANAGER", "")
+    self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+    # current state INIT: recovery expected for both desired states
+    rm.update_current_status("NODEMANAGER", "INIT")
+    rm.update_desired_status("NODEMANAGER", "INSTALLED")
+    self.assertTrue(rm.requires_recovery("NODEMANAGER"))
+
+    rm.update_desired_status("NODEMANAGER", "STARTED")
+    self.assertTrue(rm.requires_recovery("NODEMANAGER"))
+
+    # second constructor flag set (presumably auto-start only - confirm)
+    rm = RecoveryManager(True, True)
+
+    rm.update_current_status("NODEMANAGER", "INIT")
+    rm.update_desired_status("NODEMANAGER", "INSTALLED")
+    self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+    # NOTE(review): the desired state below is "START", not "STARTED" as used
+    # everywhere else in this test - confirm whether an unknown state is
+    # intended here or whether this is a typo.
+    rm.update_current_status("NODEMANAGER", "INIT")
+    rm.update_desired_status("NODEMANAGER", "START")
+    self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+    rm.update_current_status("NODEMANAGER", "INSTALLED")
+    rm.update_desired_status("NODEMANAGER", "START")
+    self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+    pass
+
+  @patch('time.time', MagicMock(side_effects=[1]))
+  def test_store_from_status_and_use(self):
+    rm = RecoveryManager(True)
+
+    command1 = copy.deepcopy(self.command)
+
+    rm.store_or_update_command(command1)
+    self.assertTrue(rm.command_exists("NODEMANAGER", "EXECUTION_COMMAND"))
+
+    install_command = rm.get_install_command("NODEMANAGER")
+    start_command = rm.get_start_command("NODEMANAGER")
+
+    self.assertEqual("INSTALL", install_command["roleCommand"])
+    self.assertEqual("START", start_command["roleCommand"])
+    self.assertEqual("AUTO_EXECUTION_COMMAND", install_command["commandType"])
+    self.assertEqual("AUTO_EXECUTION_COMMAND", start_command["commandType"])
+    self.assertEqual("NODEMANAGER", install_command["role"])
+    self.assertEqual("NODEMANAGER", start_command["role"])
+    self.assertEquals(install_command["configurations"], start_command["configurations"])
+
+    self.assertEqual(2, install_command["taskId"])
+    self.assertEqual(3, start_command["taskId"])
+
+    self.assertEqual(None, rm.get_install_command("component2"))
+    self.assertEqual(None, rm.get_start_command("component2"))
+
+    self.assertTrue(rm.remove_command("NODEMANAGER"))
+    self.assertFalse(rm.remove_command("NODEMANAGER"))
+
+    self.assertEqual(None, rm.get_install_command("NODEMANAGER"))
+    self.assertEqual(None, rm.get_start_command("NODEMANAGER"))
+
+    self.assertEqual(None, rm.get_install_command("component2"))
+    self.assertEqual(None, rm.get_start_command("component2"))
+
+    pass
+
+  @patch.object(RecoveryManager, "_now_")
+  def test_get_recovery_commands(self, time_mock):
+    # Checks which recovery command get_recovery_commands() produces for
+    # different current/desired state pairs. _now_ is mocked with values
+    # 1000 apart.
+    time_mock.side_effect = \
+      [1000, 2000, 3000, 4000, 5000, 6000]
+    rm = RecoveryManager(True)
+    rm.update_config(10, 5, 1, 11, True, False)
+
+    command1 = copy.deepcopy(self.command)
+
+    rm.store_or_update_command(command1)
+
+    # INSTALLED -> STARTED: a single START command is expected
+    rm.update_current_status("NODEMANAGER", "INSTALLED")
+    rm.update_desired_status("NODEMANAGER", "STARTED")
+
+    commands = rm.get_recovery_commands()
+    self.assertEqual(1, len(commands))
+    self.assertEqual("START", commands[0]["roleCommand"])
+
+    # INIT -> STARTED: only an INSTALL command is expected
+    rm.update_current_status("NODEMANAGER", "INIT")
+    rm.update_desired_status("NODEMANAGER", "STARTED")
+
+    commands = rm.get_recovery_commands()
+    self.assertEqual(1, len(commands))
+    self.assertEqual("INSTALL", commands[0]["roleCommand"])
+
+    # INIT -> INSTALLED: an INSTALL command is expected
+    rm.update_current_status("NODEMANAGER", "INIT")
+    rm.update_desired_status("NODEMANAGER", "INSTALLED")
+
+    commands = rm.get_recovery_commands()
+    self.assertEqual(1, len(commands))
+    self.assertEqual("INSTALL", commands[0]["roleCommand"])
+
+    # Last flag switched on (presumably auto-start only - confirm): the same
+    # INIT -> INSTALLED situation is expected to yield no command.
+    rm.update_config(2, 5, 1, 5, True, True)
+    rm.update_current_status("NODEMANAGER", "INIT")
+    rm.update_desired_status("NODEMANAGER", "INSTALLED")
+
+    commands = rm.get_recovery_commands()
+    self.assertEqual(0, len(commands))
+    pass
+
+  @patch.object(RecoveryManager, "update_config")
+  def test_update_rm_config(self, mock_uc):
+    rm = RecoveryManager()
+    rm.update_configuration_from_registration(None)
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, False, True)])
+
+    mock_uc.reset_mock()
+    rm.update_configuration_from_registration({})
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, False, True)])
+
+    mock_uc.reset_mock()
+    rm.update_configuration_from_registration(
+      {"recoveryConfig": {
+      "type" : "DEFAULT"}}
+    )
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, False, True)])
+
+    mock_uc.reset_mock()
+    rm.update_configuration_from_registration(
+      {"recoveryConfig": {
+        "type" : "FULL"}}
+    )
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False)])
+
+    mock_uc.reset_mock()
+    rm.update_configuration_from_registration(
+      {"recoveryConfig": {
+        "type" : "AUTO_START",
+        "max_count" : "med"}}
+    )
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, True, True)])
+
+    mock_uc.reset_mock()
+    rm.update_configuration_from_registration(
+      {"recoveryConfig": {
+        "type" : "AUTO_START",
+        "maxCount" : "5",
+        "windowInMinutes" : 20,
+        "retryGap" : 2,
+        "maxLifetimeCount" : 5}}
+    )
+    mock_uc.assert_has_calls([call(5, 20, 2, 5, True, True)])
+  pass
+
+  @patch.object(RecoveryManager, "_now_")
+  def test_recovery_report(self, time_mock):
+    # Follows the report returned by get_recovery_status() while recovery
+    # attempts are executed for two components. _now_ is mocked with the
+    # clock values listed below.
+    time_mock.side_effect = \
+      [1000, 1071, 1072, 1470, 1471, 1472, 1543, 1644, 1715]
+
+    # A default manager reports only a DISABLED summary.
+    rm = RecoveryManager()
+    rec_st = rm.get_recovery_status()
+    self.assertEquals(rec_st, {"summary": "DISABLED"})
+
+    # Once configured and enabled, the report starts with no component entries.
+    rm.update_config(2, 5, 1, 4, True, True)
+    rec_st = rm.get_recovery_status()
+    self.assertEquals(rec_st, {"summary": "RECOVERABLE", "componentReports": []})
+
+    rm.execute("PUMA")
+    rec_st = rm.get_recovery_status()
+    self.assertEquals(rec_st, {"summary": "RECOVERABLE",
+                               "componentReports": [{"name": "PUMA", "numAttempts": 1, "limitReached": False}]})
+    rm.execute("PUMA")
+    rm.execute("LION")
+
+    # Component entries are expected in name order (LION before PUMA).
+    rec_st = rm.get_recovery_status()
+    self.assertEquals(rec_st, {"summary": "RECOVERABLE",
+                               "componentReports": [
+                                 {"name": "LION", "numAttempts": 1, "limitReached": False},
+                                 {"name": "PUMA", "numAttempts": 2, "limitReached": False}
+                               ]})
+    rm.execute("PUMA")
+    rm.execute("LION")
+    rm.execute("PUMA")
+    rm.execute("PUMA")
+    rm.execute("LION")
+    # PUMA has reached its limit (4 attempts) while LION has not: the summary
+    # is expected to become PARTIALLY_RECOVERABLE.
+    rec_st = rm.get_recovery_status()
+    self.assertEquals(rec_st, {"summary": "PARTIALLY_RECOVERABLE",
+                               "componentReports": [
+                                 {"name": "LION", "numAttempts": 3, "limitReached": False},
+                                 {"name": "PUMA", "numAttempts": 4, "limitReached": True}
+                               ]})
+
+    # With every component at its limit the summary is expected to become
+    # UNRECOVERABLE.
+    rm.execute("LION")
+    rec_st = rm.get_recovery_status()
+    self.assertEquals(rec_st, {"summary": "UNRECOVERABLE",
+                               "componentReports": [
+                                 {"name": "LION", "numAttempts": 4, "limitReached": True},
+                                 {"name": "PUMA", "numAttempts": 4, "limitReached": True}
+                               ]})
+    pass

+ 3 - 3
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java

@@ -100,9 +100,9 @@ public class ExecutionCommandWrapper {
           Map<String, Map<String, Map<String, String>>> configAttributes = configHelper.getEffectiveConfigAttributes(cluster,
               executionCommand.getConfigurationTags());
 
-          for (Map.Entry<String, Map<String, Map<String, String>>> attributesOccurance : configAttributes.entrySet()) {
-            String type = attributesOccurance.getKey();
-            Map<String, Map<String, String>> attributes = attributesOccurance.getValue();
+          for (Map.Entry<String, Map<String, Map<String, String>>> attributesOccurrence : configAttributes.entrySet()) {
+            String type = attributesOccurrence.getKey();
+            Map<String, Map<String, String>> attributes = attributesOccurrence.getValue();
 
             if (executionCommand.getConfigurationAttributes() != null) {
               if (!executionCommand.getConfigurationAttributes().containsKey(type)) {

+ 85 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/AgentRequests.java

@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.agent;
+
+import com.google.inject.Singleton;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Records, per host and per component, whether the agent asked for execution
+ * command details to be sent to it.
+ * <p>
+ * All access to the underlying maps is guarded by a single lock, so an
+ * instance can be shared between the thread that records requests and the
+ * thread that reads them.
+ */
+@Singleton
+public class AgentRequests {
+  private static final Log LOG = LogFactory.getLog(AgentRequests.class);
+  /** host name -> (component name -> details requested); guarded by {@code _lock}. */
+  private final Map<String, Map<String, Boolean>> requiresExecCmdDetails = new HashMap<String, Map<String, Boolean>>();
+  private final Object _lock = new Object();
+
+  /**
+   * Creates a holder for agent requests
+   */
+  public AgentRequests() {
+  }
+
+  /**
+   * Records the agent's request for the given host and component.
+   *
+   * @param host                host the request came from
+   * @param component           component the request applies to
+   * @param requestExecutionCmd "true" (in any letter case) to request execution
+   *                            command details, any other non-blank value to
+   *                            clear the request; blank or {@code null} values
+   *                            are ignored and leave the recorded state as is
+   */
+  public void setExecutionDetailsRequest(String host, String component, String requestExecutionCmd) {
+    if (StringUtils.isNotBlank(requestExecutionCmd)) {
+      LOG.debug("Setting need for exec command to " + requestExecutionCmd + " for " + component);
+      Boolean requested = Boolean.valueOf(requestExecutionCmd);
+      synchronized (_lock) {
+        getPerHostRequiresExecCmdDetails(host).put(component, requested);
+      }
+    }
+  }
+
+  /**
+   * Tells whether execution command details were requested for the component
+   * on the host.
+   *
+   * @param host      host to look up
+   * @param component component to look up
+   * @return the recorded value, or {@link Boolean#FALSE} when nothing was
+   *         recorded for the host/component pair
+   */
+  public Boolean shouldSendExecutionDetails(String host, String component) {
+    Boolean requested;
+    synchronized (_lock) {
+      requested = getPerHostRequiresExecCmdDetails(host).get(component);
+    }
+
+    if (requested == null) {
+      return Boolean.FALSE;
+    }
+
+    LOG.debug("Sending exec command details for " + component);
+    return requested;
+  }
+
+  /**
+   * Returns the per-component map of the host, creating it on first use.
+   * Callers must hold {@code _lock}.
+   */
+  private Map<String, Boolean> getPerHostRequiresExecCmdDetails(String host) {
+    Map<String, Boolean> perHost = requiresExecCmdDetails.get(host);
+    if (perHost == null) {
+      perHost = new HashMap<String, Boolean>();
+      requiresExecCmdDetails.put(host, perHost);
+    }
+    return perHost;
+  }
+
+  @Override
+  public String toString() {
+    synchronized (_lock) {
+      return new StringBuilder().append("requiresExecCmdDetails: ").append(requiresExecCmdDetails.toString()).toString();
+    }
+  }
+}

+ 67 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentRecoveryReport.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * Recovery state of a single component: its name, the number of recovery
+ * attempts made and whether the attempt limit has been reached.
+ * <p>
+ * NOTE(review): the JSON property names declared here are
+ * {@code num_attempts} and {@code limit_reached}, while the agent-side tests
+ * in this change build reports keyed {@code numAttempts} and
+ * {@code limitReached} - confirm the two sides agree.
+ */
+public class ComponentRecoveryReport {
+  private String name;
+  private int numAttempts;
+  private boolean limitReached;
+
+
+  /** @return the component name */
+  @JsonProperty("name")
+  public String getName() {
+    return name;
+  }
+
+  /** @param name the component name */
+  @JsonProperty("name")
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /** @return the number of recovery attempts reported for the component */
+  @JsonProperty("num_attempts")
+  public int getNumAttempts() {
+    return numAttempts;
+  }
+
+  /** @param numAttempts the number of recovery attempts reported for the component */
+  @JsonProperty("num_attempts")
+  public void setNumAttempts(int numAttempts) {
+    this.numAttempts = numAttempts;
+  }
+
+  /** @return whether the attempt limit has been reached for the component */
+  @JsonProperty("limit_reached")
+  public boolean getLimitReached() {
+    return limitReached;
+  }
+
+  /** @param limitReached whether the attempt limit has been reached for the component */
+  @JsonProperty("limit_reached")
+  public void setLimitReached(boolean limitReached) {
+    this.limitReached = limitReached;
+  }
+
+  @Override
+  public String toString() {
+    // Labels match the field names so log output is unambiguous.
+    return "ComponentRecoveryReport{" +
+           "name='" + name + '\'' +
+           ", numAttempts='" + numAttempts + '\'' +
+           ", limitReached='" + limitReached + '\'' +
+           '}';
+  }
+}

+ 9 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatus.java

@@ -34,6 +34,7 @@ public class ComponentStatus {
    * @see org.apache.ambari.server.state.SecurityState
    */
   private String securityState;
+  private String sendExecCmdDet = "False";
 
   private String serviceName;
   private String clusterName;
@@ -61,6 +62,14 @@ public class ComponentStatus {
     return status;
   }
 
+  /**
+   * Sets the flag telling whether execution command details are requested
+   * for this component.
+   *
+   * @param sendExecCmdDet the flag as a string; the field defaults to "False"
+   */
+  public void setSendExecCmdDet(String sendExecCmdDet) {
+    this.sendExecCmdDet = sendExecCmdDet;
+  }
+
+  /**
+   * @return the flag telling whether execution command details are requested
+   *         for this component, as a string; "False" unless a different value
+   *         was set
+   */
+  public String getSendExecCmdDet() {
+    return this.sendExecCmdDet;
+  }
+
   public void setStatus(String status) {
     this.status = status;
   }

+ 10 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java

@@ -41,6 +41,7 @@ public class HeartBeat {
   HostStatus nodeStatus;
   private AgentEnv agentEnv = null;
   private List<Alert> alerts = null;
+  private RecoveryReport recoveryReport;
 
   public long getResponseId() {
     return responseId;
@@ -84,6 +85,14 @@ public class HeartBeat {
     this.nodeStatus = nodeStatus;
   }
 
+  /**
+   * @return the recovery report carried by this heartbeat, or {@code null}
+   *         when none was set
+   */
+  public RecoveryReport getRecoveryReport() {
+    return recoveryReport;
+  }
+
+  /**
+   * @param recoveryReport the recovery report to attach to this heartbeat
+   */
+  public void setRecoveryReport(RecoveryReport recoveryReport) {
+    this.recoveryReport = recoveryReport;
+  }
+
   public AgentEnv getAgentEnv() {
     return agentEnv;
   }
@@ -129,6 +138,7 @@ public class HeartBeat {
             ", reports=" + reports +
             ", componentStatus=" + componentStatus +
             ", nodeStatus=" + nodeStatus +
+            ", recoveryReport=" + recoveryReport +
             '}';
   }
 }

+ 22 - 2
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java

@@ -156,6 +156,7 @@ public class HeartBeatHandler {
   @Inject
   private VersionEventPublisher versionEventPublisher;
 
+
   /**
    * KerberosPrincipalHostDAO used to set and get Kerberos principal details
    */
@@ -245,6 +246,11 @@ public class HeartBeatHandler {
       }
     }
 
+    if (heartbeat.getRecoveryReport() != null) {
+      RecoveryReport rr = heartbeat.getRecoveryReport();
+      processRecoveryReport(rr, hostname);
+    }
+
     try {
       if (heartbeat.getNodeStatus().getStatus().equals(HostStatus.Status.HEALTHY)) {
         hostObject.handleEvent(new HostHealthyHeartbeatEvent(hostname, now,
@@ -254,7 +260,7 @@ public class HeartBeatHandler {
             null));
       }
     } catch (InvalidStateTransitionException ex) {
-      LOG.warn("Asking agent to reregister due to " + ex.getMessage(), ex);
+      LOG.warn("Asking agent to re-register due to " + ex.getMessage(), ex);
       hostObject.setState(HostState.INIT);
       return createRegisterCommand();
     }
@@ -262,7 +268,7 @@ public class HeartBeatHandler {
     // Examine heartbeat for command reports
     processCommandReports(heartbeat, hostname, clusterFsm, now);
 
-    // Examine heartbeart for component live status reports
+    // Examine heartbeat for component live status reports
     processStatusReports(heartbeat, hostname, clusterFsm);
 
     // Calculate host status
@@ -317,6 +323,12 @@ public class HeartBeatHandler {
     }
   }
 
+  /**
+   * Stores the recovery report received in a heartbeat on the host it came from.
+   *
+   * @param recoveryReport the report taken from the heartbeat; must not be
+   *                       {@code null} since it is dereferenced for logging
+   * @param hostname       name of the host that sent the heartbeat
+   * @throws AmbariException presumably when the host cannot be looked up -
+   *                         confirm against {@code clusterFsm.getHost}
+   */
+  protected void processRecoveryReport(RecoveryReport recoveryReport, String hostname) throws AmbariException {
+    LOG.debug("Received recovery report: " + recoveryReport.toString());
+    Host host = clusterFsm.getHost(hostname);
+    host.setRecoveryReport(recoveryReport);
+  }
+
   protected void processHostStatus(HeartBeat heartbeat, String hostname) throws AmbariException {
 
     Host host = clusterFsm.getHost(hostname);
@@ -696,6 +708,9 @@ public class HeartBeatHandler {
                       " (" + e.getMessage() + ")");
                 }
               }
+
+              this.heartbeatMonitor.getAgentRequests()
+                  .setExecutionDetailsRequest(hostname, componentName, status.getSendExecCmdDet());
             } else {
               // TODO: What should be done otherwise?
             }
@@ -941,6 +956,11 @@ public class HeartBeatHandler {
     List<AlertDefinitionCommand> alertDefinitionCommands = getRegistrationAlertDefinitionCommands(hostname);
     response.setAlertDefinitionCommands(alertDefinitionCommands);
 
+    response.setRecoveryConfig(RecoveryConfig.getRecoveryConfig(config));
+    if(response.getRecoveryConfig() != null) {
+      LOG.debug("Recovery configuration set to " + response.getRecoveryConfig().toString());
+    }
+
     Long requestId = 0L;
     hostResponseIds.put(hostname, requestId);
     response.setResponseId(requestId);

+ 27 - 6
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java

@@ -33,7 +33,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import com.google.inject.Inject;
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
@@ -75,6 +77,7 @@ public class HeartbeatMonitor implements Runnable {
   private final AmbariMetaInfo ambariMetaInfo;
   private final AmbariManagementController ambariManagementController;
   private final Configuration configuration;
+  private final AgentRequests agentRequests;
 
   public HeartbeatMonitor(Clusters clusters, ActionQueue aq, ActionManager am,
                           int threadWakeupInterval, Injector injector) {
@@ -87,6 +90,7 @@ public class HeartbeatMonitor implements Runnable {
     ambariManagementController = injector.getInstance(
             AmbariManagementController.class);
     configuration = injector.getInstance(Configuration.class);
+    agentRequests = new AgentRequests();
   }
 
   public void shutdown() {
@@ -106,6 +110,10 @@ public class HeartbeatMonitor implements Runnable {
     return monitorThread.isAlive();
   }
 
+  /**
+   * @return the {@link AgentRequests} holder owned by this monitor
+   */
+  public AgentRequests getAgentRequests() {
+    return agentRequests;
+  }
+
   @Override
   public void run() {
     while (shouldRun) {
@@ -217,7 +225,7 @@ public class HeartbeatMonitor implements Runnable {
   }
 
   /**
-   * Generates status command and fills all apropriate fields.
+   * Generates status command and fills all appropriate fields.
    * @throws AmbariException
    */
   private StatusCommand createStatusCommand(String hostname, Cluster cluster,
@@ -241,6 +249,10 @@ public class HeartbeatMonitor implements Runnable {
     //Config clusterConfig = cluster.getDesiredConfigByType(GLOBAL);
     Collection<Config> clusterConfigs = cluster.getAllConfigs();
 
+    // Apply global properties for this host from all config groups
+    Map<String, Map<String, String>> allConfigTags = configHelper
+        .getEffectiveDesiredTags(cluster, hostname);
+
     for(Config clusterConfig: clusterConfigs) {
       if(!clusterConfig.getType().endsWith("-env")) {
         continue;
@@ -250,10 +262,6 @@ public class HeartbeatMonitor implements Runnable {
         // cluster config for 'global'
         Map<String, String> props = new HashMap<String, String>(clusterConfig.getProperties());
 
-        // Apply global properties for this host from all config groups
-        Map<String, Map<String, String>> allConfigTags = configHelper
-                .getEffectiveDesiredTags(cluster, hostname);
-
         Map<String, Map<String, String>> configTags = new HashMap<String,
                 Map<String, String>>();
 
@@ -294,6 +302,13 @@ public class HeartbeatMonitor implements Runnable {
     statusCmd.setConfigurationAttributes(configurationAttributes);
     statusCmd.setHostname(hostname);
 
+    // If Agent wants the command and the States differ
+    statusCmd.setDesiredState(sch.getDesiredState());
+    if (getAgentRequests().shouldSendExecutionDetails(hostname, componentName)) {
+      LOG.info(componentName + " is at " + sch.getState() + " adding more payload per agent ask");
+      statusCmd.setPayloadLevel(StatusCommand.StatusCommandPayload.EXECUTION_COMMAND);
+    }
+
     // Fill command params
     Map<String, String> commandParams = statusCmd.getCommandParams();
 
@@ -322,7 +337,13 @@ public class HeartbeatMonitor implements Runnable {
     hostLevelParams.put(STACK_NAME, stackId.getStackName());
     hostLevelParams.put(STACK_VERSION, stackId.getStackVersion());
 
+
+    if (statusCmd.getPayloadLevel() == StatusCommand.StatusCommandPayload.EXECUTION_COMMAND) {
+      ExecutionCommand ec = ambariManagementController.getExecutionCommand(cluster, sch, RoleCommand.START);
+      statusCmd.setExecutionCommand(ec);
+      LOG.debug(componentName + " has more payload for execution command");
+    }
+
     return statusCmd;
   }
-
 }

+ 113 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java

@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.agent;
+
+import com.google.gson.annotations.SerializedName;
+import org.apache.ambari.server.configuration.Configuration;
+
+
+/**
+ * Recovery config to be sent to the agent
+ */
+public class RecoveryConfig {
+
+  /**
+   * Creates an empty recovery config; all fields stay {@code null} until set.
+   */
+  public RecoveryConfig() {
+  }
+
+  /** Node recovery type, see {@link Configuration#getNodeRecoveryType()}. */
+  @SerializedName("type")
+  private String type;
+
+  /** Max recovery attempts in a window, see {@link Configuration#getNodeRecoveryMaxCount()}. */
+  @SerializedName("maxCount")
+  private String maxCount;
+
+  /** Window size in minutes, see {@link Configuration#getNodeRecoveryWindowInMin()}. */
+  @SerializedName("windowInMinutes")
+  private String windowInMinutes;
+
+  /** Gap between retries, see {@link Configuration#getNodeRecoveryRetryGap()}. */
+  @SerializedName("retryGap")
+  private String retryGap;
+
+  /** Max lifetime recovery attempts, see {@link Configuration#getNodeRecoveryLifetimeMaxCount()}. */
+  @SerializedName("maxLifetimeCount")
+  private String maxLifetimeCount;
+
+
+  /** @return the recovery type */
+  public String getType() {
+    return type;
+  }
+
+  /** @param type the recovery type to store */
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  /** @return the max recovery attempt count */
+  public String getMaxCount() {
+    return maxCount;
+  }
+
+  /** @param maxCount the max recovery attempt count to store */
+  public void setMaxCount(String maxCount) {
+    this.maxCount = maxCount;
+  }
+
+  /** @return the window size in minutes */
+  public String getWindowInMinutes() {
+    return windowInMinutes;
+  }
+
+  /** @param windowInMinutes the window size in minutes to store */
+  public void setWindowInMinutes(String windowInMinutes) {
+    this.windowInMinutes = windowInMinutes;
+  }
+
+  /** @return the retry gap */
+  public String getRetryGap() {
+    return retryGap;
+  }
+
+  /** @param retryGap the retry gap to store */
+  public void setRetryGap(String retryGap) {
+    this.retryGap = retryGap;
+  }
+
+  /** @return the max lifetime recovery attempt count */
+  public String getMaxLifetimeCount() {
+    return maxLifetimeCount;
+  }
+
+  /** @param maxLifetimeCount the max lifetime recovery attempt count to store */
+  public void setMaxLifetimeCount(String maxLifetimeCount) {
+    this.maxLifetimeCount = maxLifetimeCount;
+  }
+
+  /**
+   * Builds a recovery config from the node recovery settings of the server
+   * configuration.
+   *
+   * @param conf server configuration to read the recovery settings from
+   * @return a new, fully populated recovery config
+   */
+  public static RecoveryConfig getRecoveryConfig(Configuration conf) {
+    RecoveryConfig rc = new RecoveryConfig();
+    rc.setMaxCount(conf.getNodeRecoveryMaxCount());
+    rc.setMaxLifetimeCount(conf.getNodeRecoveryLifetimeMaxCount());
+    rc.setRetryGap(conf.getNodeRecoveryRetryGap());
+    rc.setType(conf.getNodeRecoveryType());
+    rc.setWindowInMinutes(conf.getNodeRecoveryWindowInMin());
+    return rc;
+  }
+
+  /**
+   * Renders all fields as {@code RecoveryConfig{type=..., maxCount=..., ...}}.
+   */
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder("RecoveryConfig{");
+    // The first field carries no separator, so the output does not start with "{, ".
+    buffer.append("type=").append(type);
+    buffer.append(", maxCount=").append(maxCount);
+    buffer.append(", windowInMinutes=").append(windowInMinutes);
+    buffer.append(", retryGap=").append(retryGap);
+    buffer.append(", maxLifetimeCount=").append(maxLifetimeCount);
+    buffer.append('}');
+    return buffer.toString();
+  }
+}

+ 67 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryReport.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+
+public class RecoveryReport {
+
+  /**
+   * One of DISABLED, RECOVERABLE, UNRECOVERABLE, PARTIALLY_RECOVERABLE
+   */
+  private String summary = "DISABLED";
+
+  /** Per-component recovery reports; starts out as an empty list. */
+  private List<ComponentRecoveryReport> componentReports = new ArrayList<ComponentRecoveryReport>();
+
+
+  /** @return the recovery summary */
+  @JsonProperty("summary")
+  public String getSummary() {
+    return this.summary;
+  }
+
+  /** @param summary the recovery summary to store */
+  @JsonProperty("summary")
+  public void setSummary(String summary) {
+    this.summary = summary;
+  }
+
+  /** @return the per-component recovery reports */
+  @JsonProperty("component_reports")
+  public List<ComponentRecoveryReport> getComponentReports() {
+    return this.componentReports;
+  }
+
+  /** @param componentReports the per-component recovery reports to store */
+  @JsonProperty("component_reports")
+  public void setComponentReports(List<ComponentRecoveryReport> componentReports) {
+    this.componentReports = componentReports;
+  }
+
+  /**
+   * Renders the summary and the component reports; a {@code null} report
+   * list is printed as {@code []}.
+   */
+  @Override
+  public String toString() {
+    String componentReportsStr =
+        (componentReports == null) ? "[]" : Arrays.toString(componentReports.toArray());
+    StringBuilder text = new StringBuilder("RecoveryReport{");
+    text.append("summary='").append(summary).append('\'');
+    text.append(", component_reports='").append(componentReportsStr);
+    text.append("'}");
+    return text.toString();
+  }
+}

+ 12 - 1
ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java

@@ -58,6 +58,9 @@ public class RegistrationResponse {
   @JsonProperty("responseId")
   private long responseId;
 
+  @JsonProperty("recoveryConfig")
+  private RecoveryConfig recoveryConfig;
+
   @JsonProperty("statusCommands")
   private List<StatusCommand> statusCommands = null;
 
@@ -81,7 +84,6 @@ public class RegistrationResponse {
    * Gets the alert definition commands that contain the alert definitions for
    * each cluster that the host is a member of.
    *
-   * @param commands
    *          the commands, or {@code null} for none.
    */
   public List<AlertDefinitionCommand> getAlertDefinitionCommands() {
@@ -115,6 +117,14 @@ public class RegistrationResponse {
     this.log = log;
   }
 
+  /**
+   * @return the recovery configuration carried by this response, or
+   *         {@code null} if none was set
+   */
+  public RecoveryConfig getRecoveryConfig() {
+    return this.recoveryConfig;
+  }
+
+  /**
+   * Sets the recovery configuration carried by this response.
+   *
+   * @param recoveryConfig the recovery configuration to store
+   */
+  public void setRecoveryConfig(RecoveryConfig recoveryConfig) {
+    this.recoveryConfig = recoveryConfig;
+  }
+
   @Override
   public String toString() {
     StringBuilder buffer = new StringBuilder("RegistrationResponse{");
@@ -122,6 +132,7 @@ public class RegistrationResponse {
     buffer.append(", responseId=").append(responseId);
     buffer.append(", statusCommands=").append(statusCommands);
     buffer.append(", alertDefinitionCommands=").append(alertDefinitionCommands);
+    buffer.append(", recoveryConfig=").append(recoveryConfig);
     buffer.append('}');
     return buffer.toString();
   }

+ 45 - 2
ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java

@@ -17,11 +17,12 @@
  */
 package org.apache.ambari.server.agent;
 
+import com.google.gson.annotations.SerializedName;
+import org.apache.ambari.server.state.State;
+
 import java.util.HashMap;
 import java.util.Map;
 
-import com.google.gson.annotations.SerializedName;
-
 /**
  * Command to report the status of a list of services in roles.
  */
@@ -55,6 +56,39 @@ public class StatusCommand extends AgentCommand {
   @SerializedName("hostname")
   private String hostname = null;
 
+  @SerializedName("payloadLevel")
+  private StatusCommandPayload payloadLevel = StatusCommandPayload.DEFAULT;
+
+  @SerializedName("desiredState")
+  private State desiredState;
+
+  @SerializedName("executionCommandDetails")
+  private ExecutionCommand executionCommand;
+
+  /**
+   * @return the execution command attached to this status command, or
+   *         {@code null} if none was set
+   */
+  public ExecutionCommand getExecutionCommand() {
+    return this.executionCommand;
+  }
+
+  /**
+   * Attaches an execution command to this status command.
+   *
+   * @param executionCommand the execution command to store
+   */
+  public void setExecutionCommand(ExecutionCommand executionCommand) {
+    this.executionCommand = executionCommand;
+  }
+
+  /**
+   * @return the desired state stored on this status command, or
+   *         {@code null} if none was set
+   */
+  public State getDesiredState() {
+    return this.desiredState;
+  }
+
+  /**
+   * Sets the desired state stored on this status command.
+   *
+   * @param desiredState the desired state to store
+   */
+  public void setDesiredState(State desiredState) {
+    this.desiredState = desiredState;
+  }
+
+  /**
+   * @return the payload level of this status command
+   */
+  public StatusCommandPayload getPayloadLevel() {
+    return this.payloadLevel;
+  }
+
+  /**
+   * Sets the payload level of this status command.
+   *
+   * @param payloadLevel the payload level to store
+   */
+  public void setPayloadLevel(StatusCommandPayload payloadLevel) {
+    this.payloadLevel = payloadLevel;
+  }
+
   public String getClusterName() {
     return clusterName;
   }
@@ -118,4 +152,13 @@ public class StatusCommand extends AgentCommand {
   public String getHostname() {
     return hostname;
   }
+
+  /**
+   * Payload levels a status command can carry.
+   */
+  public enum StatusCommandPayload {
+    /** The minimal payload for status, agent adds necessary details. */
+    MINIMAL,
+    /** Default payload - backward compatible. */
+    DEFAULT,
+    /** Has enough details to construct START or INSTALL commands. */
+    EXECUTION_COMMAND
+  }
 }

+ 63 - 5
ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java

@@ -241,6 +241,21 @@ public class Configuration {
   public static final String KDC_CONNECTION_CHECK_TIMEOUT_DEFAULT = "10000";
   public static final String KERBEROS_KEYTAB_CACHE_DIR_KEY = "kerberos.keytab.cache.dir";
   public static final String KERBEROS_KEYTAB_CACHE_DIR_DEFAULT = "/var/lib/ambari-server/data/cache";
+
+  /**
+   * Recovery related configuration
+   */
+  public static final String RECOVERY_TYPE_KEY = "recovery.type";
+  public static final String RECOVERY_TYPE_DEFAULT = "DEFAULT";
+  public static final String RECOVERY_LIFETIME_MAX_COUNT_KEY = "recovery.lifetime_max_count";
+  public static final String RECOVERY_LIFETIME_MAX_COUNT_DEFAULT = "12";
+  public static final String RECOVERY_MAX_COUNT_KEY = "recovery.max_count";
+  public static final String RECOVERY_MAX_COUNT_DEFAULT = "6";
+  public static final String RECOVERY_WINDOW_IN_MIN_KEY = "recovery.window_in_minutes";
+  public static final String RECOVERY_WINDOW_IN_MIN_DEFAULT = "60";
+  public static final String RECOVERY_RETRY_GAP_KEY = "recovery.retry_interval";
+  public static final String RECOVERY_RETRY_GAP_DEFAULT = "5";
+
   /**
    * This key defines whether stages of parallel requests are executed in
    * parallel or sequentially. Only stages from different requests
@@ -1069,7 +1084,7 @@ public class Configuration {
 
   public int getConnectionMaxIdleTime() {
     return Integer.parseInt(properties.getProperty
-            (SERVER_CONNECTION_MAX_IDLE_TIME, String.valueOf("900000")));
+        (SERVER_CONNECTION_MAX_IDLE_TIME, String.valueOf("900000")));
   }
 
   /**
@@ -1105,7 +1120,8 @@ public class Configuration {
   }
 
   public int getOneWayAuthPort() {
-    return Integer.parseInt(properties.getProperty(SRVR_ONE_WAY_SSL_PORT_KEY, String.valueOf(SRVR_ONE_WAY_SSL_PORT_DEFAULT)));
+    return Integer.parseInt(properties.getProperty(SRVR_ONE_WAY_SSL_PORT_KEY,
+                                                   String.valueOf(SRVR_ONE_WAY_SSL_PORT_DEFAULT)));
   }
 
   public int getTwoWayAuthPort() {
@@ -1196,7 +1212,7 @@ public class Configuration {
 
   public Integer getRequestReadTimeout() {
     return Integer.parseInt(properties.getProperty(REQUEST_READ_TIMEOUT,
-        REQUEST_READ_TIMEOUT_DEFAULT));
+                                                   REQUEST_READ_TIMEOUT_DEFAULT));
   }
 
   public Integer getRequestConnectTimeout() {
@@ -1206,7 +1222,7 @@ public class Configuration {
 
   public String getExecutionSchedulerConnections() {
     return properties.getProperty(EXECUTION_SCHEDULER_CONNECTIONS,
-        DEFAULT_SCHEDULER_MAX_CONNECTIONS);
+                                  DEFAULT_SCHEDULER_MAX_CONNECTIONS);
   }
 
   public Long getExecutionSchedulerMisfireToleration() {
@@ -1232,7 +1248,7 @@ public class Configuration {
 
   public String getCustomActionDefinitionPath() {
     return properties.getProperty(CUSTOM_ACTION_DEFINITION_KEY,
-        CUSTOM_ACTION_DEFINITION_DEF_VALUE);
+                                  CUSTOM_ACTION_DEFINITION_DEF_VALUE);
   }
 
   /**
@@ -1336,6 +1352,48 @@ public class Configuration {
     return properties.getProperty(ALERT_TEMPLATE_FILE);
   }
 
+  /**
+   * Gets the node recovery type (DEFAULT|AUTO_START|FULL).
+   *
+   * @return the value of {@link #RECOVERY_TYPE_KEY}, falling back to
+   *         {@link #RECOVERY_TYPE_DEFAULT}
+   */
+  public String getNodeRecoveryType() {
+    return properties.getProperty(RECOVERY_TYPE_KEY,
+                                  RECOVERY_TYPE_DEFAULT);
+  }
+
+  /**
+   * Gets the configured max count of recovery attempts allowed per host
+   * component in a window. This is reset when agent is restarted.
+   *
+   * @return the value of {@link #RECOVERY_MAX_COUNT_KEY}, falling back to
+   *         {@link #RECOVERY_MAX_COUNT_DEFAULT}
+   */
+  public String getNodeRecoveryMaxCount() {
+    return properties.getProperty(RECOVERY_MAX_COUNT_KEY,
+                                  RECOVERY_MAX_COUNT_DEFAULT);
+  }
+
+  /**
+   * Gets the configured max lifetime count of recovery attempts allowed per
+   * host component. This is reset when agent is restarted.
+   *
+   * @return the value of {@link #RECOVERY_LIFETIME_MAX_COUNT_KEY}, falling
+   *         back to {@link #RECOVERY_LIFETIME_MAX_COUNT_DEFAULT}
+   */
+  public String getNodeRecoveryLifetimeMaxCount() {
+    return properties.getProperty(RECOVERY_LIFETIME_MAX_COUNT_KEY,
+                                  RECOVERY_LIFETIME_MAX_COUNT_DEFAULT);
+  }
+
+  /**
+   * Gets the configured recovery window size in minutes.
+   *
+   * @return the value of {@link #RECOVERY_WINDOW_IN_MIN_KEY}, falling back
+   *         to {@link #RECOVERY_WINDOW_IN_MIN_DEFAULT}
+   */
+  public String getNodeRecoveryWindowInMin() {
+    return properties.getProperty(RECOVERY_WINDOW_IN_MIN_KEY,
+                                  RECOVERY_WINDOW_IN_MIN_DEFAULT);
+  }
+
+  /**
+   * Gets the configured retry gap between tries per host component.
+   *
+   * @return the value of {@link #RECOVERY_RETRY_GAP_KEY}, falling back to
+   *         {@link #RECOVERY_RETRY_GAP_DEFAULT}
+   */
+  public String getNodeRecoveryRetryGap() {
+    return properties.getProperty(RECOVERY_RETRY_GAP_KEY,
+                                  RECOVERY_RETRY_GAP_DEFAULT);
+  }
+
   /**
    * Gets the default KDC port to use when no port is specified in KDC hostname
    *

+ 10 - 0
ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java

@@ -19,7 +19,9 @@
 package org.apache.ambari.server.controller;
 
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.controller.internal.RequestStageContainer;
 import org.apache.ambari.server.metadata.RoleCommandOrder;
@@ -757,5 +759,13 @@ public interface AmbariManagementController {
    * @param service @Service object
    */
   public void initializeWidgetsAndLayouts(Cluster cluster, Service service) throws AmbariException;
+
+  /**
+   * Gets an execution command for host component life cycle command
+   *
+   * @param cluster cluster the host component belongs to
+   * @param scHost host component the command is built for
+   * @param roleCommand life cycle command to build
+   * @return the execution command built for the host component
+   * @throws AmbariException if the command cannot be built
+   */
+  public ExecutionCommand getExecutionCommand(Cluster cluster,
+                                              ServiceComponentHost scHost,
+                                              RoleCommand roleCommand) throws AmbariException;
 }
 

+ 83 - 23
ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java

@@ -1695,7 +1695,8 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
    * Creates and populates an EXECUTION_COMMAND for host
    */
   private void createHostAction(Cluster cluster,
-                                Stage stage, ServiceComponentHost scHost,
+                                Stage stage,
+                                ServiceComponentHost scHost,
                                 Map<String, Map<String, String>> configurations,
                                 Map<String, Map<String, Map<String, String>>> configurationAttributes,
                                 Map<String, Map<String, String>> configTags,
@@ -1710,8 +1711,8 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
     stage.addHostRoleExecutionCommand(scHost.getHostName(), Role.valueOf(scHost
       .getServiceComponentName()), roleCommand,
       event, scHost.getClusterName(),
-        serviceName, false);
-    String componentName = event.getServiceComponentName();
+      serviceName, false);
+    String componentName = scHost.getServiceComponentName();
     String hostname = scHost.getHostName();
     String osFamily = clusters.getHost(hostname).getOsFamily();
     StackId stackId = cluster.getDesiredStackVersion();
@@ -1975,12 +1976,12 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
           clusters.getHostsForCluster(cluster.getClusterName()), cluster);
 
       String clusterHostInfoJson = StageUtils.getGson().toJson(clusterHostInfo);
-      String HostParamsJson = StageUtils.getGson().toJson(
+      String hostParamsJson = StageUtils.getGson().toJson(
           customCommandExecutionHelper.createDefaultHostParams(cluster));
 
       Stage stage = createNewStage(requestStages.getLastStageId(), cluster,
           requestStages.getId(), requestProperties.get(REQUEST_CONTEXT_PROPERTY),
-          clusterHostInfoJson, "{}", HostParamsJson);
+          clusterHostInfoJson, "{}", hostParamsJson);
 
       Collection<ServiceComponentHost> componentsToEnableKerberos = new ArrayList<ServiceComponentHost>();
       Set<String> hostsToForceKerberosOperations = new HashSet<String>();
@@ -2148,23 +2149,6 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
                   + ", roleCommand=" + roleCommand.name());
             }
 
-            // [ type -> [ key, value ] ]
-            Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>();
-            Map<String, Map<String, Map<String, String>>> configurationAttributes = new TreeMap<String, Map<String, Map<String, String>>>();
-            Host host = clusters.getHost(scHost.getHostName());
-
-            Map<String, Map<String, String>> configTags =
-              findConfigurationTagsWithOverrides(cluster, host.getHostName());
-
-            // HACK - Set configs on the ExecCmd
-            if (!scHost.getHostName().equals(jobtrackerHost)) {
-              if (configTags.get(Configuration.GLOBAL_CONFIG_TAG) != null) {
-                configHelper.applyCustomConfig(
-                    configurations, Configuration.GLOBAL_CONFIG_TAG,
-                    Configuration.RCA_ENABLED_PROPERTY, "false", false);
-              }
-            }
-
             // any targeted information
             String keyName = scHost.getServiceComponentName().toLowerCase();
             if (requestProperties.containsKey(keyName)) {
@@ -2196,8 +2180,27 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
               requestParameters.put(keyName, requestProperties.get(keyName));
             }
 
+            Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>();
+            Map<String, Map<String, Map<String, String>>>
+                configurationAttributes =
+                new TreeMap<String, Map<String, Map<String, String>>>();
+            Host host = clusters.getHost(scHost.getHostName());
+
+            Map<String, Map<String, String>> configTags =
+                findConfigurationTagsWithOverrides(cluster, host.getHostName());
+
+            // HACK - Set configs on the ExecCmd
+            if (!scHost.getHostName().equals(jobtrackerHost)) {
+              if (configTags.get(Configuration.GLOBAL_CONFIG_TAG) != null) {
+                configHelper.applyCustomConfig(
+                    configurations, Configuration.GLOBAL_CONFIG_TAG,
+                    Configuration.RCA_ENABLED_PROPERTY, "false", false);
+              }
+            }
+
+
             createHostAction(cluster, stage, scHost, configurations, configurationAttributes, configTags,
-              roleCommand, requestParameters, event);
+                             roleCommand, requestParameters, event);
           }
         }
       }
@@ -2264,6 +2267,63 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
     return requestStages;
   }
 
+  /**
+   * Builds an execution command for a host component life cycle command.
+   * <p>
+   * A stage is created locally, a host action for {@code scHost} is added to
+   * it via {@code createHostAction}, and the first execution command queued
+   * for the host is taken from that stage. The command is then completed
+   * with default host level params (values already on the command win),
+   * cluster host info and effective configuration attributes.
+   *
+   * @param cluster cluster the host component belongs to
+   * @param scHost host component to build the command for
+   * @param roleCommand life cycle command to build
+   * @return the populated execution command
+   * @throws AmbariException declared by the helper calls used here
+   */
+  @Override
+  public ExecutionCommand getExecutionCommand(Cluster cluster,
+                                              ServiceComponentHost scHost,
+                                              RoleCommand roleCommand) throws AmbariException {
+    // Computed once; reused for the stage JSON and for the command itself
+    Map<String, Set<String>> clusterHostInfo = StageUtils.getClusterHostInfo(
+        clusters.getHostsForCluster(cluster.getClusterName()), cluster);
+    String clusterHostInfoJson = StageUtils.getGson().toJson(clusterHostInfo);
+    Map<String, String> hostParamsCmd = customCommandExecutionHelper.createDefaultHostParams(cluster);
+    Stage stage = createNewStage(0, cluster,
+                                 1, "",
+                                 clusterHostInfoJson, "{}", "");
+
+
+    Map<String, Map<String, String>> configTags = configHelper.getEffectiveDesiredTags(cluster, scHost.getHostName());
+    Map<String, Map<String, String>> configurations = configHelper.getEffectiveConfigProperties(cluster, configTags);
+
+    Map<String, Map<String, Map<String, String>>>
+        configurationAttributes =
+        new TreeMap<String, Map<String, Map<String, String>>>();
+
+    // No request parameters and no triggering event exist for this command
+    createHostAction(cluster, stage, scHost, configurations, configurationAttributes, configTags,
+                     roleCommand, null, null);
+    ExecutionCommand ec = stage.getExecutionCommands().get(scHost.getHostName()).get(0).getExecutionCommand();
+
+    // createHostAction does not take a hostLevelParams but creates one
+    // Defaults are overlaid with the command's own values, then written back
+    hostParamsCmd.putAll(ec.getHostLevelParams());
+    ec.getHostLevelParams().putAll(hostParamsCmd);
+
+    ec.setClusterHostInfo(clusterHostInfo);
+
+    // Hack - Remove passwords from configs
+    if (ec.getRole().equals(Role.HIVE_CLIENT.toString()) &&
+        ec.getConfigurations().containsKey(Configuration.HIVE_CONFIG_TAG)) {
+      ec.getConfigurations().get(Configuration.HIVE_CONFIG_TAG).remove(Configuration.HIVE_METASTORE_PASSWORD_PROPERTY);
+    }
+
+    // Add attributes
+    Map<String, Map<String, Map<String, String>>> configAttributes =
+        configHelper.getEffectiveConfigAttributes(cluster,
+                                                  ec.getConfigurationTags());
+
+    for (Map.Entry<String, Map<String, Map<String, String>>> attributesOccurrence : configAttributes.entrySet()) {
+      String type = attributesOccurrence.getKey();
+      Map<String, Map<String, String>> attributes = attributesOccurrence.getValue();
+
+      if (ec.getConfigurationAttributes() != null) {
+        if (!ec.getConfigurationAttributes().containsKey(type)) {
+          ec.getConfigurationAttributes().put(type, new TreeMap<String, Map<String, String>>());
+        }
+        configHelper.cloneAttributesMap(attributes, ec.getConfigurationAttributes().get(type));
+      }
+    }
+
+    return ec;
+  }
+
+
   @Transactional
   void updateServiceStates(
       Map<State, List<Service>> changedServices,

+ 46 - 1
ambari-server/src/main/java/org/apache/ambari/server/controller/HostResponse.java

@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.ambari.server.agent.AgentEnv;
 import org.apache.ambari.server.agent.DiskInfo;
+import org.apache.ambari.server.agent.RecoveryReport;
 import org.apache.ambari.server.state.AgentVersion;
 import org.apache.ambari.server.state.DesiredConfig;
 import org.apache.ambari.server.state.HostConfig;
@@ -122,6 +123,16 @@ public class HostResponse {
    * Host Health Status
    */
   private HostHealthStatus healthStatus;
+
+  /**
+   * Recovery status
+   */
+  private RecoveryReport recoveryReport;
+
+  /**
+   * Summary of node recovery
+   */
+  private String recoverySummary = "DISABLED";
   
   /**
    * Public name.
@@ -440,6 +451,8 @@ public class HostResponse {
     this.healthStatus = healthStatus;
   }
 
+
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
@@ -489,7 +502,7 @@ public class HostResponse {
   }
   
   /**
-   * @param lastAgentEnv
+   * @param agentEnv
    */
   public void setLastAgentEnv(AgentEnv agentEnv) {
     lastAgentEnv = agentEnv;
@@ -535,4 +548,36 @@ public class HostResponse {
   public MaintenanceState getMaintenanceState() {
     return maintenanceState;
   }
+
+  /**
+   * Gets the recovery summary for the host.
+   *
+   * @return the recovery summary; {@code "DISABLED"} until one is set
+   */
+  public String getRecoverySummary() {
+    return this.recoverySummary;
+  }
+
+  /**
+   * Sets the recovery summary for the host.
+   *
+   * @param recoverySummary the recovery summary to store
+   */
+  public void setRecoverySummary(String recoverySummary) {
+    this.recoverySummary = recoverySummary;
+  }
+
+  /**
+   * Gets the detailed recovery report.
+   *
+   * @return the recovery report, or {@code null} if none was set
+   */
+  public RecoveryReport getRecoveryReport() {
+    return this.recoveryReport;
+  }
+
+  /**
+   * Sets the detailed recovery report.
+   *
+   * @param recoveryReport the recovery report to store
+   */
+  public void setRecoveryReport(RecoveryReport recoveryReport) {
+    this.recoveryReport = recoveryReport;
+  }
 }

+ 9 - 1
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java

@@ -110,9 +110,13 @@ public class HostResourceProvider extends BaseBlueprintProcessor {
       PropertyHelper.getPropertyId("Hosts", "host_status");
   protected static final String HOST_MAINTENANCE_STATE_PROPERTY_ID = 
       PropertyHelper.getPropertyId("Hosts", "maintenance_state");
-  
+
   protected static final String HOST_HOST_HEALTH_REPORT_PROPERTY_ID =
       PropertyHelper.getPropertyId("Hosts", "host_health_report");
+  protected static final String HOST_RECOVERY_REPORT_PROPERTY_ID =
+      PropertyHelper.getPropertyId("Hosts", "recovery_report");
+  protected static final String HOST_RECOVERY_SUMMARY_PROPERTY_ID =
+      PropertyHelper.getPropertyId("Hosts", "recovery_summary");
   protected static final String HOST_STATE_PROPERTY_ID =
       PropertyHelper.getPropertyId("Hosts", "host_state");
   protected static final String HOST_LAST_AGENT_ENV_PROPERTY_ID =
@@ -251,6 +255,10 @@ public class HostResourceProvider extends BaseBlueprintProcessor {
           response.getStatus(),requestedIds);
       setResourceProperty(resource, HOST_HOST_HEALTH_REPORT_PROPERTY_ID,
           response.getHealthStatus().getHealthReport(), requestedIds);
+      setResourceProperty(resource, HOST_RECOVERY_REPORT_PROPERTY_ID,
+          response.getRecoveryReport(), requestedIds);
+      setResourceProperty(resource, HOST_RECOVERY_SUMMARY_PROPERTY_ID,
+          response.getRecoverySummary(), requestedIds);
       setResourceProperty(resource, HOST_DISK_INFO_PROPERTY_ID,
           response.getDisksInfo(), requestedIds);
       setResourceProperty(resource, HOST_STATE_PROPERTY_ID,

+ 2 - 2
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/URLStreamProvider.java

@@ -193,13 +193,13 @@ public class URLStreamProvider implements StreamProvider {
         return connection;
       } else {
         // no supported authentication type found
-        // we would let the original response propogate
+        // we would let the original response propagate
         LOG.error("Unsupported WWW-Authentication header:" + wwwAuthHeader+ ", for URL:" + spec);
         return connection;
       }
     } else {
         // not a 401 Unauthorized status code
-        // we would let the original response propogate
+        // we would let the original response propagate
         return connection;
     }
   }

+ 12 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/Host.java

@@ -25,6 +25,7 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.AgentEnv;
 import org.apache.ambari.server.agent.DiskInfo;
 import org.apache.ambari.server.agent.HostInfo;
+import org.apache.ambari.server.agent.RecoveryReport;
 import org.apache.ambari.server.controller.HostResponse;
 import org.apache.ambari.server.orm.entities.HostVersionEntity;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
@@ -192,6 +193,17 @@ public interface Host {
    */
   public HostHealthStatus getHealthStatus();
 
+  /**
+   * Get detailed recovery report for the host
+   * @return the recovery report currently stored for the host
+   */
+  public RecoveryReport getRecoveryReport();
+
+  /**
+   * Set detailed recovery report for the host
+   * @param recoveryReport the recovery report to store for the host
+   */
+  public void setRecoveryReport(RecoveryReport recoveryReport);
+
   /**
    * @param healthStatus the healthStatus to set
    */

+ 24 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java

@@ -30,6 +30,7 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.AgentEnv;
 import org.apache.ambari.server.agent.DiskInfo;
 import org.apache.ambari.server.agent.HostInfo;
+import org.apache.ambari.server.agent.RecoveryReport;
 import org.apache.ambari.server.controller.HostResponse;
 import org.apache.ambari.server.events.MaintenanceModeEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
@@ -119,6 +120,7 @@ public class HostImpl implements Host {
   private long lastHeartbeatTime = 0L;
   private AgentEnv lastAgentEnv = null;
   private List<DiskInfo> disksInfo = new ArrayList<DiskInfo>();
+  private RecoveryReport recoveryReport = new RecoveryReport();
   private boolean persisted = false;
   private Integer currentPingPort = null;
 
@@ -853,6 +855,26 @@ public class HostImpl implements Host {
     }
   }
 
+  /**
+   * Returns the recovery report currently held for this host.
+   * The field is read while holding the read lock.
+   *
+   * @return the stored recovery report
+   */
+  @Override
+  public RecoveryReport getRecoveryReport() {
+    // Acquire before entering the try block so that the finally clause can
+    // never attempt to release a lock that was not successfully taken.
+    readLock.lock();
+    try {
+      return recoveryReport;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Replaces the recovery report held for this host.
+   * The field is written while holding the write lock.
+   *
+   * @param recoveryReport the recovery report to store
+   */
+  @Override
+  public void setRecoveryReport(RecoveryReport recoveryReport) {
+    // Acquire before entering the try block so that the finally clause can
+    // never attempt to release a lock that was not successfully taken.
+    writeLock.lock();
+    try {
+      this.recoveryReport = recoveryReport;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
   @Override
   public HostHealthStatus getHealthStatus() {
     try {
@@ -1072,6 +1094,8 @@ public class HostImpl implements Host {
       r.setPublicHostName(getPublicHostName());
       r.setHostState(getState().toString());
       r.setStatus(getStatus());
+      r.setRecoveryReport(getRecoveryReport());
+      r.setRecoverySummary(getRecoveryReport().getSummary());
 
       return r;
     }

+ 2 - 0
ambari-server/src/main/resources/properties.json

@@ -47,6 +47,8 @@
         "Hosts/host_state",
         "Hosts/maintenance_state",
         "Hosts/desired_configs",
+        "Hosts/recovery_report",
+        "Hosts/recovery_summary",
         "_"
     ],
     "Component":[

+ 136 - 1
ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java

@@ -150,7 +150,19 @@ public class TestHeartbeatHandler {
 
   @Before
   public void setup() throws Exception {
-    injector = Guice.createInjector(new InMemoryDefaultTestModule());
+    InMemoryDefaultTestModule module = new InMemoryDefaultTestModule(){
+
+      @Override
+      protected void configure() {
+        getProperties().put("recovery.type", "FULL");
+        getProperties().put("recovery.lifetime_max_count", "10");
+        getProperties().put("recovery.max_count", "4");
+        getProperties().put("recovery.window_in_minutes", "23");
+        getProperties().put("recovery.retry_interval", "2");
+        super.configure();
+      }
+    };
+    injector = Guice.createInjector(module);
     injector.getInstance(GuiceJpaInitializer.class);
     clusters = injector.getInstance(Clusters.class);
     injector.injectMembers(this);
@@ -834,6 +846,44 @@ public class TestHeartbeatHandler {
         hostObject.getLastRegistrationTime());
   }
 
+  /**
+   * Verifies the recovery configuration returned in the registration response
+   * against the "recovery.*" properties set in {@code setup()}, and then the
+   * values produced from a fresh {@link Configuration}.
+   */
+  @Test
+  public void testRegistrationRecoveryConfig() throws AmbariException,
+      InvalidStateTransitionException {
+    ActionManager am = getMockActionManager();
+    replay(am);
+    Clusters fsm = clusters;
+    HeartBeatHandler handler = new HeartBeatHandler(fsm, new ActionQueue(), am,
+                                                    injector);
+    clusters.addHost(DummyHostname1);
+    Host hostObject = clusters.getHost(DummyHostname1);
+    hostObject.setIPv4("ipv4");
+    hostObject.setIPv6("ipv6");
+
+    Register reg = new Register();
+    HostInfo hi = new HostInfo();
+    hi.setHostName(DummyHostname1);
+    hi.setOS(DummyOsType);
+    reg.setHostname(DummyHostname1);
+    reg.setCurrentPingPort(DummyCurrentPingPort);
+    reg.setHardwareProfile(hi);
+    reg.setAgentVersion(metaInfo.getServerVersion());
+    reg.setPrefix(Configuration.PREFIX_DIR);
+    RegistrationResponse rr = handler.handleRegistration(reg);
+
+    // Expected value goes first so that failure messages read correctly.
+    RecoveryConfig rc = rr.getRecoveryConfig();
+    assertEquals("4", rc.getMaxCount());
+    assertEquals("FULL", rc.getType());
+    assertEquals("10", rc.getMaxLifetimeCount());
+    assertEquals("2", rc.getRetryGap());
+    assertEquals("23", rc.getWindowInMinutes());
+
+    // Values obtained when no recovery properties are configured
+    rc = RecoveryConfig.getRecoveryConfig(new Configuration());
+    assertEquals("6", rc.getMaxCount());
+    assertEquals("DEFAULT", rc.getType());
+    assertEquals("12", rc.getMaxLifetimeCount());
+    assertEquals("5", rc.getRetryGap());
+    assertEquals("60", rc.getWindowInMinutes());
+  }
+
   @Test
   public void testRegistrationWithBadVersion() throws AmbariException,
       InvalidStateTransitionException {
@@ -1842,6 +1892,91 @@ public class TestHeartbeatHandler {
             State.INSTALL_FAILED, serviceComponentHost2.getState());
   }
 
+
+  /**
+   * Verifies that the recovery report sent in a heartbeat is stored on the
+   * host, and that the report from a later heartbeat replaces it.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testRecoveryStatusReports() throws Exception {
+    Clusters fsm = clusters;
+
+    Cluster cluster = getDummyCluster();
+    Host hostObject = clusters.getHost(DummyHostname1);
+    clusters.mapHostToCluster(hostObject.getHostName(), cluster.getClusterName());
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+    hdfs.addServiceComponent(DATANODE).persist();
+    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(NAMENODE).persist();
+    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1).setState(State.STARTED);
+    hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).setState(State.STARTED);
+
+    ActionQueue aq = new ActionQueue();
+
+    HostRoleCommand command = new HostRoleCommand(DummyHostname1,
+                                                  Role.DATANODE, null, null);
+    // Plain list rather than double-brace initialization, which would create
+    // an anonymous ArrayList subclass holding a reference to this test.
+    List<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
+    tasks.add(command);
+    tasks.add(command);
+    ActionManager am = getMockActionManager();
+    expect(am.getTasks(anyObject(List.class))).andReturn(tasks).anyTimes();
+    replay(am);
+    HeartBeatHandler handler = new HeartBeatHandler(fsm, aq, am, injector);
+
+    Register reg = new Register();
+    HostInfo hi = new HostInfo();
+    hi.setHostName(DummyHostname1);
+    hi.setOS(DummyOs);
+    hi.setOSRelease(DummyOSRelease);
+    reg.setHostname(DummyHostname1);
+    reg.setHardwareProfile(hi);
+    reg.setAgentVersion(metaInfo.getServerVersion());
+    handler.handleRegistration(reg);
+
+    hostObject.setState(HostState.UNHEALTHY);
+
+    aq.enqueue(DummyHostname1, new StatusCommand());
+
+    // First heartbeat: DATANODE reported as recoverable after 2 attempts
+    HeartBeat hb1 = new HeartBeat();
+    hb1.setResponseId(0);
+    hb1.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
+    hb1.setHostname(DummyHostname1);
+    RecoveryReport rr = new RecoveryReport();
+    rr.setSummary("RECOVERABLE");
+    List<ComponentRecoveryReport> compRecReports = new ArrayList<ComponentRecoveryReport>();
+    ComponentRecoveryReport compRecReport = new ComponentRecoveryReport();
+    compRecReport.setLimitReached(Boolean.FALSE);
+    compRecReport.setName("DATANODE");
+    compRecReport.setNumAttempts(2);
+    compRecReports.add(compRecReport);
+    rr.setComponentReports(compRecReports);
+    hb1.setRecoveryReport(rr);
+    handler.handleHeartBeat(hb1);
+    assertEquals("RECOVERABLE", hostObject.getRecoveryReport().getSummary());
+    assertEquals(1, hostObject.getRecoveryReport().getComponentReports().size());
+    assertEquals(2, hostObject.getRecoveryReport().getComponentReports().get(0).getNumAttempts());
+
+    // Second heartbeat: DATANODE reported as having reached its limit after 5 attempts
+    HeartBeat hb2 = new HeartBeat();
+    hb2.setResponseId(1);
+    hb2.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
+    hb2.setHostname(DummyHostname1);
+    rr = new RecoveryReport();
+    rr.setSummary("UNRECOVERABLE");
+    compRecReports = new ArrayList<ComponentRecoveryReport>();
+    compRecReport = new ComponentRecoveryReport();
+    compRecReport.setLimitReached(Boolean.TRUE);
+    compRecReport.setName("DATANODE");
+    compRecReport.setNumAttempts(5);
+    compRecReports.add(compRecReport);
+    rr.setComponentReports(compRecReports);
+    hb2.setRecoveryReport(rr);
+    handler.handleHeartBeat(hb2);
+    assertEquals("UNRECOVERABLE", hostObject.getRecoveryReport().getSummary());
+    assertEquals(1, hostObject.getRecoveryReport().getComponentReports().size());
+    assertEquals(5, hostObject.getRecoveryReport().getComponentReports().get(0).getNumAttempts());
+  }
+
   @Test
   @SuppressWarnings("unchecked")
   public void testProcessStatusReports() throws Exception {

+ 45 - 0
ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java

@@ -989,6 +989,51 @@ public class AmbariManagementControllerTest {
 
   }
 
+  /**
+   * Attaches core-site and hdfs-site desired configs to the cluster, installs
+   * HDFS, and checks the START execution command built for NAMENODE on h1.
+   */
+  @Test
+  public void testGetExecutionCommand() throws Exception {
+    testCreateServiceComponentHostSimple();
+
+    final String clusterName = "foo1";
+    final String serviceName = "HDFS";
+
+    Cluster cluster = clusters.getCluster(clusterName);
+    Service hdfsService = cluster.getService(serviceName);
+
+    // Both config types share the same single-entry property map
+    Map<String, String> properties = new HashMap<String, String>();
+    properties.put("a", "b");
+
+    ConfigurationRequest coreSiteConfig =
+        new ConfigurationRequest(clusterName, "core-site", "version1", properties, null);
+    ConfigurationRequest hdfsSiteConfig =
+        new ConfigurationRequest(clusterName, "hdfs-site", "version1", properties, null);
+
+    // Each desired config is applied through its own cluster update, core-site first
+    ClusterRequest coreSiteUpdate =
+        new ClusterRequest(cluster.getClusterId(), clusterName, null, null);
+    coreSiteUpdate.setDesiredConfig(Collections.singletonList(coreSiteConfig));
+    controller.updateClusters(Collections.singleton(coreSiteUpdate), null);
+
+    ClusterRequest hdfsSiteUpdate =
+        new ClusterRequest(cluster.getClusterId(), clusterName, null, null);
+    hdfsSiteUpdate.setDesiredConfig(Collections.singletonList(hdfsSiteConfig));
+    controller.updateClusters(Collections.singleton(hdfsSiteUpdate), null);
+
+    // Install the service, then request the START command for the NAMENODE
+    installService(clusterName, serviceName, false, false);
+    ExecutionCommand startCommand = controller.getExecutionCommand(
+        cluster,
+        hdfsService.getServiceComponent("NAMENODE").getServiceComponentHost("h1"),
+        RoleCommand.START);
+
+    assertEquals("1-0", startCommand.getCommandId());
+    assertEquals("foo1", startCommand.getClusterName());
+
+    Map<String, Map<String, String>> commandConfigs = startCommand.getConfigurations();
+    assertNotNull(commandConfigs);
+    assertEquals(2, commandConfigs.size());
+    assertTrue(commandConfigs.containsKey("hdfs-site"));
+    assertTrue(commandConfigs.containsKey("core-site"));
+    assertTrue(startCommand.getConfigurationAttributes().containsKey("hdfs-site"));
+    assertTrue(startCommand.getConfigurationAttributes().containsKey("core-site"));
+
+    Map<String, Set<String>> clusterHostInfo = startCommand.getClusterHostInfo();
+    assertTrue(clusterHostInfo.containsKey("namenode_host"));
+  }
 
   @Test
   public void testCreateServiceComponentWithConfigs() {

+ 111 - 1
ambari-server/src/test/java/org/apache/ambari/server/controller/internal/HostResourceProviderTest.java

@@ -22,6 +22,8 @@ import com.google.gson.Gson;
 import com.google.inject.Injector;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.HostNotFoundException;
+import org.apache.ambari.server.agent.ComponentRecoveryReport;
+import org.apache.ambari.server.agent.RecoveryReport;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.controller.AmbariManagementControllerImpl;
@@ -57,6 +59,7 @@ import static org.powermock.api.easymock.PowerMock.replayAll;
 import java.net.InetAddress;
 import static org.powermock.api.easymock.PowerMock.*;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -643,6 +646,114 @@ public class HostResourceProviderTest {
         healthStatus, ambariMetaInfo, resourceProviderFactory, hostResourceProvider);
   }
 
+  @Test
+  public void testGetRecoveryReport() throws Exception {
+    Resource.Type type = Resource.Type.Host;
+
+    AmbariManagementController managementController = createMock(AmbariManagementController.class);
+    Clusters clusters = createNiceMock(Clusters.class);
+    Cluster cluster = createNiceMock(Cluster.class);
+    HostHealthStatus healthStatus = createNiceMock(HostHealthStatus.class);
+    AmbariMetaInfo ambariMetaInfo = createNiceMock(AmbariMetaInfo.class);
+    StackId stackId = createNiceMock(StackId.class);
+    ComponentInfo componentInfo = createNiceMock(ComponentInfo.class);
+    HostResponse hostResponse1 = createNiceMock(HostResponse.class);
+    ResourceProviderFactory resourceProviderFactory = createNiceMock(ResourceProviderFactory.class);
+    ResourceProvider hostResourceProvider = createNiceMock(HostResourceProvider.class); // the host provider itself is a mock
+
+    RecoveryReport rr = new RecoveryReport(); // fixture: summary RECOVERABLE with one DATANODE entry (2 attempts, limit not reached)
+    rr.setSummary("RECOVERABLE");
+    List<ComponentRecoveryReport> compRecReports = new ArrayList<ComponentRecoveryReport>();
+    ComponentRecoveryReport compRecReport = new ComponentRecoveryReport();
+    compRecReport.setLimitReached(Boolean.FALSE);
+    compRecReport.setName("DATANODE");
+    compRecReport.setNumAttempts(2);
+    compRecReports.add(compRecReport);
+    rr.setComponentReports(compRecReports);
+
+    AbstractControllerResourceProvider.init(resourceProviderFactory);
+
+    Set<Cluster> clusterSet = new HashSet<Cluster>();
+    clusterSet.add(cluster);
+
+    ServiceComponentHostResponse shr1 = new ServiceComponentHostResponse("Cluster100", "Service100", "Component100",
+                                                                         "Host100", "STARTED", "", null, null, null);
+
+    Set<ServiceComponentHostResponse> responses = new HashSet<ServiceComponentHostResponse>();
+    responses.add(shr1);
+
+    // set expectations (record phase; every expectation below allows any number of calls)
+    expect(managementController.getClusters()).andReturn(clusters).anyTimes();
+    expect(managementController.getAmbariMetaInfo()).andReturn(ambariMetaInfo).anyTimes();
+    expect(managementController.getHostComponents((Set<ServiceComponentHostRequest>) anyObject())).andReturn(responses).anyTimes();
+    expect(clusters.getCluster("Cluster100")).andReturn(cluster).anyTimes();
+    expect(clusters.getClustersForHost("Host100")).andReturn(clusterSet).anyTimes();
+    expect(hostResponse1.getClusterName()).andReturn("Cluster100").anyTimes();
+    expect(hostResponse1.getHostname()).andReturn("Host100").anyTimes();
+    expect(hostResponse1.getRecoveryReport()).andReturn(rr).anyTimes();
+    expect(hostResponse1.getRecoverySummary()).andReturn(rr.getSummary()).anyTimes();
+    expect(ambariMetaInfo.getComponent((String) anyObject(), (String) anyObject(),
+                                       (String) anyObject(), (String) anyObject())).andReturn(componentInfo).anyTimes();
+    expect(componentInfo.getCategory()).andReturn("SLAVE").anyTimes();
+    expect(resourceProviderFactory.getHostResourceProvider(anyObject(Set.class), anyObject(Map.class),
+                                                           eq(managementController))).andReturn(hostResourceProvider).anyTimes(); // factory hands out the mocked provider
+
+
+    Set<String> propertyIds = new HashSet<String>();
+
+    propertyIds.add(HostResourceProvider.HOST_CLUSTER_NAME_PROPERTY_ID);
+    propertyIds.add(HostResourceProvider.HOST_NAME_PROPERTY_ID);
+    propertyIds.add(HostResourceProvider.HOST_RECOVERY_REPORT_PROPERTY_ID);
+    propertyIds.add(HostResourceProvider.HOST_RECOVERY_SUMMARY_PROPERTY_ID);
+
+    Predicate predicate =
+        new PredicateBuilder().property(HostResourceProvider.HOST_CLUSTER_NAME_PROPERTY_ID).equals("Cluster100").
+            toPredicate();
+    Request request = PropertyHelper.getReadRequest(propertyIds);
+
+    Set<Resource> hostsResources = new HashSet<Resource>();
+
+    Resource hostResource1 = new ResourceImpl(Resource.Type.Host); // pre-built resource carrying the recovery summary and report
+    hostResource1.setProperty(HostResourceProvider.HOST_CLUSTER_NAME_PROPERTY_ID, "Cluster100");
+    hostResource1.setProperty(HostResourceProvider.HOST_RECOVERY_SUMMARY_PROPERTY_ID, rr.getSummary());
+    hostResource1.setProperty(HostResourceProvider.HOST_RECOVERY_REPORT_PROPERTY_ID, rr);
+    hostsResources.add(hostResource1);
+
+    expect(hostResourceProvider.getResources(eq(request), eq(predicate))).andReturn(hostsResources).anyTimes(); // mocked provider returns hostsResources for this request/predicate
+
+
+    // replay: all expectations must be recorded before this point
+    replay(managementController, clusters, cluster,
+           hostResponse1, stackId, componentInfo,
+           healthStatus, ambariMetaInfo, resourceProviderFactory, hostResourceProvider);
+
+    ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider(
+        type,
+        PropertyHelper.getPropertyIds(type),
+        PropertyHelper.getKeyPropertyIds(type),
+        managementController);
+
+
+    Set<Resource> resources = provider.getResources(request, predicate); // NOTE(review): presumably answered by the mocked hostResourceProvider obtained via the factory - confirm
+
+    Assert.assertEquals(1, resources.size());
+    for (Resource resource : resources) {
+      String clusterName = (String) resource.getPropertyValue(HostResourceProvider.HOST_CLUSTER_NAME_PROPERTY_ID);
+      Assert.assertEquals("Cluster100", clusterName);
+      String recovery = (String) resource.getPropertyValue(HostResourceProvider.HOST_RECOVERY_SUMMARY_PROPERTY_ID);
+      Assert.assertEquals("RECOVERABLE", recovery);
+      RecoveryReport recRep = (RecoveryReport)resource.getPropertyValue(HostResourceProvider.HOST_RECOVERY_REPORT_PROPERTY_ID);
+      Assert.assertEquals("RECOVERABLE", recRep.getSummary());
+      Assert.assertEquals(1, recRep.getComponentReports().size());
+      Assert.assertEquals(2, recRep.getComponentReports().get(0).getNumAttempts());
+    }
+
+    // verify the same set of mocks that was replayed
+    verify(managementController, clusters, cluster,
+           hostResponse1, stackId, componentInfo,
+           healthStatus, ambariMetaInfo, resourceProviderFactory, hostResourceProvider);
+  }
+
   @Test
   public void testGetResources_Status_Alert() throws Exception {
     Resource.Type type = Resource.Type.Host;
@@ -657,7 +768,6 @@ public class HostResourceProviderTest {
     HostResponse hostResponse1 = createNiceMock(HostResponse.class);
     ResourceProviderFactory resourceProviderFactory = createNiceMock(ResourceProviderFactory.class);
     ResourceProvider hostResourceProvider = createNiceMock(HostResourceProvider.class);
-    
     AbstractControllerResourceProvider.init(resourceProviderFactory);
 
     Set<Cluster> clusterSet = new HashSet<Cluster>();