浏览代码

AMBARI-15474: Listen for changes to auto-start configuration and send them to the agent during heartbeats.

Nahappan Somasundaram 9 年之前
父节点
当前提交
340f7ce6b6
共有 19 个文件被更改,包括 846 次插入109 次删除
  1. 4 0
      ambari-agent/src/main/python/ambari_agent/Controller.py
  2. 7 4
      ambari-agent/src/main/python/ambari_agent/Heartbeat.py
  3. 23 5
      ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
  4. 19 0
      ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java
  5. 30 7
      ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
  6. 24 0
      ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java
  7. 22 0
      ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
  8. 254 83
      ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java
  9. 11 1
      ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
  10. 88 0
      ambari-server/src/main/java/org/apache/ambari/server/events/ClusterConfigChangedEvent.java
  11. 11 1
      ambari-server/src/main/java/org/apache/ambari/server/events/ServiceComponentInstalledEvent.java
  12. 90 0
      ambari-server/src/main/java/org/apache/ambari/server/events/ServiceComponentRecoveryChangedEvent.java
  13. 11 1
      ambari-server/src/main/java/org/apache/ambari/server/events/ServiceComponentUninstalledEvent.java
  14. 14 2
      ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java
  15. 10 0
      ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java
  16. 4 2
      ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
  17. 15 0
      ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
  18. 208 2
      ambari-server/src/test/java/org/apache/ambari/server/configuration/RecoveryConfigHelperTest.java
  19. 1 1
      ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/HostVersionOutOfSyncListenerTest.java

+ 4 - 0
ambari-agent/src/main/python/ambari_agent/Controller.py

@@ -324,6 +324,10 @@ class Controller(threading.Thread):
         if retry:
           logger.info("Reconnected to %s", self.heartbeatUrl)
 
+        if "recoveryConfig" in response:
+          # update the list of components enabled for recovery
+          self.recovery_manager.update_configuration_from_registration(response)
+
         retry = False
         certVerifFailed = False
         self.DEBUG_SUCCESSFULL_HEARTBEATS += 1

+ 7 - 4
ambari-agent/src/main/python/ambari_agent/Heartbeat.py

@@ -44,7 +44,7 @@ class Heartbeat:
     global clusterId, clusterDefinitionRevision, firstContact
     timestamp = int(time.time()*1000)
     queueResult = self.actionQueue.result()
-
+    recovery_timestamp = self.actionQueue.controller.recovery_manager.recovery_timestamp
 
     nodeStatus = { "status" : "HEALTHY",
                    "cause" : "NONE" }
@@ -52,7 +52,8 @@ class Heartbeat:
     heartbeat = { 'responseId'        : int(id),
                   'timestamp'         : timestamp,
                   'hostname'          : hostname(self.config),
-                  'nodeStatus'        : nodeStatus
+                  'nodeStatus'        : nodeStatus,
+                  'recoveryTimestamp' : recovery_timestamp
                 }
 
     rec_status = self.actionQueue.controller.recovery_manager.get_recovery_status()
@@ -74,8 +75,10 @@ class Heartbeat:
     if int(id) == 0:
       componentsMapped = False
 
-    logger.info("Building Heartbeat: {responseId = %s, timestamp = %s, commandsInProgress = %s, componentsMapped = %s}",
-        str(id), str(timestamp), repr(commandsInProgress), repr(componentsMapped))
+    logger.info("Building Heartbeat: {responseId = %s, timestamp = %s, "
+                "commandsInProgress = %s, componentsMapped = %s,"
+                "recoveryTimestamp = %s}",
+        str(id), str(timestamp), repr(commandsInProgress), repr(componentsMapped), str(recovery_timestamp))
 
     if logger.isEnabledFor(logging.DEBUG):
       logger.debug("Heartbeat: %s", pformat(heartbeat))

+ 23 - 5
ambari-agent/src/main/python/ambari_agent/RecoveryManager.py

@@ -95,6 +95,7 @@ class RecoveryManager:
     self.__cache_lock = threading.RLock()
     self.active_command_count = 0
     self.paused = False
+    self.recovery_timestamp = -1
 
     if not os.path.exists(cache_dir):
       try:
@@ -106,7 +107,7 @@ class RecoveryManager:
 
     self.actions = {}
 
-    self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, "")
+    self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, "", -1)
 
     pass
 
@@ -539,7 +540,8 @@ class RecoveryManager:
       "maxCount" : 10,
       "windowInMinutes" : 60,
       "retryGap" : 0,
-      "components" : "a,b"
+      "components" : "a,b",
+      "recoveryTimestamp" : 1458150424380
       }
     """
 
@@ -550,6 +552,7 @@ class RecoveryManager:
     retry_gap = 5
     max_lifetime_count = 12
     enabled_components = ""
+    recovery_timestamp = -1 # Default value if recoveryTimestamp is not available.
 
 
     if reg_resp and "recoveryConfig" in reg_resp:
@@ -572,13 +575,27 @@ class RecoveryManager:
       if 'components' in config:
         enabled_components = config['components']
 
+      if 'recoveryTimestamp' in config:
+        recovery_timestamp = config['recoveryTimestamp']
+
     self.update_config(max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only,
-                       enabled_components)
+                       enabled_components, recovery_timestamp)
     pass
 
-
+  """
+  Update recovery configuration with the specified values.
+
+  max_count - Configured maximum count of recovery attempt allowed per host component in a window.
+  window_in_min - Configured window size in minutes.
+  retry_gap - Configured retry gap between tries per host component
+  max_lifetime_count - Configured maximum lifetime count of recovery attempt allowed per host component.
+  recovery_enabled - True or False. Indicates whether recovery is enabled or not.
+  auto_start_only - True if AUTO_START recovery type was specified. False otherwise.
+  enabled_components - CSV of components enabled for auto start.
+  recovery_timestamp - Timestamp when the recovery values were last updated. -1 on start up.
+  """
   def update_config(self, max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled,
-                    auto_start_only, enabled_components):
+                    auto_start_only, enabled_components, recovery_timestamp):
     """
     Update recovery configuration, recovery is disabled if configuration values
     are not correct
@@ -610,6 +627,7 @@ class RecoveryManager:
     self.auto_start_only = auto_start_only
     self.max_lifetime_count = max_lifetime_count
     self.enabled_components = []
+    self.recovery_timestamp = recovery_timestamp
 
     self.allowed_desired_states = [self.STARTED, self.INSTALLED]
     self.allowed_current_states = [self.INIT, self.INSTALLED, self.STARTED]

+ 19 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java

@@ -42,6 +42,7 @@ public class HeartBeat {
   private AgentEnv agentEnv = null;
   private List<Alert> alerts = null;
   private RecoveryReport recoveryReport;
+  private long recoveryTimestamp = -1;
 
   public long getResponseId() {
     return responseId;
@@ -67,6 +68,24 @@ public class HeartBeat {
     this.hostname = hostname;
   }
 
+  /**
+   * Timestamp when the recovery values were last updated.
+   *
+   * @return - Timestamp.
+   */
+  public long getRecoveryTimestamp() {
+    return recoveryTimestamp;
+  }
+
+  /**
+   * Set the timestamp when the recovery values were last updated.
+   *
+   * @param recoveryTimestamp
+   */
+  public void setRecoveryTimestamp(long recoveryTimestamp) {
+    this.recoveryTimestamp = recoveryTimestamp;
+  }
+
   @JsonProperty("reports")
   public List<CommandReport> getReports() {
     return reports;

+ 30 - 7
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java

@@ -269,6 +269,28 @@ public class HeartBeatHandler {
       return createRegisterCommand();
     }
 
+    /**
+     * A host can belong to only one cluster. Though getClustersForHost(hostname)
+     * returns a set of clusters, it will have only one entry.
+     *
+     *
+     * TODO: Handle the case when a host is a part of multiple clusters.
+     */
+    Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
+
+    if (clusters.size() > 0) {
+      String clusterName = clusters.iterator().next().getClusterName();
+
+      if (recoveryConfigHelper.isConfigStale(clusterName, hostname, heartbeat.getRecoveryTimestamp())) {
+        RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname);
+        response.setRecoveryConfig(rc);
+
+        if (response.getRecoveryConfig() != null) {
+          LOG.info("Recovery configuration set to {}", response.getRecoveryConfig().toString());
+        }
+      }
+    }
+
     heartbeatProcessor.addHeartbeat(heartbeat);
 
     // Send commands if node is active
@@ -467,19 +489,20 @@ public class HeartBeatHandler {
     /**
      * A host can belong to only one cluster. Though getClustersForHost(hostname)
      * returns a set of clusters, it will have only one entry.
+     *
+     * TODO: Handle the case when a host is a part of multiple clusters.
      */
-    String clusterName = null;
     Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
 
     if (clusters.size() > 0) {
-      clusterName = clusters.iterator().next().getClusterName();
-    }
+      String clusterName = clusters.iterator().next().getClusterName();
 
-    RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname);
-    response.setRecoveryConfig(rc);
+      RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname);
+      response.setRecoveryConfig(rc);
 
-    if(response.getRecoveryConfig() != null) {
-      LOG.info("Recovery configuration set to " + response.getRecoveryConfig().toString());
+      if(response.getRecoveryConfig() != null) {
+        LOG.info("Recovery configuration set to " + response.getRecoveryConfig().toString());
+      }
     }
 
     Long requestId = 0L;

+ 24 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java

@@ -69,6 +69,9 @@ public class HeartBeatResponse {
   @SerializedName("hasPendingTasks")
   private boolean hasPendingTasks = false;
 
+  @SerializedName("recoveryConfig")
+  private RecoveryConfig recoveryConfig;
+
   public long getResponseId() {
     return responseId;
   }
@@ -109,6 +112,26 @@ public class HeartBeatResponse {
     this.registrationCommand = registrationCommand;
   }
 
+  /**
+   * Get the recovery configuration settings for this host. The configuration is set whenever
+   * any value changes. The agent uses this information to update the values on its side.
+   *
+   * @return Null if nothing changed since the last update, else updated configuration.
+   */
+  public RecoveryConfig getRecoveryConfig() {
+    return recoveryConfig;
+  }
+
+  /**
+   * Set the recovery configuration. Set only when one or more recovery values change. This
+   * is to avoid sending the configuration to the agent during every heartbeat.
+   *
+   * @param recoveryConfig
+   */
+  public void setRecoveryConfig(RecoveryConfig recoveryConfig) {
+    this.recoveryConfig = recoveryConfig;
+  }
+
   /**
    * Gets the alert definition commands that contain the alert definitions for
    * each cluster that the host is a member of.
@@ -197,6 +220,7 @@ public class HeartBeatResponse {
     buffer.append(", alertDefinitionCommands=").append(alertDefinitionCommands);
     buffer.append(", registrationCommand=").append(registrationCommand);
     buffer.append(", restartAgent=").append(restartAgent);
+    buffer.append(", recoveryConfig=").append(recoveryConfig);
     buffer.append('}');
     return buffer.toString();
   }

+ 22 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java

@@ -50,6 +50,9 @@ public class RecoveryConfig {
   @SerializedName("components")
   private String enabledComponents;
 
+  @SerializedName("recoveryTimestamp")
+  private long recoveryTimestamp;
+
   public String getEnabledComponents() {
     return enabledComponents;
   }
@@ -98,6 +101,24 @@ public class RecoveryConfig {
     this.maxLifetimeCount = maxLifetimeCount;
   }
 
+  /**
+   * Timestamp when the recovery values were last updated.
+   *
+   * @return - Timestamp.
+   */
+  public long getRecoveryTimestamp() {
+    return recoveryTimestamp;
+  }
+
+  /**
+   * Set the timestamp when the recovery values were last updated.
+   *
+   * @param recoveryTimestamp
+   */
+  public void setRecoveryTimestamp(long recoveryTimestamp) {
+    this.recoveryTimestamp = recoveryTimestamp;
+  }
+
   @Override
   public String toString() {
     StringBuilder buffer = new StringBuilder("RecoveryConfig{");
@@ -107,6 +128,7 @@ public class RecoveryConfig {
     buffer.append(", retryGap=").append(retryGap);
     buffer.append(", maxLifetimeCount=").append(maxLifetimeCount);
     buffer.append(", components=").append(enabledComponents);
+    buffer.append(", recoveryTimestamp=").append(recoveryTimestamp);
     buffer.append('}');
     return buffer.toString();
   }

+ 254 - 83
ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java

@@ -18,13 +18,21 @@
 
 package org.apache.ambari.server.agent;
 
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
-import com.google.inject.Injector;
 import com.google.inject.Singleton;
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.events.ClusterConfigChangedEvent;
+import org.apache.ambari.server.events.MaintenanceModeEvent;
+import org.apache.ambari.server.events.ServiceComponentInstalledEvent;
+import org.apache.ambari.server.events.ServiceComponentRecoveryChangedEvent;
+import org.apache.ambari.server.events.ServiceComponentUninstalledEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.ConfigHelper;
 import org.apache.ambari.server.state.MaintenanceState;
 import org.apache.ambari.server.state.ServiceComponentHost;
 import org.apache.commons.lang.StringUtils;
@@ -33,10 +41,10 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 @Singleton
 public class RecoveryConfigHelper {
-
   /**
    * Recovery related configuration
    */
@@ -55,142 +63,305 @@ public class RecoveryConfigHelper {
   @Inject
   private Clusters clusters;
 
-  private Cluster cluster;
-  private Map<String, String> configProperties;
+  /**
+   * Cluster --> Host --> Timestamp
+   */
+  private ConcurrentHashMap<String, ConcurrentHashMap<String, Long>> timestampMap;
 
-  public RecoveryConfigHelper() {
+  @Inject
+  public RecoveryConfigHelper(AmbariEventPublisher eventPublisher) {
+    eventPublisher.register(this);
+    timestampMap = new ConcurrentHashMap<>();
   }
 
   public RecoveryConfig getDefaultRecoveryConfig()
       throws AmbariException {
-      return getRecoveryConfig(null, null);
+    return getRecoveryConfig(null, null);
   }
 
   public RecoveryConfig getRecoveryConfig(String clusterName, String hostname)
       throws AmbariException {
+    long now = System.currentTimeMillis();
 
     if (StringUtils.isNotEmpty(clusterName)) {
-      cluster = clusters.getCluster(clusterName);
-    }
-
-    configProperties = null;
+      // Insert or update timestamp for cluster::host (atomic first insert per cluster)
+      ConcurrentHashMap<String, Long> hostTimestamp = timestampMap.get(clusterName);
+      if (hostTimestamp == null) {
+        timestampMap.putIfAbsent(clusterName, new ConcurrentHashMap<String, Long>());
+        hostTimestamp = timestampMap.get(clusterName);
+      }
 
-    if (cluster != null) {
-      Config config = cluster.getDesiredConfigByType(getConfigType());
-      if (config != null) {
-        configProperties = config.getProperties();
+      if (StringUtils.isNotEmpty(hostname)) {
+        hostTimestamp.put(hostname, now);
       }
     }
 
-    if (configProperties == null) {
-      configProperties = new HashMap<>();
-    }
+    AutoStartConfig autoStartConfig = new AutoStartConfig(clusterName);
 
     RecoveryConfig recoveryConfig = new RecoveryConfig();
-    recoveryConfig.setMaxCount(getNodeRecoveryMaxCount());
-    recoveryConfig.setMaxLifetimeCount(getNodeRecoveryLifetimeMaxCount());
-    recoveryConfig.setRetryGap(getNodeRecoveryRetryGap());
-    recoveryConfig.setType(getNodeRecoveryType());
-    recoveryConfig.setWindowInMinutes(getNodeRecoveryWindowInMin());
-    if (isRecoveryEnabled()) {
-      recoveryConfig.setEnabledComponents(StringUtils.join(getEnabledComponents(hostname), ','));
+    recoveryConfig.setMaxCount(autoStartConfig.getNodeRecoveryMaxCount());
+    recoveryConfig.setMaxLifetimeCount(autoStartConfig.getNodeRecoveryLifetimeMaxCount());
+    recoveryConfig.setRetryGap(autoStartConfig.getNodeRecoveryRetryGap());
+    recoveryConfig.setType(autoStartConfig.getNodeRecoveryType());
+    recoveryConfig.setWindowInMinutes(autoStartConfig.getNodeRecoveryWindowInMin());
+    recoveryConfig.setRecoveryTimestamp(now);
+    if (autoStartConfig.isRecoveryEnabled()) {
+      recoveryConfig.setEnabledComponents(StringUtils.join(autoStartConfig.getEnabledComponents(hostname), ','));
     }
 
     return recoveryConfig;
   }
 
   /**
-   * Get a list of enabled components for the specified host and cluster. Filter by
-   * Maintenance Mode OFF, so that agent does not auto start components that are in
-   * maintenance mode.
+   * Computes if the recovery configuration was updated since the last time it was sent to the agent.
+   *
+   * @param clusterName - Name of the cluster which the host belongs to.
+   * @param hostname - Host name from agent.
+   * @param recoveryTimestamp - Time when the recovery configuration was last sent to the agent. Agent
+   *                          stores this value and sends it during each heartbeat. -1 if agent was
+   *                          restarted or configuration was not sent to the agent since it started.
    * @return
    */
-  private List<String> getEnabledComponents(String hostname) {
-    List<String> enabledComponents = new ArrayList<>();
-    List<ServiceComponentHost> scHosts = cluster.getServiceComponentHosts(hostname);
-
-    for (ServiceComponentHost sch : scHosts) {
-      if (sch.isRecoveryEnabled()) {
-        // Keep the components that are not in maintenance mode.
-        if (sch.getMaintenanceState() == MaintenanceState.OFF) {
-          enabledComponents.add(sch.getServiceComponentName());
-        }
-      }
+  public boolean isConfigStale(String clusterName, String hostname, long recoveryTimestamp) {
+    // Look up the last updated timestamp for the clusterName-->hostname-->timestamp if
+    // it is available. If found, compare it with the timestamp from the agent. If the timestamp
+    // is different from the timestamp sent by the agent, the recovery config on the agent
+    // side is stale and should be sent to the agent during this heartbeat.
+
+    if (StringUtils.isEmpty(clusterName)) {
+      throw new IllegalArgumentException("clusterName cannot be empty or null.");
     }
 
-    return enabledComponents;
-  }
+    if (StringUtils.isEmpty(hostname)) {
+      throw new IllegalArgumentException("hostname cannot be empty or null.");
+    }
 
-  /**
-   * The configuration type name.
-   * @return
-   */
-  private String getConfigType() {
-    return "cluster-env";
+    ConcurrentHashMap<String, Long> hostTimestamp = timestampMap.get(clusterName);
+    if (hostTimestamp == null) {
+      return true;
+    }
+
+    // No entry for this host (null) means no timestamp was recorded for it: report
+    // stale instead of unboxing null, which would throw NullPointerException.
+    Long timestamp = hostTimestamp.get(hostname);
+    if (timestamp == null || timestamp.longValue() != recoveryTimestamp) {
+      return true;
+    }
+    return false;
   }
 
   /**
-   * Get a value indicating whether the cluster supports recovery.
-   *
-   * @return True or false.
+   * Maintenance mode of a service component host changed.
+   * @param event
+   * @throws AmbariException
    */
-  private boolean isRecoveryEnabled() {
-    return Boolean.parseBoolean(getProperty(RECOVERY_ENABLED_KEY, "false"));
+  @Subscribe
+  @AllowConcurrentEvents
+  public void handleMaintenanceModeEvent(MaintenanceModeEvent event)
+      throws AmbariException {
+    ServiceComponentHost sch = event.getServiceComponentHost();
+
+    if (sch != null && sch.isRecoveryEnabled()) {
+      invalidateRecoveryTimestamp(sch.getClusterName(), sch.getHostName());
+    }
   }
 
   /**
-   * Get the node recovery type. The only supported value is AUTO_START.
-   * @return
+   * A service component was installed on a host.
+   * @param event
+   * @throws AmbariException
    */
-  private String getNodeRecoveryType() {
-    return getProperty(RECOVERY_TYPE_KEY, RECOVERY_TYPE_DEFAULT);
+  @Subscribe
+  @AllowConcurrentEvents
+  public void handleServiceComponentInstalledEvent(ServiceComponentInstalledEvent event)
+      throws AmbariException {
+    if (event.isRecoveryEnabled()) {
+      Cluster cluster = clusters.getClusterById(event.getClusterId());
+
+      if (cluster != null) {
+        invalidateRecoveryTimestamp(cluster.getClusterName(), event.getHostName());
+      }
+    }
   }
 
   /**
-   * Get configured max count of recovery attempt allowed per host component in a window
-   * This is reset when agent is restarted.
-   * @return
+   * A service component was uninstalled from a host.
+   * @param event
+   * @throws AmbariException
    */
-  private String getNodeRecoveryMaxCount() {
-    return getProperty(RECOVERY_MAX_COUNT_KEY, RECOVERY_MAX_COUNT_DEFAULT);
+  @Subscribe
+  @AllowConcurrentEvents
+  public void handleServiceComponentUninstalledEvent(ServiceComponentUninstalledEvent event)
+      throws AmbariException {
+    if (event.isRecoveryEnabled()) {
+      Cluster cluster = clusters.getClusterById(event.getClusterId());
+
+      if (cluster != null) {
+        invalidateRecoveryTimestamp(cluster.getClusterName(), event.getHostName());
+      }
+    }
   }
 
   /**
-   * Get configured max lifetime count of recovery attempt allowed per host component.
-   * This is reset when agent is restarted.
-   * @return
+   * Recovery enabled was turned on or off.
+   * @param event
    */
-  private String getNodeRecoveryLifetimeMaxCount() {
-    return getProperty(RECOVERY_LIFETIME_MAX_COUNT_KEY, RECOVERY_LIFETIME_MAX_COUNT_DEFAULT);
+  @Subscribe
+  @AllowConcurrentEvents
+  public void handleServiceComponentRecoveryChangedEvent(ServiceComponentRecoveryChangedEvent event) {
+    invalidateRecoveryTimestamp(event.getClusterName(), null);
   }
 
   /**
-   * Get configured window size in minutes
-   * @return
+   * Cluster-env configuration changed.
+   * @param event
    */
-  private String getNodeRecoveryWindowInMin() {
-    return getProperty(RECOVERY_WINDOW_IN_MIN_KEY, RECOVERY_WINDOW_IN_MIN_DEFAULT);
+  @Subscribe
+  @AllowConcurrentEvents
+  public void handleClusterEnvConfigChangedEvent(ClusterConfigChangedEvent event) {
+    if (StringUtils.equals(event.getConfigType(), ConfigHelper.CLUSTER_ENV)) { // compare by value, null-safe
+      invalidateRecoveryTimestamp(event.getclusterName(), null);
+    }
   }
 
-  /**
-   * Get the configured retry gap between tries per host component
-   * @return
-   */
-  private String getNodeRecoveryRetryGap() {
-    return getProperty(RECOVERY_RETRY_GAP_KEY, RECOVERY_RETRY_GAP_DEFAULT);
+  private void invalidateRecoveryTimestamp(String clusterName, String hostname) {
+    if (StringUtils.isNotEmpty(clusterName)) {
+      ConcurrentHashMap<String, Long> hostTimestamp = timestampMap.get(clusterName);
+      if (hostTimestamp != null) {
+        if (StringUtils.isNotEmpty(hostname)) {
+          // Clear the time stamp for the specified host in this cluster
+          hostTimestamp.put(hostname, 0L);
+        }
+        else {
+          // Clear the time stamp for all hosts in this cluster
+          for(Map.Entry<String, Long> hostEntry : hostTimestamp.entrySet()) {
+            hostEntry.setValue(0L);
+          }
+        }
+      }
+    }
   }
 
   /**
-   * Get the property value for the specified key. If not present, return default value.
-   * @param key The key for which property value is required.
-   * @param defaultValue Default value to return if key is not found.
-   * @return
+   * Helper class to get auto start configuration
    */
-  private String getProperty(String key, String defaultValue) {
-    if (configProperties.containsKey(key)) {
-      return configProperties.get(key);
+  class AutoStartConfig {
+    private Cluster cluster;
+    private Map<String, String> configProperties;
+
+    public AutoStartConfig(String clusterName)
+        throws AmbariException {
+      if (StringUtils.isNotEmpty(clusterName)) {
+        cluster = clusters.getCluster(clusterName);
+      }
+
+      if (cluster != null) {
+        Config config = cluster.getDesiredConfigByType(getConfigType());
+        if (config != null) {
+          configProperties = config.getProperties();
+        }
+      }
+
+      if (configProperties == null) {
+        configProperties = new HashMap<>();
+      }
     }
+    /**
+     * Get a list of enabled components for the specified host and cluster. Filter by
+     * Maintenance Mode OFF, so that agent does not auto start components that are in
+     * maintenance mode.
+     * @return
+     */
+    private List<String> getEnabledComponents(String hostname) {
+      List<String> enabledComponents = new ArrayList<>();
+
+      if (cluster != null) {
+        List<ServiceComponentHost> scHosts = cluster.getServiceComponentHosts(hostname);
+
+        for (ServiceComponentHost sch : scHosts) {
+          if (sch.isRecoveryEnabled()) {
+            // Keep the components that are not in maintenance mode.
+            if (sch.getMaintenanceState() == MaintenanceState.OFF) {
+              enabledComponents.add(sch.getServiceComponentName());
+            }
+          }
+        }
+      }
 
-    return defaultValue;
+      return enabledComponents;
+    }
+
+    /**
+     * The configuration type name.
+     * @return
+     */
+    private String getConfigType() {
+      return "cluster-env";
+    }
+
+    /**
+     * Get a value indicating whether the cluster supports recovery.
+     *
+     * @return True or false.
+     */
+    private boolean isRecoveryEnabled() {
+      return Boolean.parseBoolean(getProperty(RECOVERY_ENABLED_KEY, "false"));
+    }
+
+    /**
+     * Get the node recovery type. The only supported value is AUTO_START.
+     * @return
+     */
+    private String getNodeRecoveryType() {
+      return getProperty(RECOVERY_TYPE_KEY, RECOVERY_TYPE_DEFAULT);
+    }
+
+    /**
+     * Get configured max count of recovery attempt allowed per host component in a window
+     * This is reset when agent is restarted.
+     * @return
+     */
+    private String getNodeRecoveryMaxCount() {
+      return getProperty(RECOVERY_MAX_COUNT_KEY, RECOVERY_MAX_COUNT_DEFAULT);
+    }
+
+    /**
+     * Get configured max lifetime count of recovery attempt allowed per host component.
+     * This is reset when agent is restarted.
+     * @return
+     */
+    private String getNodeRecoveryLifetimeMaxCount() {
+      return getProperty(RECOVERY_LIFETIME_MAX_COUNT_KEY, RECOVERY_LIFETIME_MAX_COUNT_DEFAULT);
+    }
+
+    /**
+     * Get configured window size in minutes
+     * @return
+     */
+    private String getNodeRecoveryWindowInMin() {
+      return getProperty(RECOVERY_WINDOW_IN_MIN_KEY, RECOVERY_WINDOW_IN_MIN_DEFAULT);
+    }
+
+    /**
+     * Get the configured retry gap between tries per host component
+     * @return
+     */
+    private String getNodeRecoveryRetryGap() {
+      return getProperty(RECOVERY_RETRY_GAP_KEY, RECOVERY_RETRY_GAP_DEFAULT);
+    }
+
+    /**
+     * Get the property value for the specified key. If not present, return default value.
+     * @param key The key for which property value is required.
+     * @param defaultValue Default value to return if key is not found.
+     * @return
+     */
+    private String getProperty(String key, String defaultValue) {
+      if (configProperties.containsKey(key)) {
+        return configProperties.get(key);
+      }
+
+      return defaultValue;
+    }
   }
 }

+ 11 - 1
ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java

@@ -105,7 +105,17 @@ public abstract class AmbariEvent {
     /**
      * The cluster was renamed.
      */
-    CLUSTER_RENAME;
+    CLUSTER_RENAME,
+
+    /**
+     * The service component recovery enabled field changed.
+     */
+    SERVICE_COMPONENT_RECOVERY_CHANGED,
+
+    /**
+     * Cluster configuration changed.
+     */
+    CLUSTER_CONFIG_CHANGED;
   }
 
   /**

+ 88 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/ClusterConfigChangedEvent.java

@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+/**
+ * The {@link ClusterConfigChangedEvent} class is fired when a
+ * cluster configuration is successfully updated.
+ */
+public class ClusterConfigChangedEvent extends AmbariEvent {
+  private final String m_clusterName;
+  private final String m_configType;
+  private final String m_versionTag;
+  private final Long m_version;
+
+  public ClusterConfigChangedEvent(String clusterName, String configType, String versionTag, Long version) {
+    super(AmbariEventType.CLUSTER_CONFIG_CHANGED);
+    m_clusterName = clusterName;
+    m_configType = configType;
+    m_versionTag = versionTag;
+    m_version = version;
+  }
+
+  /**
+   * Get the cluster name
+   *
+   * @return the cluster name supplied to the constructor.
+   */
+  public String getclusterName() {
+    return m_clusterName;
+  }
+
+  /**
+   * Get the configuration type
+   *
+   * @return the configuration type supplied to the constructor.
+   */
+  public String getConfigType() {
+    return m_configType;
+  }
+
+  /**
+   * Get the version tag
+   *
+   * @return the version tag supplied to the constructor.
+   */
+  public String getVersionTag() {
+    return m_versionTag;
+  }
+
+  /**
+   * Get the version
+   *
+   * @return the version supplied to the constructor.
+   */
+  public Long getVersion() {
+    return m_version;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder("ClusterConfigChangedEvent{");
+    buffer.append("clusterName=").append(getclusterName());
+    buffer.append(", configType=").append(getConfigType());
+    buffer.append(", versionTag=").append(getVersionTag());
+    buffer.append(", version=").append(getVersion());
+    buffer.append("}");
+    return buffer.toString();
+  }
+}

+ 11 - 1
ambari-server/src/main/java/org/apache/ambari/server/events/ServiceComponentInstalledEvent.java

@@ -24,6 +24,7 @@ package org.apache.ambari.server.events;
 public class ServiceComponentInstalledEvent extends ServiceEvent {
   private final String m_componentName;
   private final String m_hostName;
+  private final boolean m_recoveryEnabled;
 
   /**
    * Constructor.
@@ -37,13 +38,14 @@ public class ServiceComponentInstalledEvent extends ServiceEvent {
    */
   public ServiceComponentInstalledEvent(long clusterId, String stackName,
       String stackVersion, String serviceName, String componentName,
-      String hostName) {
+      String hostName, boolean recoveryEnabled) {
     super(AmbariEventType.SERVICE_COMPONENT_INSTALL_SUCCESS, clusterId,
         stackName,
         stackVersion, serviceName);
 
     m_componentName = componentName;
     m_hostName = hostName;
+    m_recoveryEnabled = recoveryEnabled;
   }
 
   public String getComponentName() {
@@ -54,6 +56,13 @@ public class ServiceComponentInstalledEvent extends ServiceEvent {
     return m_hostName;
   }
 
+  /**
+   * Gets the recovery flag carried by this event.
+   *
+   * @return the {@code recoveryEnabled} value that was passed to the
+   *         constructor.
+   */
+  public boolean isRecoveryEnabled() {
+    return m_recoveryEnabled;
+  }
+
   /**
    * {@inheritDoc}
    */
@@ -66,6 +75,7 @@ public class ServiceComponentInstalledEvent extends ServiceEvent {
     buffer.append(", serviceName=").append(m_serviceName);
     buffer.append(", componentName=").append(m_componentName);
     buffer.append(", hostName=").append(m_hostName);
+    buffer.append(", recoveryEnabled=").append(m_recoveryEnabled);
     buffer.append("}");
     return buffer.toString();
   }

+ 90 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/ServiceComponentRecoveryChangedEvent.java

@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+
+/**
+ * The {@link ServiceComponentRecoveryChangedEvent} class is fired when a service
+ * component is enabled or disabled for auto start.
+ * <p>
+ * Instances are immutable: every field is assigned exactly once in the
+ * constructor.
+ */
+public class ServiceComponentRecoveryChangedEvent extends AmbariEvent {
+  private final String m_clusterName;
+  private final String m_serviceName;
+  private final String m_componentName;
+  private final boolean m_recoveryEnabled;
+
+  /**
+   * Constructor.
+   *
+   * @param clusterName
+   *          the name of the cluster that owns the component.
+   * @param serviceName
+   *          the name of the service that owns the component.
+   * @param componentName
+   *          the name of the component whose recovery setting changed.
+   * @param recoveryEnabled
+   *          the recovery (auto start) setting reported by this event.
+   */
+  public ServiceComponentRecoveryChangedEvent(
+          String clusterName, String serviceName, String componentName, boolean recoveryEnabled) {
+    super(AmbariEventType.SERVICE_COMPONENT_RECOVERY_CHANGED);
+    m_clusterName = clusterName;
+    m_serviceName = serviceName;
+    m_componentName = componentName;
+    m_recoveryEnabled = recoveryEnabled;
+  }
+
+  /**
+   * Get the cluster name.
+   *
+   * @return the cluster name supplied to the constructor.
+   */
+  public String getClusterName() {
+    return m_clusterName;
+  }
+
+  /**
+   * Get the service name.
+   *
+   * @return the service name supplied to the constructor.
+   */
+  public String getServiceName() {
+    return m_serviceName;
+  }
+
+  /**
+   * Get the component name.
+   *
+   * @return the component name supplied to the constructor.
+   */
+  public String getComponentName() {
+    return m_componentName;
+  }
+
+  /**
+   * Get recovery enabled.
+   *
+   * @return the {@code recoveryEnabled} value supplied to the constructor.
+   */
+  public boolean isRecoveryEnabled() {
+    return m_recoveryEnabled;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString() {
+    // The prefix must match the class name so log lines can be traced back
+    // to this event type.
+    StringBuilder buffer = new StringBuilder("ServiceComponentRecoveryChangedEvent{");
+    buffer.append("clusterName=").append(getClusterName());
+    buffer.append(", serviceName=").append(getServiceName());
+    buffer.append(", componentName=").append(getComponentName());
+    buffer.append(", recoveryEnabled=").append(isRecoveryEnabled());
+    buffer.append("}");
+    return buffer.toString();
+  }
+}

+ 11 - 1
ambari-server/src/main/java/org/apache/ambari/server/events/ServiceComponentUninstalledEvent.java

@@ -24,6 +24,7 @@ package org.apache.ambari.server.events;
 public class ServiceComponentUninstalledEvent extends ServiceEvent {
   private final String m_componentName;
   private final String m_hostName;
+  private final boolean m_recoveryEnabled;
 
   /**
    * Constructor.
@@ -37,13 +38,14 @@ public class ServiceComponentUninstalledEvent extends ServiceEvent {
    */
   public ServiceComponentUninstalledEvent(long clusterId, String stackName,
       String stackVersion, String serviceName, String componentName,
-      String hostName) {
+      String hostName, boolean recoveryEnabled) {
     super(AmbariEventType.SERVICE_COMPONENT_UNINSTALLED_SUCCESS, clusterId,
         stackName,
         stackVersion, serviceName);
 
     m_componentName = componentName;
     m_hostName = hostName;
+    m_recoveryEnabled = recoveryEnabled;
   }
 
   /**
@@ -60,6 +62,13 @@ public class ServiceComponentUninstalledEvent extends ServiceEvent {
     return m_hostName;
   }
 
+  /**
+   * Gets the recovery flag carried by this event.
+   *
+   * @return the {@code recoveryEnabled} value that was passed to the
+   *         constructor.
+   */
+  public boolean isRecoveryEnabled() {
+    return m_recoveryEnabled;
+  }
+
   /**
    * {@inheritDoc}
    */
@@ -72,6 +81,7 @@ public class ServiceComponentUninstalledEvent extends ServiceEvent {
     buffer.append(", serviceName=").append(m_serviceName);
     buffer.append(", componentName=").append(m_componentName);
     buffer.append(", hostName=").append(m_hostName);
+    buffer.append(", recoveryEnabled=").append(m_recoveryEnabled);
     buffer.append("}");
     return buffer.toString();
   }

+ 14 - 2
ambari-server/src/main/java/org/apache/ambari/server/state/ConfigImpl.java

@@ -27,6 +27,8 @@ import java.util.Set;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.ambari.server.events.ClusterConfigChangedEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
 import org.apache.ambari.server.orm.dao.ServiceConfigDAO;
 import org.apache.ambari.server.orm.entities.ClusterConfigEntity;
@@ -70,6 +72,9 @@ public class ConfigImpl implements Config {
   @Inject
   private ServiceConfigDAO serviceConfigDAO;
 
+  @Inject
+  private AmbariEventPublisher eventPublisher;
+
   @AssistedInject
   public ConfigImpl(@Assisted Cluster cluster, @Assisted String type, @Assisted Map<String, String> properties,
       @Assisted Map<String, Map<String, String>> propertiesAttributes, Injector injector) {
@@ -390,8 +395,8 @@ public class ConfigImpl implements Config {
           // if the configuration was found, then update it
           if (null != entity) {
             LOG.debug(
-                "Updating {} version {} with new configurations; a new version will not be created",
-                getType(), getVersion());
+                    "Updating {} version {} with new configurations; a new version will not be created",
+                    getType(), getVersion());
 
             entity.setData(gson.toJson(getProperties()));
 
@@ -399,6 +404,13 @@ public class ConfigImpl implements Config {
             // newest data
             clusterDAO.merge(clusterEntity, true);
             cluster.refresh();
+
+            // Broadcast the change event for the cluster-env config type only.
+            // Compare by value, not by reference: the type string is not
+            // guaranteed to be the same instance as the constant, so "=="
+            // could silently suppress the event. Calling equals() on the
+            // constant also keeps the check null-safe.
+            if (ConfigHelper.CLUSTER_ENV.equals(getType())) {
+              ClusterConfigChangedEvent event = new ClusterConfigChangedEvent(
+                      cluster.getClusterName(), getType(), getTag(), getVersion());
+              eventPublisher.publish(event);
+            }
           }
         }
       } finally {

+ 10 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentImpl.java

@@ -29,6 +29,8 @@ import org.apache.ambari.server.ObjectNotFoundException;
 import org.apache.ambari.server.ServiceComponentHostNotFoundException;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.controller.ServiceComponentResponse;
+import org.apache.ambari.server.events.ServiceComponentRecoveryChangedEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.dao.ClusterServiceDAO;
 import org.apache.ambari.server.orm.dao.HostComponentDesiredStateDAO;
 import org.apache.ambari.server.orm.dao.ServiceComponentDesiredStateDAO;
@@ -74,6 +76,8 @@ public class ServiceComponentImpl implements ServiceComponent {
   private ServiceComponentHostFactory serviceComponentHostFactory;
   @Inject
   private AmbariMetaInfo ambariMetaInfo;
+  @Inject
+  private AmbariEventPublisher eventPublisher;
 
   ServiceComponentDesiredStateEntity desiredStateEntity;
   private Map<String, ServiceComponentHost> hostComponents;
@@ -224,6 +228,12 @@ public class ServiceComponentImpl implements ServiceComponent {
       if (desiredStateEntity != null) {
         desiredStateEntity.setRecoveryEnabled(recoveryEnabled);
         saveIfPersisted(desiredStateEntity);
+
+        // broadcast the change
+        ServiceComponentRecoveryChangedEvent event = new ServiceComponentRecoveryChangedEvent(
+                getClusterName(), getServiceName(), getName(), isRecoveryEnabled());
+        eventPublisher.publish(event);
+
       } else {
         LOG.warn("Setting a member on an entity object that may have been " +
                 "previously deleted, serviceName = " + service.getName());

+ 4 - 2
ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java

@@ -1448,7 +1448,8 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
 
           ServiceComponentInstalledEvent event = new ServiceComponentInstalledEvent(
               getClusterId(), stackId.getStackName(),
-              stackId.getStackVersion(), getServiceName(), getServiceComponentName(), getHostName());
+              stackId.getStackVersion(), getServiceName(), getServiceComponentName(), getHostName(),
+                  isRecoveryEnabled());
 
           eventPublisher.publish(event);
         } else {
@@ -1593,10 +1594,11 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
       String serviceName = getServiceName();
       String componentName = getServiceComponentName();
       String hostName = getHostName();
+      boolean recoveryEnabled = isRecoveryEnabled();
 
       ServiceComponentUninstalledEvent event = new ServiceComponentUninstalledEvent(
           clusterId, stackName, stackVersion, serviceName, componentName,
-          hostName);
+          hostName, recoveryEnabled);
 
       eventPublisher.publish(event);
     }

+ 15 - 0
ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java

@@ -107,6 +107,7 @@ import com.google.inject.persist.PersistService;
 import com.google.inject.persist.UnitOfWork;
 
 import junit.framework.Assert;
+import static org.junit.Assert.assertNull;
 
 public class TestHeartbeatHandler {
 
@@ -412,6 +413,20 @@ public class TestHeartbeatHandler {
     assertEquals(rc.getRetryGap(), "2");
     assertEquals(rc.getWindowInMinutes(), "23");
     assertEquals(rc.getEnabledComponents(), "DATANODE,NAMENODE");
+
+    // Send a heart beat with the recovery timestamp set to the
+    // recovery timestamp from registration. The heart beat
+    // response should not contain a recovery config since
+    // nothing changed between the registration and heart beat.
+    HeartBeat hb = new HeartBeat();
+    hb.setTimestamp(System.currentTimeMillis());
+    hb.setResponseId(0);
+    hb.setHostname(DummyHostname1);
+    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
+    hb.setRecoveryTimestamp(rc.getRecoveryTimestamp());
+
+    HeartBeatResponse hbr = handler.handleHeartBeat(hb);
+    assertNull(hbr.getRecoveryConfig());
   }
 
   //

+ 208 - 2
ambari-server/src/test/java/org/apache/ambari/server/configuration/RecoveryConfigHelperTest.java

@@ -18,6 +18,7 @@
 
 package org.apache.ambari.server.configuration;
 
+import com.google.common.eventbus.EventBus;
 import com.google.inject.Guice;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
@@ -26,9 +27,15 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.HeartbeatTestHelper;
 import org.apache.ambari.server.agent.RecoveryConfig;
 import org.apache.ambari.server.agent.RecoveryConfigHelper;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.State;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,9 +45,14 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DATANODE;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyHostname1;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HDFS;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.NAMENODE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test RecoveryConfigHelper class
@@ -56,12 +68,21 @@ public class RecoveryConfigHelperTest {
   @Inject
   private RecoveryConfigHelper recoveryConfigHelper;
 
+  @Inject
+  private AmbariEventPublisher eventPublisher;
+
   @Before
   public void setup() throws Exception {
     module = HeartbeatTestHelper.getTestModule();
     injector = Guice.createInjector(module);
     injector.getInstance(GuiceJpaInitializer.class);
     injector.injectMembers(this);
+
+    // Synchronize the publisher (AmbariEventPublisher) and subscriber (RecoveryConfigHelper),
+    // so that the events get handled as soon as they are published, allowing the tests to
+    // verify the methods under test.
+    EventBus synchronizedBus = EventBusSynchronizer.synchronizeAmbariEventPublisher(injector);
+    synchronizedBus.register(recoveryConfigHelper);
   }
 
   @After
@@ -74,7 +95,7 @@ public class RecoveryConfigHelperTest {
    */
   @Test
   public void testRecoveryConfigDefaultValues()
-      throws NoSuchFieldException, IllegalAccessException, AmbariException {
+      throws AmbariException {
     RecoveryConfig recoveryConfig = recoveryConfigHelper.getDefaultRecoveryConfig();
     assertEquals(recoveryConfig.getMaxLifetimeCount(), RecoveryConfigHelper.RECOVERY_LIFETIME_MAX_COUNT_DEFAULT);
     assertEquals(recoveryConfig.getMaxCount(), RecoveryConfigHelper.RECOVERY_MAX_COUNT_DEFAULT);
@@ -90,7 +111,7 @@ public class RecoveryConfigHelperTest {
    */
   @Test
   public void testRecoveryConfigValues()
-      throws NoSuchFieldException, IllegalAccessException, AmbariException {
+      throws AmbariException {
     String hostname = "hostname1";
     Cluster cluster = getDummyCluster(hostname);
     RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), hostname);
@@ -102,6 +123,191 @@ public class RecoveryConfigHelperTest {
     assertNotNull(recoveryConfig.getEnabledComponents());
   }
 
+  /**
+   * Install a component with auto start enabled. Verify that the recovery
+   * config obtained before the install is reported as stale and that the
+   * refreshed config lists both components.
+   *
+   * @throws AmbariException
+   */
+  @Test
+  public void testServiceComponentInstalled()
+      throws AmbariException {
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+
+    hdfs.addServiceComponent(DATANODE).setRecoveryEnabled(true);
+    hdfs.getServiceComponent(DATANODE).persist();
+    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+
+    // Get the recovery configuration
+    RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
+    // JUnit's contract is assertEquals(expected, actual)
+    assertEquals("DATANODE", recoveryConfig.getEnabledComponents());
+
+    // Install HDFS::NAMENODE to trigger a component installed event
+    hdfs.addServiceComponent(NAMENODE).setRecoveryEnabled(true);
+    hdfs.getServiceComponent(NAMENODE).persist();
+    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+
+    // Verify that the config is stale now
+    boolean isConfigStale = recoveryConfigHelper.isConfigStale(cluster.getClusterName(), DummyHostname1,
+            recoveryConfig.getRecoveryTimestamp());
+
+    assertTrue(isConfigStale);
+
+    // Verify the new config
+    recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
+    assertEquals("DATANODE,NAMENODE", recoveryConfig.getEnabledComponents());
+  }
+
+  /**
+   * Uninstall a component and verify that the previously obtained recovery
+   * config is reported as stale and that the refreshed config no longer
+   * lists the removed component.
+   *
+   * @throws AmbariException
+   */
+  @Test
+  public void testServiceComponentUninstalled()
+      throws AmbariException {
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+
+    hdfs.addServiceComponent(DATANODE).setRecoveryEnabled(true);
+    hdfs.getServiceComponent(DATANODE).persist();
+    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+
+    hdfs.addServiceComponent(NAMENODE).setRecoveryEnabled(true);
+    hdfs.getServiceComponent(NAMENODE).persist();
+    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+
+    // Get the recovery configuration
+    RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
+    // JUnit's contract is assertEquals(expected, actual)
+    assertEquals("DATANODE,NAMENODE", recoveryConfig.getEnabledComponents());
+
+    // Uninstall HDFS::DATANODE from host1
+    hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).delete();
+
+    // Verify that the config is stale
+    boolean isConfigStale = recoveryConfigHelper.isConfigStale(cluster.getClusterName(), DummyHostname1,
+            recoveryConfig.getRecoveryTimestamp());
+
+    assertTrue(isConfigStale);
+
+    // Verify the new config
+    recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
+    assertEquals("NAMENODE", recoveryConfig.getEnabledComponents());
+  }
+
+  /**
+   * Disable cluster level auto start and verify that the previously obtained
+   * recovery config is reported as stale and that the refreshed config has
+   * no enabled components.
+   *
+   * @throws AmbariException
+   */
+  @Test
+  public void testClusterEnvConfigChanged()
+      throws AmbariException {
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+
+    hdfs.addServiceComponent(DATANODE).setRecoveryEnabled(true);
+    hdfs.getServiceComponent(DATANODE).persist();
+    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).setDesiredState(State.INSTALLED);
+
+    // Get the recovery configuration
+    RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
+    // JUnit's contract is assertEquals(expected, actual)
+    assertEquals("DATANODE", recoveryConfig.getEnabledComponents());
+
+    // Get cluster-env config and turn off recovery for the cluster
+    Config config = cluster.getDesiredConfigByType("cluster-env");
+
+    config.updateProperties(new HashMap<String, String>() {{
+      put(RecoveryConfigHelper.RECOVERY_ENABLED_KEY, "false");
+    }});
+    config.persist(false);
+
+    // Recovery config should be stale because of the above change.
+    boolean isConfigStale = recoveryConfigHelper.isConfigStale(cluster.getClusterName(), DummyHostname1,
+            recoveryConfig.getRecoveryTimestamp());
+
+    assertTrue(isConfigStale);
+
+    // Get the recovery configuration again and verify that there are no components to be auto started
+    recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
+    assertNull(recoveryConfig.getEnabledComponents());
+  }
+
+  /**
+   * Change the maintenance mode of a service component host and verify that
+   * the previously obtained recovery config is reported as stale and that
+   * the refreshed config omits the component placed in maintenance mode.
+   *
+   * @throws AmbariException
+   */
+  @Test
+  public void testMaintenanceModeChanged()
+      throws AmbariException {
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+
+    hdfs.addServiceComponent(DATANODE).setRecoveryEnabled(true);
+    hdfs.getServiceComponent(DATANODE).persist();
+    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+
+    hdfs.addServiceComponent(NAMENODE).setRecoveryEnabled(true);
+    hdfs.getServiceComponent(NAMENODE).persist();
+    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+
+    // Get the recovery configuration
+    RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
+    // JUnit's contract is assertEquals(expected, actual)
+    assertEquals("DATANODE,NAMENODE", recoveryConfig.getEnabledComponents());
+
+    hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).setMaintenanceState(MaintenanceState.ON);
+
+    // We need a new config
+    boolean isConfigStale = recoveryConfigHelper.isConfigStale(cluster.getClusterName(), DummyHostname1,
+            recoveryConfig.getRecoveryTimestamp());
+
+    assertTrue(isConfigStale);
+
+    // Only NAMENODE is left
+    recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
+    assertEquals("NAMENODE", recoveryConfig.getEnabledComponents());
+  }
+
+  /**
+   * Disable recovery on a component and verify that the previously obtained
+   * recovery config is reported as stale and that the refreshed config no
+   * longer lists the component.
+   *
+   * @throws AmbariException
+   */
+  @Test
+  public void testServiceComponentRecoveryChanged()
+      throws AmbariException {
+    Cluster cluster = heartbeatTestHelper.getDummyCluster();
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+
+    hdfs.addServiceComponent(DATANODE).setRecoveryEnabled(true);
+    hdfs.getServiceComponent(DATANODE).persist();
+    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+
+    // Get the recovery configuration
+    RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
+    // JUnit's contract is assertEquals(expected, actual)
+    assertEquals("DATANODE", recoveryConfig.getEnabledComponents());
+
+    // Turn off auto start for HDFS::DATANODE
+    hdfs.getServiceComponent(DATANODE).setRecoveryEnabled(false);
+
+    // Config should be stale now
+    boolean isConfigStale = recoveryConfigHelper.isConfigStale(cluster.getClusterName(), DummyHostname1,
+            recoveryConfig.getRecoveryTimestamp());
+
+    assertTrue(isConfigStale);
+
+    // Get the latest config. DATANODE should not be present.
+    recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
+    assertEquals("", recoveryConfig.getEnabledComponents());
+  }
+
   private Cluster getDummyCluster(final String hostname)
           throws AmbariException {
     Map<String, String> configProperties = new HashMap<String, String>() {{

+ 1 - 1
ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/HostVersionOutOfSyncListenerTest.java

@@ -487,7 +487,7 @@ public class HostVersionOutOfSyncListenerTest {
           .getServiceComponent(componentName), hostName));
       ServiceComponentInstalledEvent event = new ServiceComponentInstalledEvent(cl.getClusterId(),
           stackIdObj.getStackName(), stackIdObj.getStackVersion(),
-          serviceName, componentName, hostName);
+          serviceName, componentName, hostName, false /* recovery not enabled */);
       m_eventPublisher.publish(event);
     }
   }