فهرست منبع

AMBARI-7642 - Alerts: Maintenance Mode Should Prevent Alert Notifications (jonathanhurley)

Jonathan Hurley 10 سال پیش
والد
کامیت
3e54f95689
18فایلهای تغییر یافته به همراه1280 افزوده شده و 153 حذف شده
  1. 43 0
      ambari-server/src/main/java/org/apache/ambari/server/EagerSingleton.java
  2. 41 15
      ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
  3. 21 5
      ambari-server/src/main/java/org/apache/ambari/server/events/AlertStateChangeEvent.java
  4. 6 1
      ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
  5. 175 0
      ambari-server/src/main/java/org/apache/ambari/server/events/MaintenanceModeEvent.java
  6. 2 0
      ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertAggregateListener.java
  7. 2 0
      ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertLifecycleListener.java
  8. 199 0
      ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertMaintenanceModeListener.java
  9. 5 1
      ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java
  10. 2 0
      ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertServiceStateListener.java
  11. 11 0
      ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertStateChangedListener.java
  12. 5 0
      ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java
  13. 73 60
      ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java
  14. 13 0
      ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
  15. 188 0
      ambari-server/src/test/java/org/apache/ambari/server/events/EventsTest.java
  16. 99 0
      ambari-server/src/test/java/org/apache/ambari/server/events/MockEventListener.java
  17. 386 70
      ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
  18. 9 1
      ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java

+ 43 - 0
ambari-server/src/main/java/org/apache/ambari/server/EagerSingleton.java

@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.Module;
+import com.google.inject.ScopeAnnotation;
+import com.google.inject.Singleton;
+import com.google.inject.binder.AnnotatedBindingBuilder;
+
+/**
+ * The {@link EagerSingleton} annotation is used to mark a {@link Singleton}
+ * class as being needed to be eagerly registered by a {@link Module}.
+ * <p/>
+ * Classes marked with this annotation should also be {@link Singleton}. They
+ * will be discovered on the classpath and then
+ * {@link AnnotatedBindingBuilder#asEagerSingleton()} will be invoked on them.
+ */
+@Target({ ElementType.TYPE })
+@Retention(RUNTIME)
+@ScopeAnnotation
+public @interface EagerSingleton {
+}

+ 41 - 15
ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java

@@ -39,6 +39,7 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.ambari.server.EagerSingleton;
 import org.apache.ambari.server.actionmanager.ActionDBAccessor;
 import org.apache.ambari.server.actionmanager.ActionDBAccessorImpl;
 import org.apache.ambari.server.actionmanager.ExecutionCommandWrapper;
@@ -53,11 +54,6 @@ import org.apache.ambari.server.controller.internal.HostResourceProvider;
 import org.apache.ambari.server.controller.internal.MemberResourceProvider;
 import org.apache.ambari.server.controller.internal.ServiceResourceProvider;
 import org.apache.ambari.server.controller.spi.ResourceProvider;
-import org.apache.ambari.server.events.listeners.AlertAggregateListener;
-import org.apache.ambari.server.events.listeners.AlertLifecycleListener;
-import org.apache.ambari.server.events.listeners.AlertReceivedListener;
-import org.apache.ambari.server.events.listeners.AlertServiceStateListener;
-import org.apache.ambari.server.events.listeners.AlertStateChangedListener;
 import org.apache.ambari.server.orm.DBAccessor;
 import org.apache.ambari.server.orm.DBAccessorImpl;
 import org.apache.ambari.server.orm.PersistenceType;
@@ -99,8 +95,14 @@ import org.eclipse.jetty.server.SessionIdManager;
 import org.eclipse.jetty.server.SessionManager;
 import org.eclipse.jetty.server.session.HashSessionIdManager;
 import org.eclipse.jetty.server.session.HashSessionManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.config.BeanDefinition;
+import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
+import org.springframework.core.type.filter.AnnotationTypeFilter;
 import org.springframework.security.crypto.password.PasswordEncoder;
 import org.springframework.security.crypto.password.StandardPasswordEncoder;
+import org.springframework.util.ClassUtils;
 import org.springframework.web.filter.DelegatingFilterProxy;
 
 import com.google.common.util.concurrent.ServiceManager;
@@ -117,6 +119,7 @@ import com.google.inject.persist.jpa.AmbariJpaPersistModule;
  * Used for injection purposes.
  */
 public class ControllerModule extends AbstractModule {
+  private static Logger LOG = LoggerFactory.getLogger(ControllerModule.class);
 
   private final Configuration configuration;
   private final HostsMap hostsMap;
@@ -317,9 +320,11 @@ public class ControllerModule extends AbstractModule {
    * instances and then register them with a singleton {@link ServiceManager}.
    */
   private void bindServices() {
-    Set<com.google.common.util.concurrent.Service> services = new HashSet<com.google.common.util.concurrent.Service>();
+    Set<com.google.common.util.concurrent.Service> services =
+        new HashSet<com.google.common.util.concurrent.Service>();
 
-    AlertNoticeDispatchService alertNoticeDispatchService = new AlertNoticeDispatchService();
+    AlertNoticeDispatchService alertNoticeDispatchService =
+        new AlertNoticeDispatchService();
 
     bind(AlertNoticeDispatchService.class).toInstance(
         alertNoticeDispatchService);
@@ -332,14 +337,35 @@ public class ControllerModule extends AbstractModule {
   /**
    * Initializes all eager singletons that should be instantiated as soon as
    * possible and not wait for injection.
+   * <p/>
+   * An example of where this is needed is with a singleton that is headless; in
+   * other words, it doesn't have any injections but still needs to be part of
+   * the Guice framework.
+   * <p/>
+   * This currently scans {@code org.apache.ambari.server} for any
+   * {@link EagerSingleton} instances.
    */
   private void bindEagerSingletons() {
-    // alert subscribers are "headless" and have no guice references; created
-    // them as eager singletons to have them register with the eventbus
-    bind(AlertReceivedListener.class).asEagerSingleton();
-    bind(AlertStateChangedListener.class).asEagerSingleton();
-    bind(AlertServiceStateListener.class).asEagerSingleton();
-    bind(AlertLifecycleListener.class).asEagerSingleton();
-    bind(AlertAggregateListener.class).asEagerSingleton();
+    ClassPathScanningCandidateComponentProvider scanner =
+        new ClassPathScanningCandidateComponentProvider(false);
+
+    // match only singletons that are eager listeners
+    scanner.addIncludeFilter(new AnnotationTypeFilter(EagerSingleton.class));
+
+    Set<BeanDefinition> beanDefinitions = scanner.findCandidateComponents("org.apache.ambari.server");
+
+    if (null == beanDefinitions || beanDefinitions.size() == 0) {
+      LOG.warn("No instances of {} found to register", EagerSingleton.class);
+      return;
+    }
+
+    for (BeanDefinition beanDefinition : beanDefinitions) {
+      String className = beanDefinition.getBeanClassName();
+      Class<?> clazz = ClassUtils.resolveClassName(className,
+          ClassUtils.getDefaultClassLoader());
+
+      bind(clazz).asEagerSingleton();
+      LOG.debug("Binding singleton {} eagerly", clazz);
+    }
   }
-}
+}

+ 21 - 5
ambari-server/src/main/java/org/apache/ambari/server/events/AlertStateChangeEvent.java

@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.events;
 
+import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
 import org.apache.ambari.server.orm.entities.AlertHistoryEntity;
 import org.apache.ambari.server.state.Alert;
 import org.apache.ambari.server.state.AlertState;
@@ -33,9 +34,14 @@ public class AlertStateChangeEvent extends AlertEvent {
   private final AlertState m_fromState;
 
   /**
-   * The newly created historical entry.
+   * The current alert, including state and history.
    */
-  private final AlertHistoryEntity m_newEntity;
+  private final AlertCurrentEntity m_currentAlert;
+
+  /**
+   * The historical record for this alert state change event.
+   */
+  private final AlertHistoryEntity m_history;
 
   /**
    * Constructor.
@@ -44,20 +50,30 @@ public class AlertStateChangeEvent extends AlertEvent {
    * @param alert
    */
   public AlertStateChangeEvent(long clusterId, Alert alert,
-      AlertHistoryEntity newEntity, AlertState fromState) {
+      AlertCurrentEntity currentAlert, AlertState fromState) {
     super(clusterId, alert);
 
-    m_newEntity = newEntity;
+    m_currentAlert = currentAlert;
+    m_history = currentAlert.getAlertHistory();
     m_fromState = fromState;
   }
 
+  /**
+   * Gets the current alert.
+   *
+   * @return the current alert.
+   */
+  public AlertCurrentEntity getCurrentAlert() {
+    return m_currentAlert;
+  }
+
   /**
    * Gets the newly created item in alert history.
    *
    * @return the newly created historical item.
    */
   public AlertHistoryEntity getNewHistoricalEntry() {
-    return m_newEntity;
+    return m_history;
   }
 
   /**

+ 6 - 1
ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java

@@ -40,7 +40,12 @@ public abstract class AmbariEvent {
     /**
      * An alert definition is removed from the system.
      */
-    ALERT_DEFINITION_REMOVAL;
+    ALERT_DEFINITION_REMOVAL,
+
+    /**
+     * A host/service/component has had a maintenance mode change.
+     */
+    MAINTENANCE_MODE;
   }
 
   /**

+ 175 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/MaintenanceModeEvent.java

@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponentHost;
+
+/**
+ * The {@link MaintenanceModeEvent} is used to capture a host/service/component
+ * maintenance mode change and broadcast it to the system.
+ * <p/>
+ * Although there are several different states aside from on & off in
+ * {@link MaintenanceState}, this event should handle explicit on or off state
+ * changes. This event should not be used for implied state changes, such as
+ * when a component goes into implied maintenance mode as a result of its host's
+ * maintenance mode being explicitely invoked.
+ */
+public class MaintenanceModeEvent extends AmbariEvent {
+
+  /**
+   * The new maintenance state.
+   */
+  private final MaintenanceState m_state;
+
+  /**
+   * The service whose maintenance state changed, or {@code null}.
+   */
+  private final Service m_service;
+
+  /**
+   * The host whose maintenance state changed, or {@code null}.
+   */
+  private final Host m_host;
+
+  /**
+   * The component whose maintenace state changed, or {@code null}.
+   */
+  private final ServiceComponentHost m_serviceComponentHost;
+
+  /**
+   * Constructor.
+   *
+   * @param state
+   *          the new state (not {@code null}).
+   * @param service
+   *          the service that changed maintenance state (not {@code null})).
+   */
+  public MaintenanceModeEvent(MaintenanceState state, Service service) {
+    this(state, service, null, null);
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param state
+   *          the new state (not {@code null}).
+   * @param host
+   *          the host that changed maintenance state (not {@code null})).
+   */
+  public MaintenanceModeEvent(MaintenanceState state, Host host) {
+    this(state, null, host, null);
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param state
+   *          the new state (not {@code null}).
+   * @param serviceComponentHost
+   *          the component that changed maintenance state (not {@code null})).
+   */
+  public MaintenanceModeEvent(MaintenanceState state,
+      ServiceComponentHost serviceComponentHost) {
+    this(state, null, null, serviceComponentHost);
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param state
+   *          the new state (not {@code null}).
+   * @param service
+   *          the service that changed maintenance state, or {@code null}.
+   * @param host
+   *          the host that changed maintenance state, or {@code null}.
+   * @param serviceComponentHost
+   *          the component that changed maintenance state, or {@code null}.
+   */
+  private MaintenanceModeEvent(MaintenanceState state, Service service,
+      Host host, ServiceComponentHost serviceComponentHost) {
+    super(AmbariEventType.MAINTENANCE_MODE);
+    m_state = state;
+    m_service = service;
+    m_host = host;
+    m_serviceComponentHost = serviceComponentHost;
+  }
+
+  /**
+   * Gets the service that had the direct maintenance mode event, or
+   * {@code null} if the event was not directly on a service.
+   *
+   * @return the service or {@code null}
+   */
+  public Service getService() {
+    return m_service;
+  }
+
+  /**
+   * Gets the host that had the direct maintenance mode event, or {@code null}
+   * if the event was not directly on a host.
+   *
+   * @return the host or {@code null}
+   */
+  public Host getHost() {
+    return m_host;
+  }
+
+  /**
+   * Gets the component that had the direct maintenance mode event, or
+   * {@code null} if the event was not directly on a component.
+   *
+   * @return the component or {@code null}
+   */
+  public ServiceComponentHost getServiceComponentHost() {
+    return m_serviceComponentHost;
+  }
+
+  /**
+   * Gets the new maintenance state for the service/host/component.
+   *
+   * @return the new maintenance state (never {@code null}).
+   */
+  public MaintenanceState getMaintenanceState() {
+    return m_state;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString() {
+    Object object = null;
+    if (null != m_service) {
+      object = m_service;
+    } else if (null != m_host) {
+      object = m_host;
+    } else {
+      object = m_serviceComponentHost;
+    }
+
+    StringBuilder buffer = new StringBuilder("MaintenanceModeEvent{ ");
+    buffer.append("state=").append(m_state);
+    buffer.append(", object=").append(object);
+    buffer.append(" }");
+
+    return buffer.toString();
+  }
+}

+ 2 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertAggregateListener.java

@@ -19,6 +19,7 @@ package org.apache.ambari.server.events.listeners;
 
 import java.text.MessageFormat;
 
+import org.apache.ambari.server.EagerSingleton;
 import org.apache.ambari.server.events.AlertReceivedEvent;
 import org.apache.ambari.server.events.publishers.AlertEventPublisher;
 import org.apache.ambari.server.orm.dao.AlertSummaryDTO;
@@ -41,6 +42,7 @@ import com.google.inject.Singleton;
  * {@link SourceType#AGGREGATE} alert that needs to run.
  */
 @Singleton
+@EagerSingleton
 public class AlertAggregateListener {
 
   @Inject

+ 2 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertLifecycleListener.java

@@ -19,6 +19,7 @@ package org.apache.ambari.server.events.listeners;
 
 import java.util.Set;
 
+import org.apache.ambari.server.EagerSingleton;
 import org.apache.ambari.server.events.AlertDefinitionDeleteEvent;
 import org.apache.ambari.server.events.AlertDefinitionRegistrationEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
@@ -40,6 +41,7 @@ import com.google.inject.Singleton;
  * infrastructure lifecycle such as definition registration events.
  */
 @Singleton
+@EagerSingleton
 public class AlertLifecycleListener {
   /**
    * Logger.

+ 199 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertMaintenanceModeListener.java

@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners;
+
+import java.util.List;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.controller.MaintenanceStateHelper;
+import org.apache.ambari.server.events.MaintenanceModeEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.orm.dao.AlertsDAO;
+import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
+import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
+import org.apache.ambari.server.orm.entities.AlertHistoryEntity;
+import org.apache.ambari.server.orm.entities.AlertNoticeEntity;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link AlertMaintenanceModeListener} handles events that relate to
+ * Maintenance Mode changes.
+ */
+@Singleton
+@EagerSingleton
+public class AlertMaintenanceModeListener {
+  /**
+   * Logger.
+   */
+  private static Logger LOG = LoggerFactory.getLogger(AlertMaintenanceModeListener.class);
+
+  /**
+   * Used for updating the MM of current alerts.
+   */
+  @Inject
+  private AlertsDAO m_alertsDao = null;
+
+  /**
+   * Used to assist in determining implied maintenance state.
+   */
+  @Inject
+  private Provider<MaintenanceStateHelper> m_maintenanceHelper;
+
+  /**
+   * Used to lookup MM states.
+   */
+  @Inject
+  private Provider<Clusters> m_clusters;
+
+  /**
+   * Constructor.
+   *
+   * @param publisher
+   */
+  @Inject
+  public AlertMaintenanceModeListener(AmbariEventPublisher publisher) {
+    publisher.register(this);
+  }
+
+  /**
+   * Handles {@link MaintenanceModeEvent} by performing the following tasks:
+   * <ul>
+   * <li>Iterates through all {@link AlertNoticeEntity}, updating the MM state</li>
+   * </ul>
+   *
+   * @param event
+   *          the event being handled.
+   */
+  @Subscribe
+  @AllowConcurrentEvents
+  public void onEvent(MaintenanceModeEvent event) {
+    List<AlertCurrentEntity> currentAlerts = m_alertsDao.findCurrent();
+
+    for( AlertCurrentEntity currentAlert : currentAlerts ){
+      MaintenanceState currentState = currentAlert.getMaintenanceState();
+      AlertHistoryEntity history = currentAlert.getAlertHistory();
+      AlertDefinitionEntity definition = history.getAlertDefinition();
+
+      long clusterId = history.getClusterId();
+      String hostName = history.getHostName();
+      String serviceName = history.getServiceName();
+      String componentName = history.getComponentName();
+
+      try {
+        Cluster cluster = m_clusters.get().getClusterById(clusterId);
+        if (null == cluster) {
+          LOG.warn("Unable to find cluster with ID {}", clusterId);
+          continue;
+        }
+
+        Service service = cluster.getService(serviceName);
+        if (null == service) {
+          LOG.warn("Unable to find service named {} in cluster {}",
+              serviceName, cluster.getClusterName());
+
+          continue;
+        }
+
+        // if this is a service-level alert, then check explicitely against the
+        // service for the MM state
+        if (null == componentName) {
+          MaintenanceState serviceState = service.getMaintenanceState();
+          if (currentState != serviceState) {
+            currentAlert.setMaintenanceState(serviceState);
+            m_alertsDao.merge(currentAlert);
+          }
+        }
+        // the presence of a component name means that it's a component alert
+        // which require a host
+        else {
+          if (hostName == null) {
+            LOG.warn("The alert {} for component {} must have a host",
+                definition.getDefinitionName(), componentName);
+
+            continue;
+          }
+
+          List<ServiceComponentHost> serviceComponentHosts = cluster.getServiceComponentHosts(hostName);
+          if (null == serviceComponentHosts) {
+            LOG.warn(
+                "Unable to find service components on host {} for {} in cluster {}",
+                hostName, serviceName, cluster.getClusterName());
+
+            continue;
+          }
+
+          ServiceComponentHost serviceComponentHost = null;
+          for (ServiceComponentHost sch : serviceComponentHosts) {
+            if (componentName.equals(sch.getServiceComponentName())) {
+              serviceComponentHost = sch;
+              break;
+            }
+          }
+
+          if (null == serviceComponentHost) {
+            LOG.warn("Unable to find component {} of {} on host {}",
+                componentName, serviceName, hostName);
+
+            continue;
+          }
+
+          MaintenanceState effectiveState = m_maintenanceHelper.get().getEffectiveState(
+              serviceComponentHost);
+
+          switch (effectiveState) {
+            case OFF:
+              if (currentState != MaintenanceState.OFF) {
+                currentAlert.setMaintenanceState(MaintenanceState.OFF);
+                m_alertsDao.merge(currentAlert);
+              }
+
+              break;
+            case ON:
+            case IMPLIED_FROM_HOST:
+            case IMPLIED_FROM_SERVICE:
+            case IMPLIED_FROM_SERVICE_AND_HOST:
+              if (currentState == MaintenanceState.OFF) {
+                currentAlert.setMaintenanceState(MaintenanceState.ON);
+                m_alertsDao.merge(currentAlert);
+              }
+
+              break;
+            default:
+              break;
+          }
+        }
+      } catch (AmbariException ambariException) {
+        LOG.error("Unable to put alert '{}' for host {} into maintenance mode",
+            definition.getDefinitionName(), hostName, ambariException);
+      }
+    }
+  }
+}

+ 5 - 1
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java

@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.events.listeners;
 
+import org.apache.ambari.server.EagerSingleton;
 import org.apache.ambari.server.events.AlertEvent;
 import org.apache.ambari.server.events.AlertReceivedEvent;
 import org.apache.ambari.server.events.AlertStateChangeEvent;
@@ -28,6 +29,7 @@ import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
 import org.apache.ambari.server.orm.entities.AlertHistoryEntity;
 import org.apache.ambari.server.state.Alert;
 import org.apache.ambari.server.state.AlertState;
+import org.apache.ambari.server.state.MaintenanceState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +44,7 @@ import com.google.inject.Singleton;
  * {@link AlertStateChangeEvent} when an {@link AlertState} change is detected.
  */
 @Singleton
+@EagerSingleton
 public class AlertReceivedListener {
   /**
    * Logger.
@@ -112,6 +115,7 @@ public class AlertReceivedListener {
       AlertHistoryEntity history = createHistory(clusterId, definition, alert);
 
       current = new AlertCurrentEntity();
+      current.setMaintenanceState(MaintenanceState.OFF);
       current.setAlertHistory(history);
       current.setLatestTimestamp(Long.valueOf(alert.getTimestamp()));
       current.setOriginalTimestamp(Long.valueOf(alert.getTimestamp()));
@@ -155,7 +159,7 @@ public class AlertReceivedListener {
 
       // broadcast the alert changed event for other subscribers
       AlertStateChangeEvent alertChangedEvent = new AlertStateChangeEvent(
-          event.getClusterId(), event.getAlert(), current.getAlertHistory(),
+          event.getClusterId(), event.getAlert(), current,
           oldState);
 
       m_alertEventPublisher.publish(alertChangedEvent);

+ 2 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertServiceStateListener.java

@@ -21,6 +21,7 @@ import java.text.MessageFormat;
 import java.util.Set;
 
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.EagerSingleton;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.controller.AmbariServer;
 import org.apache.ambari.server.controller.ControllerModule;
@@ -48,6 +49,7 @@ import com.google.inject.Singleton;
  * and {@link AlertGroupEntity} instances are correctly populated.
  */
 @Singleton
+@EagerSingleton
 public class AlertServiceStateListener {
   /**
    * Logger.

+ 11 - 0
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertStateChangedListener.java

@@ -21,14 +21,17 @@ import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.ambari.server.EagerSingleton;
 import org.apache.ambari.server.events.AlertStateChangeEvent;
 import org.apache.ambari.server.events.publishers.AlertEventPublisher;
 import org.apache.ambari.server.orm.dao.AlertDispatchDAO;
+import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
 import org.apache.ambari.server.orm.entities.AlertGroupEntity;
 import org.apache.ambari.server.orm.entities.AlertHistoryEntity;
 import org.apache.ambari.server.orm.entities.AlertNoticeEntity;
 import org.apache.ambari.server.orm.entities.AlertTargetEntity;
+import org.apache.ambari.server.state.MaintenanceState;
 import org.apache.ambari.server.state.NotificationState;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,6 +47,7 @@ import com.google.inject.Singleton;
  * in the database.
  */
 @Singleton
+@EagerSingleton
 public class AlertStateChangedListener {
 
   /**
@@ -75,6 +79,13 @@ public class AlertStateChangedListener {
   public void onAlertEvent(AlertStateChangeEvent event) {
     LOG.debug(event);
 
+    // don't create any outbound alert notices if in MM
+    AlertCurrentEntity currentAlert = event.getCurrentAlert();
+    if (null != currentAlert
+        && currentAlert.getMaintenanceState() != MaintenanceState.OFF) {
+      return;
+    }
+
     AlertHistoryEntity history = event.getNewHistoricalEntry();
     AlertDefinitionEntity definition = history.getAlertDefinition();
 

+ 5 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java

@@ -27,6 +27,7 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.ServiceComponentNotFoundException;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.controller.ServiceResponse;
+import org.apache.ambari.server.events.MaintenanceModeEvent;
 import org.apache.ambari.server.events.ServiceInstalledEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
@@ -675,6 +676,10 @@ public class ServiceImpl implements Service {
         readWriteLock.writeLock().lock();
         serviceDesiredStateEntity.setMaintenanceState(state);
         saveIfPersisted();
+
+        // broadcast the maintenance mode change
+        MaintenanceModeEvent event = new MaintenanceModeEvent(state, this);
+        eventPublisher.publish(event);
       } finally {
         readWriteLock.writeLock().unlock();
       }

+ 73 - 60
ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java

@@ -31,6 +31,8 @@ import org.apache.ambari.server.agent.AgentEnv;
 import org.apache.ambari.server.agent.DiskInfo;
 import org.apache.ambari.server.agent.HostInfo;
 import org.apache.ambari.server.controller.HostResponse;
+import org.apache.ambari.server.events.MaintenanceModeEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.cache.HostConfigMapping;
 import org.apache.ambari.server.orm.cache.HostConfigMappingImpl;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
@@ -89,7 +91,7 @@ public class HostImpl implements Host {
   private static final String TIMEZONE = "timezone";
   private static final String OS_RELEASE_VERSION = "os_release_version";
 
-  
+
   private final Gson gson;
 
   private static final Type hostAttributesType =
@@ -114,18 +116,22 @@ public class HostImpl implements Host {
   private List<DiskInfo> disksInfo = new ArrayList<DiskInfo>();
   private boolean persisted = false;
   private Integer currentPingPort = null;
-  
+
   private final StateMachine<HostState, HostEventType, HostEvent> stateMachine;
   private Map<Long, MaintenanceState> maintMap = null;
 
-  
-  
   // In-memory status, based on host components states
   private String status;
 
   // In-memory prefix of log file paths that is retrieved when the agent registers with the server
   private String prefix;
 
+  /**
+   * Used to publish events relating to host CRUD operations.
+   */
+  @Inject
+  private AmbariEventPublisher eventPublisher;
+
   private static final StateMachineFactory
     <HostImpl, HostState, HostEventType, HostEvent>
       stateMachineFactory
@@ -214,19 +220,19 @@ public class HostImpl implements Host {
   @Inject
   public HostImpl(@Assisted HostEntity hostEntity,
       @Assisted boolean persisted, Injector injector) {
-    this.stateMachine = stateMachineFactory.make(this);
+    stateMachine = stateMachineFactory.make(this);
     rwLock = new ReentrantReadWriteLock();
-    this.readLock = rwLock.readLock();
-    this.writeLock = rwLock.writeLock();
+    readLock = rwLock.readLock();
+    writeLock = rwLock.writeLock();
 
     this.hostEntity = hostEntity;
     this.persisted = persisted;
-    this.hostDAO = injector.getInstance(HostDAO.class);
-    this.hostStateDAO = injector.getInstance(HostStateDAO.class);
-    this.gson = injector.getInstance(Gson.class);
-    this.clusterDAO = injector.getInstance(ClusterDAO.class);
-    this.clusters = injector.getInstance(Clusters.class);
-    this.hostConfigMappingDAO = injector.getInstance(HostConfigMappingDAO.class);
+    hostDAO = injector.getInstance(HostDAO.class);
+    hostStateDAO = injector.getInstance(HostStateDAO.class);
+    gson = injector.getInstance(Gson.class);
+    clusterDAO = injector.getInstance(ClusterDAO.class);
+    clusters = injector.getInstance(Clusters.class);
+    hostConfigMappingDAO = injector.getInstance(HostConfigMappingDAO.class);
 
     hostStateEntity = hostEntity.getHostStateEntity();
     if (hostStateEntity == null) {
@@ -238,7 +244,7 @@ public class HostImpl implements Host {
         persist();
       }
     } else {
-      this.stateMachine.setCurrentState(hostStateEntity.getCurrentState());
+      stateMachine.setCurrentState(hostStateEntity.getCurrentState());
     }
 
   }
@@ -405,7 +411,7 @@ public class HostImpl implements Host {
       }
 
       // FIXME add all other information into host attributes
-      this.setAgentVersion(new AgentVersion(
+      setAgentVersion(new AgentVersion(
           hostInfo.getAgentUserId()));
 
       Map<String, String> attrs = new HashMap<String, String>();
@@ -462,7 +468,7 @@ public class HostImpl implements Host {
       if (hostInfo.getOSRelease() != null) {
         attrs.put(OS_RELEASE_VERSION, hostInfo.getOSRelease());
       }
-      
+
       setHostAttributes(attrs);
 
       saveIfPersisted();
@@ -482,7 +488,7 @@ public class HostImpl implements Host {
     }
 
   }
-  
+
   @Override
   public AgentEnv getLastAgentEnv() {
     readLock.lock();
@@ -533,7 +539,7 @@ public class HostImpl implements Host {
         stateMachine.doTransition(event.getType(), event);
       } catch (InvalidStateTransitionException e) {
         LOG.error("Can't handle Host event at current state"
-            + ", host=" + this.getHostName()
+            + ", host=" + getHostName()
             + ", currentState=" + oldState
             + ", eventType=" + event.getType()
             + ", event=" + event);
@@ -546,7 +552,7 @@ public class HostImpl implements Host {
     if (oldState != getState()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Host transitioned to a new state"
-            + ", host=" + this.getHostName()
+            + ", host=" + getHostName()
             + ", oldState=" + oldState
             + ", currentState=" + getState()
             + ", eventType=" + event.getType().name()
@@ -574,7 +580,7 @@ public class HostImpl implements Host {
       writeLock.unlock();
     }
   }
-  
+
   @Override
   public Integer getCurrentPingPort() {
     try {
@@ -608,7 +614,7 @@ public class HostImpl implements Host {
       writeLock.unlock();
     }
   }
-  
+
   @Override
   public String getPublicHostName() {
     try {
@@ -703,8 +709,8 @@ public class HostImpl implements Host {
       writeLock.unlock();
     }
   }
-  
-  
+
+
   @Override
   public long getTotalMemBytes() {
     try {
@@ -811,18 +817,18 @@ public class HostImpl implements Host {
       writeLock.unlock();
     }
   }
-  
+
   @Override
   public String getOsFamily() {
-	  String majorVersion = this.getHostAttributes().get(OS_RELEASE_VERSION).split("\\.")[0];
-	  return this.getHostAttributes().get(OSFAMILY) + majorVersion;
+	  String majorVersion = getHostAttributes().get(OS_RELEASE_VERSION).split("\\.")[0];
+	  return getHostAttributes().get(OSFAMILY) + majorVersion;
   }
 
   @Override
   public List<DiskInfo> getDisksInfo() {
     try {
       readLock.lock();
-      return this.disksInfo;
+      return disksInfo;
     } finally {
       readLock.unlock();
     }
@@ -854,10 +860,11 @@ public class HostImpl implements Host {
     try {
       writeLock.lock();
       hostStateEntity.setHealthStatus(gson.toJson(healthStatus));
-      
-      if (healthStatus.getHealthStatus().equals(HealthStatus.UNKNOWN))
+
+      if (healthStatus.getHealthStatus().equals(HealthStatus.UNKNOWN)) {
         setStatus(HealthStatus.UNKNOWN.name());
-      
+      }
+
       saveIfPersisted();
     } finally {
       writeLock.unlock();
@@ -943,7 +950,7 @@ public class HostImpl implements Host {
   public void setLastRegistrationTime(long lastRegistrationTime) {
     try {
       writeLock.lock();
-      this.hostEntity.setLastRegistrationTime(lastRegistrationTime);
+      hostEntity.setLastRegistrationTime(lastRegistrationTime);
       saveIfPersisted();
     } finally {
       writeLock.unlock();
@@ -1012,8 +1019,8 @@ public class HostImpl implements Host {
       writeLock.unlock();
     }
   }
-  
-  
+
+
   @Override
   public String getStatus() {
     return status;
@@ -1142,13 +1149,14 @@ public class HostImpl implements Host {
       hostStateDAO.merge(hostStateEntity);
     }
   }
-  
+
   @Override
   @Transactional
   public boolean addDesiredConfig(long clusterId, boolean selected, String user, Config config) {
-    if (null == user)
+    if (null == user) {
       throw new NullPointerException("User must be specified.");
-    
+    }
+
     HostConfigMapping exist = getDesiredConfigEntity(clusterId, config.getType());
     if (null != exist && exist.getVersion().equals(config.getTag())) {
       if (!selected) {
@@ -1157,9 +1165,9 @@ public class HostImpl implements Host {
       }
       return false;
     }
-    
+
     writeLock.lock();
-    
+
     try {
       // set all old mappings for this type to empty
       for (HostConfigMapping e : hostConfigMappingDAO.findByType(clusterId,
@@ -1167,7 +1175,7 @@ public class HostImpl implements Host {
         e.setSelected(0);
         hostConfigMappingDAO.merge(e);
       }
-      
+
       HostConfigMapping hostConfigMapping = new HostConfigMappingImpl();
       hostConfigMapping.setClusterId(Long.valueOf(clusterId));
       hostConfigMapping.setCreateTimestamp(Long.valueOf(System.currentTimeMillis()));
@@ -1176,31 +1184,31 @@ public class HostImpl implements Host {
       hostConfigMapping.setUser(user);
       hostConfigMapping.setType(config.getType());
       hostConfigMapping.setVersion(config.getTag());
-      
+
       hostConfigMappingDAO.create(hostConfigMapping);
     }
     finally {
       writeLock.unlock();
     }
-    
+
     hostDAO.merge(hostEntity);
-    
+
     return true;
   }
-  
+
   @Override
   public Map<String, DesiredConfig> getDesiredConfigs(long clusterId) {
     Map<String, DesiredConfig> map = new HashMap<String, DesiredConfig>();
 
     for (HostConfigMapping e : hostConfigMappingDAO.findSelected(
         clusterId, hostEntity.getHostName())) {
-      
+
       DesiredConfig dc = new DesiredConfig();
       dc.setTag(e.getVersion());
       dc.setServiceName(e.getServiceName());
       dc.setUser(e.getUser());
       map.put(e.getType(), dc);
-      
+
     }
     return map;
   }
@@ -1221,7 +1229,7 @@ public class HostImpl implements Host {
       hostConfigMap.put(desiredConfigEntry.getKey(), hostConfig);
     }
 
-    Map<Long, ConfigGroup> configGroups = cluster.getConfigGroupsByHostname(this.getHostName());
+    Map<Long, ConfigGroup> configGroups = cluster.getConfigGroupsByHostname(getHostName());
 
     if (configGroups != null && !configGroups.isEmpty()) {
       for (ConfigGroup configGroup : configGroups.values()) {
@@ -1249,11 +1257,11 @@ public class HostImpl implements Host {
   private HostConfigMapping getDesiredConfigEntity(long clusterId, String type) {
     HostConfigMapping findSelectedByType = hostConfigMappingDAO.findSelectedByType(clusterId,
         hostEntity.getHostName(), type);
-    
-    
+
+
     return findSelectedByType;
   }
-  
+
   private void ensureMaintMap() {
     if (null == maintMap) {
       String entity = hostStateEntity.getMaintenanceState();
@@ -1268,42 +1276,47 @@ public class HostImpl implements Host {
       }
     }
   }
-  
+
   @Override
   public void setMaintenanceState(long clusterId, MaintenanceState state) {
     try {
       writeLock.lock();
-      
+
       ensureMaintMap();
-      
+
       maintMap.put(Long.valueOf(clusterId), state);
       String json = gson.toJson(maintMap, maintMapType);
-      
+
       hostStateEntity.setMaintenanceState(json);
       saveIfPersisted();
+
+      // broadcast the maintenance mode change
+      MaintenanceModeEvent event = new MaintenanceModeEvent(state, this);
+      eventPublisher.publish(event);
     } finally {
       writeLock.unlock();
     }
   }
-  
+
   @Override
   public MaintenanceState getMaintenanceState(long clusterId) {
     try {
       readLock.lock();
 
       ensureMaintMap();
-      
+
       Long id = Long.valueOf(clusterId);
-      
-      if (!maintMap.containsKey(id))
+
+      if (!maintMap.containsKey(id)) {
         maintMap.put(id, MaintenanceState.OFF);
-        
+      }
+
       return maintMap.get(id);
     } finally {
       readLock.unlock();
     }
   }
-  
+
 }
 
-  
+

+ 13 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java

@@ -31,6 +31,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.AlertDefinitionCommand;
 import org.apache.ambari.server.controller.ServiceComponentHostResponse;
+import org.apache.ambari.server.events.MaintenanceModeEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.dao.HostComponentDesiredStateDAO;
 import org.apache.ambari.server.orm.dao.HostComponentStateDAO;
 import org.apache.ambari.server.orm.dao.HostDAO;
@@ -110,6 +112,12 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
   @Inject
   private AlertDefinitionHash alertDefinitionHash;
 
+  /**
+   * Used to publish events relating to service CRUD operations.
+   */
+  @Inject
+  private AmbariEventPublisher eventPublisher;
+
   private HostComponentStateEntity stateEntity;
   private HostComponentDesiredStateEntity desiredStateEntity;
 
@@ -1431,6 +1439,11 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
       try {
         desiredStateEntity.setMaintenanceState(state);
         saveIfPersisted();
+
+        // broadcast the maintenance mode change
+        MaintenanceModeEvent event = new MaintenanceModeEvent(state, this);
+        eventPublisher.publish(event);
+
       } finally {
         writeLock.unlock();
       }

+ 188 - 0
ambari-server/src/test/java/org/apache/ambari/server/events/EventsTest.java

@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.ambari.server.events.MaintenanceModeEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.orm.GuiceJpaInitializer;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.HostState;
+import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.ServiceComponentFactory;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceComponentHostFactory;
+import org.apache.ambari.server.state.ServiceFactory;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.State;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.persist.PersistService;
+
+/**
+ * Tests that {@link EventsTest} instances are fired correctly and
+ * that alert data is bootstrapped into the database.
+ */
+public class EventsTest {
+
+  private static final String HOSTNAME = "c6401.ambari.apache.org";
+
+  private Clusters m_clusters;
+  private Cluster m_cluster;
+  private String m_clusterName;
+  private Injector m_injector;
+  private ServiceFactory m_serviceFactory;
+  private ServiceComponentFactory m_componentFactory;
+  private ServiceComponentHostFactory m_schFactory;
+  private AmbariEventPublisher m_eventPublisher;
+  private MockEventListener m_listener;
+
+  /**
+   *
+   */
+  @Before
+  public void setup() throws Exception {
+    m_injector = Guice.createInjector(new InMemoryDefaultTestModule());
+    m_injector.getInstance(GuiceJpaInitializer.class);
+
+    m_eventPublisher = m_injector.getInstance(AmbariEventPublisher.class);
+    EventBus synchronizedBus = new EventBus();
+
+    // register mock listener
+    m_listener = m_injector.getInstance(MockEventListener.class);
+    synchronizedBus.register(m_listener);
+
+    // !!! need a synchronous op for testing
+    Field field = AmbariEventPublisher.class.getDeclaredField("m_eventBus");
+    field.setAccessible(true);
+    field.set(m_eventPublisher, synchronizedBus);
+
+    m_clusters = m_injector.getInstance(Clusters.class);
+    m_serviceFactory = m_injector.getInstance(ServiceFactory.class);
+    m_componentFactory = m_injector.getInstance(ServiceComponentFactory.class);
+    m_schFactory = m_injector.getInstance(ServiceComponentHostFactory.class);
+
+    m_clusterName = "foo";
+    m_clusters.addCluster(m_clusterName);
+    m_clusters.addHost(HOSTNAME);
+
+    Host host = m_clusters.getHost(HOSTNAME);
+    Map<String, String> hostAttributes = new HashMap<String, String>();
+    hostAttributes.put("os_family", "redhat");
+    hostAttributes.put("os_release_version", "6.4");
+    host.setHostAttributes(hostAttributes);
+    host.setState(HostState.HEALTHY);
+    host.persist();
+
+    m_cluster = m_clusters.getCluster(m_clusterName);
+    m_cluster.setDesiredStackVersion(new StackId("HDP", "2.0.6"));
+    Assert.assertNotNull(m_cluster);
+
+    m_clusters.mapHostToCluster(HOSTNAME, m_clusterName);
+  }
+
+  /**
+   * @throws Exception
+   */
+  @After
+  public void teardown() throws Exception {
+    m_injector.getInstance(PersistService.class).stop();
+    m_injector = null;
+  }
+
+  /**
+   * Tests that {@link MaintenanceModeEvent}s are fired correctly.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testMaintenanceModeEvents() throws Exception {
+    installHdfsService();
+    Service service = m_cluster.getService("HDFS");
+    Class<?> eventClass = MaintenanceModeEvent.class;
+
+    Assert.assertFalse(m_listener.isEventReceived(eventClass));
+    service.setMaintenanceState(MaintenanceState.ON);
+    Assert.assertTrue(m_listener.isEventReceived(eventClass));
+    Assert.assertEquals(1, m_listener.getEventReceivedCount(eventClass));
+
+    m_listener.reset();
+    Assert.assertFalse(m_listener.isEventReceived(eventClass));
+
+    List<ServiceComponentHost> componentHosts = m_cluster.getServiceComponentHosts(HOSTNAME);
+    ServiceComponentHost componentHost = componentHosts.get(0);
+    componentHost.setMaintenanceState(MaintenanceState.OFF);
+
+    Assert.assertTrue(m_listener.isEventReceived(eventClass));
+    Assert.assertEquals(1, m_listener.getEventReceivedCount(eventClass));
+
+    m_listener.reset();
+    Assert.assertFalse(m_listener.isEventReceived(eventClass));
+
+    Host host = m_clusters.getHost(HOSTNAME);
+    host.setMaintenanceState(m_cluster.getClusterId(), MaintenanceState.ON);
+    host.setMaintenanceState(m_cluster.getClusterId(), MaintenanceState.OFF);
+
+    Assert.assertTrue(m_listener.isEventReceived(eventClass));
+    Assert.assertEquals(2, m_listener.getEventReceivedCount(eventClass));
+  }
+
+  /**
+   * Calls {@link Service#persist()} to mock a service install along with
+   * creating a single {@link Host} and {@link ServiceComponentHost}.
+   */
+  private void installHdfsService() throws Exception {
+    String serviceName = "HDFS";
+    Service service = m_serviceFactory.createNew(m_cluster, serviceName);
+    m_cluster.addService(service);
+    service.persist();
+    service = m_cluster.getService(serviceName);
+    Assert.assertNotNull(service);
+
+    ServiceComponent component = m_componentFactory.createNew(service, "DATANODE");
+    service.addServiceComponent(component);
+    component.setDesiredState(State.INSTALLED);
+    component.persist();
+
+    ServiceComponentHost sch = m_schFactory.createNew(component, HOSTNAME);
+
+    component.addServiceComponentHost(sch);
+    sch.setDesiredState(State.INSTALLED);
+    sch.setState(State.INSTALLED);
+    sch.setDesiredStackVersion(new StackId("HDP-2.0.6"));
+    sch.setStackVersion(new StackId("HDP-2.0.6"));
+
+    sch.persist();
+  }
+}

+ 99 - 0
ambari-server/src/test/java/org/apache/ambari/server/events/MockEventListener.java

@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link MockEventListener} is used to provide a way to capture events
+ * being fired via an {@link EventBus}.
+ */
+@Singleton
+public class MockEventListener {
+
+  /**
+   * When an event is received, its class is captured and the event object is
+   * added to the list.
+   */
+  private final Map<Class<?>, List<Object>> m_receivedEvents = new HashMap<Class<?>, List<Object>>();
+
+  /**
+   * Resets the captured events.
+   */
+  public void reset() {
+    m_receivedEvents.clear();
+  }
+
+  /**
+   * Gets whether an event of the specified class was received.
+   *
+   * @param clazz
+   * @return
+   */
+  public boolean isEventReceived(Class<?> clazz) {
+    if (!m_receivedEvents.containsKey(clazz)) {
+      return false;
+    }
+
+    return m_receivedEvents.get(clazz).size() > 0;
+  }
+
+  /**
+   * Gets the total number of events received for the specified class.
+   *
+   * @param clazz
+   * @return
+   */
+  public int getEventReceivedCount(Class<?> clazz){
+    if (!m_receivedEvents.containsKey(clazz)) {
+      return 0;
+    }
+
+    return m_receivedEvents.get(clazz).size();
+  }
+
+  /**
+   * @param event
+   */
+  @Subscribe
+  public void onEvent(MaintenanceModeEvent event) {
+    handleEvent(event);
+  }
+
+  /**
+   * Inserts the event into the map of class to event invocations.
+   *
+   * @param event
+   */
+  private void handleEvent(Object event) {
+    List<Object> events = m_receivedEvents.get(event.getClass());
+    if (null == events) {
+      events = new ArrayList<Object>();
+      m_receivedEvents.put(event.getClass(), events);
+    }
+
+    events.add(event);
+  }
+}

+ 386 - 70
ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java

@@ -24,14 +24,21 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.TimeZone;
 import java.util.UUID;
 
+import junit.framework.Assert;
+
+import org.apache.ambari.server.events.listeners.AlertMaintenanceModeListener;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.OrmTestHelper;
@@ -39,13 +46,26 @@ import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
 import org.apache.ambari.server.orm.entities.AlertHistoryEntity;
 import org.apache.ambari.server.state.AlertState;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.HostState;
 import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.ServiceComponentFactory;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceComponentHostFactory;
+import org.apache.ambari.server.state.ServiceFactory;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.State;
 import org.apache.ambari.server.state.alert.Scope;
 import org.apache.ambari.server.state.alert.SourceType;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.eventbus.EventBus;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.persist.PersistService;
@@ -55,25 +75,46 @@ import com.google.inject.persist.PersistService;
  */
 public class AlertsDAOTest {
 
-  static Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+  final static String HOSTNAME = "c6401.ambari.apache.org";
+  final static Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+
+  private Clusters m_clusters;
+  private Long m_clusterId;
+  private Injector m_injector;
+  private OrmTestHelper m_helper;
+  private AlertsDAO m_dao;
+  private AlertDefinitionDAO m_definitionDao;
 
-  private Long clusterId;
-  private Injector injector;
-  private OrmTestHelper helper;
-  private AlertsDAO dao;
-  private AlertDefinitionDAO definitionDao;
+  private ServiceFactory m_serviceFactory;
+  private ServiceComponentFactory m_componentFactory;
+  private ServiceComponentHostFactory m_schFactory;
+  private AmbariEventPublisher m_eventPublisher;
 
   /**
    *
    */
   @Before
   public void setup() throws Exception {
-    injector = Guice.createInjector(new InMemoryDefaultTestModule());
-    injector.getInstance(GuiceJpaInitializer.class);
-    helper = injector.getInstance(OrmTestHelper.class);
-    clusterId = helper.createCluster();
-    dao = injector.getInstance(AlertsDAO.class);
-    definitionDao = injector.getInstance(AlertDefinitionDAO.class);
+    m_injector = Guice.createInjector(new InMemoryDefaultTestModule());
+    m_injector.getInstance(GuiceJpaInitializer.class);
+    m_helper = m_injector.getInstance(OrmTestHelper.class);
+    m_clusterId = m_helper.createCluster();
+    m_dao = m_injector.getInstance(AlertsDAO.class);
+    m_definitionDao = m_injector.getInstance(AlertDefinitionDAO.class);
+    m_serviceFactory = m_injector.getInstance(ServiceFactory.class);
+    m_componentFactory = m_injector.getInstance(ServiceComponentFactory.class);
+    m_schFactory = m_injector.getInstance(ServiceComponentHostFactory.class);
+    m_eventPublisher = m_injector.getInstance(AmbariEventPublisher.class);
+    m_clusters = m_injector.getInstance(Clusters.class);
+
+    // register a listener
+    EventBus synchronizedBus = new EventBus();
+    synchronizedBus.register(m_injector.getInstance(AlertMaintenanceModeListener.class));
+
+    // !!! need a synchronous op for testing
+    Field field = AmbariEventPublisher.class.getDeclaredField("m_eventBus");
+    field.setAccessible(true);
+    field.set(m_eventPublisher, synchronizedBus);
 
     // create 5 definitions
     for (int i = 0; i < 5; i++) {
@@ -81,16 +122,16 @@ public class AlertsDAOTest {
       definition.setDefinitionName("Alert Definition " + i);
       definition.setServiceName("Service " + i);
       definition.setComponentName(null);
-      definition.setClusterId(clusterId);
+      definition.setClusterId(m_clusterId);
       definition.setHash(UUID.randomUUID().toString());
       definition.setScheduleInterval(Integer.valueOf(60));
       definition.setScope(Scope.SERVICE);
       definition.setSource("{\"type\" : \"SCRIPT\"}");
       definition.setSourceType(SourceType.SCRIPT);
-      definitionDao.create(definition);
+      m_definitionDao.create(definition);
     }
 
-    List<AlertDefinitionEntity> definitions = definitionDao.findAll();
+    List<AlertDefinitionEntity> definitions = m_definitionDao.findAll();
     assertNotNull(definitions);
     assertEquals(5, definitions.size());
 
@@ -102,7 +143,7 @@ public class AlertsDAOTest {
       for (int i = 0; i < 10; i++) {
         AlertHistoryEntity history = new AlertHistoryEntity();
         history.setServiceName(definition.getServiceName());
-        history.setClusterId(clusterId);
+        history.setClusterId(m_clusterId);
         history.setAlertDefinition(definition);
         history.setAlertLabel(definition.getDefinitionName() + " " + i);
         history.setAlertText(definition.getDefinitionName() + " " + i);
@@ -117,13 +158,13 @@ public class AlertsDAOTest {
         // increase the days for each
         calendar.add(Calendar.DATE, 1);
 
-        dao.create(history);
+        m_dao.create(history);
       }
     }
 
     // for each definition, create a current alert
     for (AlertDefinitionEntity definition : definitions) {
-      List<AlertHistoryEntity> alerts = dao.findAll();
+      List<AlertHistoryEntity> alerts = m_dao.findAll();
       AlertHistoryEntity history = null;
       for (AlertHistoryEntity alert : alerts) {
         if (definition.equals(alert.getAlertDefinition())) {
@@ -138,7 +179,7 @@ public class AlertsDAOTest {
       current.setLatestTimestamp(new Date().getTime());
       current.setOriginalTimestamp(new Date().getTime() - 10800000);
       current.setMaintenanceState(MaintenanceState.OFF);
-      dao.create(current);
+      m_dao.create(current);
     }
   }
 
@@ -147,8 +188,8 @@ public class AlertsDAOTest {
    */
   @After
   public void teardown() {
-    injector.getInstance(PersistService.class).stop();
-    injector = null;
+    m_injector.getInstance(PersistService.class).stop();
+    m_injector = null;
   }
 
 
@@ -157,7 +198,7 @@ public class AlertsDAOTest {
    */
   @Test
   public void testFindAll() {
-    List<AlertHistoryEntity> alerts = dao.findAll(clusterId);
+    List<AlertHistoryEntity> alerts = m_dao.findAll(m_clusterId);
     assertNotNull(alerts);
     assertEquals(50, alerts.size());
   }
@@ -167,7 +208,7 @@ public class AlertsDAOTest {
    */
   @Test
   public void testFindAllCurrent() {
-    List<AlertCurrentEntity> currentAlerts = dao.findCurrent();
+    List<AlertCurrentEntity> currentAlerts = m_dao.findCurrent();
     assertNotNull(currentAlerts);
     assertEquals(5, currentAlerts.size());
   }
@@ -177,19 +218,19 @@ public class AlertsDAOTest {
    */
   @Test
   public void testFindCurrentByService() {
-    List<AlertCurrentEntity> currentAlerts = dao.findCurrent();
+    List<AlertCurrentEntity> currentAlerts = m_dao.findCurrent();
     AlertCurrentEntity current = currentAlerts.get(0);
     AlertHistoryEntity history = current.getAlertHistory();
 
     assertNotNull(history);
 
-    currentAlerts = dao.findCurrentByService(clusterId,
+    currentAlerts = m_dao.findCurrentByService(m_clusterId,
         history.getServiceName());
 
     assertNotNull(currentAlerts);
     assertEquals(1, currentAlerts.size());
 
-    currentAlerts = dao.findCurrentByService(clusterId, "foo");
+    currentAlerts = m_dao.findCurrentByService(m_clusterId, "foo");
 
     assertNotNull(currentAlerts);
     assertEquals(0, currentAlerts.size());
@@ -205,18 +246,18 @@ public class AlertsDAOTest {
     hostDef.setDefinitionName("Host Alert Definition ");
     hostDef.setServiceName("HostService");
     hostDef.setComponentName(null);
-    hostDef.setClusterId(clusterId);
+    hostDef.setClusterId(m_clusterId);
     hostDef.setHash(UUID.randomUUID().toString());
     hostDef.setScheduleInterval(Integer.valueOf(60));
     hostDef.setScope(Scope.HOST);
     hostDef.setSource("{\"type\" : \"SCRIPT\"}");
     hostDef.setSourceType(SourceType.SCRIPT);
-    definitionDao.create(hostDef);
+    m_definitionDao.create(hostDef);
 
     // history for the definition
     AlertHistoryEntity history = new AlertHistoryEntity();
     history.setServiceName(hostDef.getServiceName());
-    history.setClusterId(clusterId);
+    history.setClusterId(m_clusterId);
     history.setAlertDefinition(hostDef);
     history.setAlertLabel(hostDef.getDefinitionName());
     history.setAlertText(hostDef.getDefinitionName());
@@ -229,14 +270,15 @@ public class AlertsDAOTest {
     current.setOriginalTimestamp(1L);
     current.setLatestTimestamp(2L);
     current.setAlertHistory(history);
-    dao.create(current);
+    m_dao.create(current);
 
-    List<AlertCurrentEntity> currentAlerts = dao.findCurrentByHost(clusterId, history.getHostName());
+    List<AlertCurrentEntity> currentAlerts = m_dao.findCurrentByHost(
+        m_clusterId, history.getHostName());
 
     assertNotNull(currentAlerts);
     assertEquals(1, currentAlerts.size());
 
-    currentAlerts = dao.findCurrentByHost(clusterId, "foo");
+    currentAlerts = m_dao.findCurrentByHost(m_clusterId, "foo");
 
     assertNotNull(currentAlerts);
     assertEquals(0, currentAlerts.size());
@@ -252,20 +294,21 @@ public class AlertsDAOTest {
     allStates.add(AlertState.WARNING);
     allStates.add(AlertState.CRITICAL);
 
-    List<AlertHistoryEntity> history = dao.findAll(clusterId, allStates);
+    List<AlertHistoryEntity> history = m_dao.findAll(m_clusterId, allStates);
     assertNotNull(history);
     assertEquals(50, history.size());
 
-    history = dao.findAll(clusterId, Collections.singletonList(AlertState.OK));
+    history = m_dao.findAll(m_clusterId,
+        Collections.singletonList(AlertState.OK));
     assertNotNull(history);
     assertEquals(40, history.size());
 
-    history = dao.findAll(clusterId,
+    history = m_dao.findAll(m_clusterId,
         Collections.singletonList(AlertState.CRITICAL));
     assertNotNull(history);
     assertEquals(10, history.size());
 
-    history = dao.findAll(clusterId,
+    history = m_dao.findAll(m_clusterId,
         Collections.singletonList(AlertState.WARNING));
     assertNotNull(history);
     assertEquals(0, history.size());
@@ -280,14 +323,14 @@ public class AlertsDAOTest {
     calendar.set(2014, Calendar.JANUARY, 1);
 
     // on or after 1/1/2014
-    List<AlertHistoryEntity> history = dao.findAll(clusterId,
+    List<AlertHistoryEntity> history = m_dao.findAll(m_clusterId,
         calendar.getTime(), null);
 
     assertNotNull(history);
     assertEquals(50, history.size());
 
     // on or before 1/1/2014
-    history = dao.findAll(clusterId, null, calendar.getTime());
+    history = m_dao.findAll(m_clusterId, null, calendar.getTime());
     assertNotNull(history);
     assertEquals(1, history.size());
 
@@ -298,27 +341,29 @@ public class AlertsDAOTest {
     calendar.set(2014, Calendar.JANUARY, 10);
     Date endDate = calendar.getTime();
 
-    history = dao.findAll(clusterId, startDate, endDate);
+    history = m_dao.findAll(m_clusterId, startDate, endDate);
     assertNotNull(history);
     assertEquals(6, history.size());
 
     // after 3/1
     calendar.set(2014, Calendar.MARCH, 5);
-    history = dao.findAll(clusterId, calendar.getTime(), null);
+    history = m_dao.findAll(m_clusterId, calendar.getTime(), null);
     assertNotNull(history);
     assertEquals(0, history.size());
 
-    history = dao.findAll(clusterId, endDate, startDate);
+    history = m_dao.findAll(m_clusterId, endDate, startDate);
     assertNotNull(history);
     assertEquals(0, history.size());
   }
 
   @Test
   public void testFindCurrentByHostAndName() throws Exception {
-    AlertCurrentEntity entity = dao.findCurrentByHostAndName(clusterId.longValue(), "h2", "Alert Definition 1");
+    AlertCurrentEntity entity = m_dao.findCurrentByHostAndName(
+        m_clusterId.longValue(), "h2", "Alert Definition 1");
     assertNull(entity);
 
-    entity = dao.findCurrentByHostAndName(clusterId.longValue(), "h1", "Alert Definition 1");
+    entity = m_dao.findCurrentByHostAndName(m_clusterId.longValue(), "h1",
+        "Alert Definition 1");
 
     assertNotNull(entity);
     assertNotNull(entity.getAlertHistory());
@@ -330,25 +375,29 @@ public class AlertsDAOTest {
    */
   @Test
   public void testFindCurrentSummary() throws Exception {
-    AlertSummaryDTO summary = dao.findCurrentCounts(clusterId.longValue(), null, null);
+    AlertSummaryDTO summary = m_dao.findCurrentCounts(m_clusterId.longValue(),
+        null, null);
     assertEquals(5, summary.getOkCount());
 
-    AlertHistoryEntity h1 = dao.findCurrentByCluster(clusterId.longValue()).get(2).getAlertHistory();
-    AlertHistoryEntity h2 = dao.findCurrentByCluster(clusterId.longValue()).get(3).getAlertHistory();
-    AlertHistoryEntity h3 = dao.findCurrentByCluster(clusterId.longValue()).get(4).getAlertHistory();
+    AlertHistoryEntity h1 = m_dao.findCurrentByCluster(m_clusterId.longValue()).get(
+        2).getAlertHistory();
+    AlertHistoryEntity h2 = m_dao.findCurrentByCluster(m_clusterId.longValue()).get(
+        3).getAlertHistory();
+    AlertHistoryEntity h3 = m_dao.findCurrentByCluster(m_clusterId.longValue()).get(
+        4).getAlertHistory();
     h1.setAlertState(AlertState.WARNING);
-    dao.merge(h1);
+    m_dao.merge(h1);
     h2.setAlertState(AlertState.CRITICAL);
-    dao.merge(h2);
+    m_dao.merge(h2);
     h3.setAlertState(AlertState.UNKNOWN);
-    dao.merge(h3);
+    m_dao.merge(h3);
 
     int ok = 0;
     int warn = 0;
     int crit = 0;
     int unk = 0;
 
-    for (AlertCurrentEntity h : dao.findCurrentByCluster(clusterId.longValue())) {
+    for (AlertCurrentEntity h : m_dao.findCurrentByCluster(m_clusterId.longValue())) {
       switch (h.getAlertHistory().getAlertState()) {
         case CRITICAL:
           crit++;
@@ -366,7 +415,7 @@ public class AlertsDAOTest {
 
     }
 
-    summary = dao.findCurrentCounts(clusterId.longValue(), null, null);
+    summary = m_dao.findCurrentCounts(m_clusterId.longValue(), null, null);
     // !!! db-to-db compare
     assertEquals(ok, summary.getOkCount());
     assertEquals(warn, summary.getWarningCount());
@@ -379,19 +428,20 @@ public class AlertsDAOTest {
     assertEquals(1, summary.getCriticalCount());
     assertEquals(1, summary.getCriticalCount());
 
-    summary = dao.findCurrentCounts(clusterId.longValue(), "Service 0", null);
+    summary = m_dao.findCurrentCounts(m_clusterId.longValue(), "Service 0",
+        null);
     assertEquals(1, summary.getOkCount());
     assertEquals(0, summary.getWarningCount());
     assertEquals(0, summary.getCriticalCount());
     assertEquals(0, summary.getCriticalCount());
 
-    summary = dao.findCurrentCounts(clusterId.longValue(), null, "h1");
+    summary = m_dao.findCurrentCounts(m_clusterId.longValue(), null, "h1");
     assertEquals(2, summary.getOkCount());
     assertEquals(1, summary.getWarningCount());
     assertEquals(1, summary.getCriticalCount());
     assertEquals(1, summary.getCriticalCount());
 
-    summary = dao.findCurrentCounts(clusterId.longValue(), "foo", null);
+    summary = m_dao.findCurrentCounts(m_clusterId.longValue(), "foo", null);
     assertEquals(0, summary.getOkCount());
     assertEquals(0, summary.getWarningCount());
     assertEquals(0, summary.getCriticalCount());
@@ -405,13 +455,13 @@ public class AlertsDAOTest {
     definition.setDefinitionName("many_per_cluster");
     definition.setServiceName("ServiceName");
     definition.setComponentName(null);
-    definition.setClusterId(clusterId);
+    definition.setClusterId(m_clusterId);
     definition.setHash(UUID.randomUUID().toString());
     definition.setScheduleInterval(Integer.valueOf(60));
     definition.setScope(Scope.SERVICE);
     definition.setSource("{\"type\" : \"SCRIPT\"}");
     definition.setSourceType(SourceType.SCRIPT);
-    definitionDao.create(definition);
+    m_definitionDao.create(definition);
 
     // history record #1 and current
     AlertHistoryEntity history = new AlertHistoryEntity();
@@ -421,7 +471,7 @@ public class AlertsDAOTest {
     history.setAlertState(AlertState.OK);
     history.setAlertText("");
     history.setAlertTimestamp(Long.valueOf(1L));
-    history.setClusterId(clusterId);
+    history.setClusterId(m_clusterId);
     history.setComponentName("");
     history.setHostName("h1");
     history.setServiceName("ServiceName");
@@ -430,7 +480,7 @@ public class AlertsDAOTest {
     current.setAlertHistory(history);
     current.setLatestTimestamp(Long.valueOf(1L));
     current.setOriginalTimestamp(Long.valueOf(1L));
-    dao.merge(current);
+    m_dao.merge(current);
 
     // history record #2 and current
     history = new AlertHistoryEntity();
@@ -440,7 +490,7 @@ public class AlertsDAOTest {
     history.setAlertState(AlertState.OK);
     history.setAlertText("");
     history.setAlertTimestamp(Long.valueOf(1L));
-    history.setClusterId(clusterId);
+    history.setClusterId(m_clusterId);
     history.setComponentName("");
     history.setHostName("h2");
     history.setServiceName("ServiceName");
@@ -449,27 +499,30 @@ public class AlertsDAOTest {
     current.setAlertHistory(history);
     current.setLatestTimestamp(Long.valueOf(1L));
     current.setOriginalTimestamp(Long.valueOf(1L));
-    dao.merge(current);
+    m_dao.merge(current);
 
-    AlertSummaryDTO summary = dao.findAggregateCounts(clusterId.longValue(), "many_per_cluster");
+    AlertSummaryDTO summary = m_dao.findAggregateCounts(
+        m_clusterId.longValue(), "many_per_cluster");
     assertEquals(2, summary.getOkCount());
     assertEquals(0, summary.getWarningCount());
     assertEquals(0, summary.getCriticalCount());
     assertEquals(0, summary.getUnknownCount());
 
-    AlertCurrentEntity c = dao.findCurrentByHostAndName(clusterId.longValue(),
+    AlertCurrentEntity c = m_dao.findCurrentByHostAndName(
+        m_clusterId.longValue(),
         "h2", "many_per_cluster");
     AlertHistoryEntity h = c.getAlertHistory();
     h.setAlertState(AlertState.CRITICAL);
-    dao.merge(h);
+    m_dao.merge(h);
 
-    summary = dao.findAggregateCounts(clusterId.longValue(), "many_per_cluster");
+    summary = m_dao.findAggregateCounts(m_clusterId.longValue(),
+        "many_per_cluster");
     assertEquals(2, summary.getOkCount());
     assertEquals(0, summary.getWarningCount());
     assertEquals(1, summary.getCriticalCount());
     assertEquals(0, summary.getUnknownCount());
 
-    summary = dao.findAggregateCounts(clusterId.longValue(), "foo");
+    summary = m_dao.findAggregateCounts(m_clusterId.longValue(), "foo");
     assertEquals(0, summary.getOkCount());
     assertEquals(0, summary.getWarningCount());
     assertEquals(0, summary.getCriticalCount());
@@ -484,7 +537,7 @@ public class AlertsDAOTest {
    */
   @Test
   public void testJPAInnerEntityStaleness() {
-    List<AlertCurrentEntity> currents = dao.findCurrent();
+    List<AlertCurrentEntity> currents = m_dao.findCurrent();
     AlertCurrentEntity current = currents.get(0);
     AlertHistoryEntity oldHistory = current.getAlertHistory();
 
@@ -506,14 +559,14 @@ public class AlertsDAOTest {
     newHistory.setHostName(oldHistory.getHostName());
     newHistory.setServiceName(oldHistory.getServiceName());
 
-    dao.create(newHistory);
+    m_dao.create(newHistory);
 
     assertTrue(newHistory.getAlertId().longValue() != oldHistory.getAlertId().longValue());
 
     current.setAlertHistory(newHistory);
-    dao.merge(current);
+    m_dao.merge(current);
 
-    AlertCurrentEntity newCurrent = dao.findCurrentByHostAndName(
+    AlertCurrentEntity newCurrent = m_dao.findCurrentByHostAndName(
         newHistory.getClusterId(),
         newHistory.getHostName(),
         newHistory.getAlertDefinition().getDefinitionName());
@@ -524,4 +577,267 @@ public class AlertsDAOTest {
     assertEquals(newHistory.getAlertState(),
         newCurrent.getAlertHistory().getAlertState());
   }
+
+  /**
+   * Tests that maintenance mode is set correctly on notices.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testMaintenanceMode() throws Exception {
+    Cluster cluster = initializeNewCluster();
+
+    List<AlertCurrentEntity> currents = m_dao.findCurrent();
+    for (AlertCurrentEntity current : currents) {
+      m_dao.remove(current);
+    }
+
+    // create some definitions
+    AlertDefinitionEntity namenode = new AlertDefinitionEntity();
+    namenode.setDefinitionName("NAMENODE");
+    namenode.setServiceName("HDFS");
+    namenode.setComponentName("NAMENODE");
+    namenode.setClusterId(cluster.getClusterId());
+    namenode.setHash(UUID.randomUUID().toString());
+    namenode.setScheduleInterval(Integer.valueOf(60));
+    namenode.setScope(Scope.ANY);
+    namenode.setSource("{\"type\" : \"SCRIPT\"}");
+    namenode.setSourceType(SourceType.SCRIPT);
+    m_definitionDao.create(namenode);
+
+    AlertDefinitionEntity datanode = new AlertDefinitionEntity();
+    datanode.setDefinitionName("DATANODE");
+    datanode.setServiceName("HDFS");
+    datanode.setComponentName("DATANODE");
+    datanode.setClusterId(cluster.getClusterId());
+    datanode.setHash(UUID.randomUUID().toString());
+    datanode.setScheduleInterval(Integer.valueOf(60));
+    datanode.setScope(Scope.HOST);
+    datanode.setSource("{\"type\" : \"SCRIPT\"}");
+    datanode.setSourceType(SourceType.SCRIPT);
+    m_definitionDao.create(datanode);
+
+    AlertDefinitionEntity aggregate = new AlertDefinitionEntity();
+    aggregate.setDefinitionName("DATANODE_UP");
+    aggregate.setServiceName("HDFS");
+    aggregate.setComponentName(null);
+    aggregate.setClusterId(cluster.getClusterId());
+    aggregate.setHash(UUID.randomUUID().toString());
+    aggregate.setScheduleInterval(Integer.valueOf(60));
+    aggregate.setScope(Scope.SERVICE);
+    aggregate.setSource("{\"type\" : \"SCRIPT\"}");
+    aggregate.setSourceType(SourceType.SCRIPT);
+    m_definitionDao.create(aggregate);
+
+    // create some history
+    AlertHistoryEntity nnHistory = new AlertHistoryEntity();
+    nnHistory.setAlertState(AlertState.OK);
+    nnHistory.setServiceName(namenode.getServiceName());
+    nnHistory.setComponentName(namenode.getComponentName());
+    nnHistory.setClusterId(cluster.getClusterId());
+    nnHistory.setAlertDefinition(namenode);
+    nnHistory.setAlertLabel(namenode.getDefinitionName());
+    nnHistory.setAlertText(namenode.getDefinitionName());
+    nnHistory.setAlertTimestamp(calendar.getTimeInMillis());
+    nnHistory.setHostName(HOSTNAME);
+    m_dao.create(nnHistory);
+
+    AlertCurrentEntity nnCurrent = new AlertCurrentEntity();
+    nnCurrent.setAlertHistory(nnHistory);
+    nnCurrent.setLatestText(nnHistory.getAlertText());
+    nnCurrent.setMaintenanceState(MaintenanceState.OFF);
+    nnCurrent.setOriginalTimestamp(System.currentTimeMillis());
+    nnCurrent.setLatestTimestamp(System.currentTimeMillis());
+    m_dao.create(nnCurrent);
+
+    AlertHistoryEntity dnHistory = new AlertHistoryEntity();
+    dnHistory.setAlertState(AlertState.WARNING);
+    dnHistory.setServiceName(datanode.getServiceName());
+    dnHistory.setComponentName(datanode.getComponentName());
+    dnHistory.setClusterId(cluster.getClusterId());
+    dnHistory.setAlertDefinition(datanode);
+    dnHistory.setAlertLabel(datanode.getDefinitionName());
+    dnHistory.setAlertText(datanode.getDefinitionName());
+    dnHistory.setAlertTimestamp(calendar.getTimeInMillis());
+    dnHistory.setHostName(HOSTNAME);
+    m_dao.create(dnHistory);
+
+    AlertCurrentEntity dnCurrent = new AlertCurrentEntity();
+    dnCurrent.setAlertHistory(dnHistory);
+    dnCurrent.setLatestText(dnHistory.getAlertText());
+    dnCurrent.setMaintenanceState(MaintenanceState.OFF);
+    dnCurrent.setOriginalTimestamp(System.currentTimeMillis());
+    dnCurrent.setLatestTimestamp(System.currentTimeMillis());
+    m_dao.create(dnCurrent);
+
+    AlertHistoryEntity aggregateHistory = new AlertHistoryEntity();
+    aggregateHistory.setAlertState(AlertState.CRITICAL);
+    aggregateHistory.setServiceName(aggregate.getServiceName());
+    aggregateHistory.setComponentName(aggregate.getComponentName());
+    aggregateHistory.setClusterId(cluster.getClusterId());
+    aggregateHistory.setAlertDefinition(aggregate);
+    aggregateHistory.setAlertLabel(aggregate.getDefinitionName());
+    aggregateHistory.setAlertText(aggregate.getDefinitionName());
+    aggregateHistory.setAlertTimestamp(calendar.getTimeInMillis());
+    m_dao.create(aggregateHistory);
+
+    AlertCurrentEntity aggregateCurrent = new AlertCurrentEntity();
+    aggregateCurrent.setAlertHistory(aggregateHistory);
+    aggregateCurrent.setLatestText(aggregateHistory.getAlertText());
+    aggregateCurrent.setMaintenanceState(MaintenanceState.OFF);
+    aggregateCurrent.setOriginalTimestamp(System.currentTimeMillis());
+    aggregateCurrent.setLatestTimestamp(System.currentTimeMillis());
+    m_dao.create(aggregateCurrent);
+
+    currents = m_dao.findCurrent();
+    assertEquals(3, currents.size());
+
+    for (AlertCurrentEntity current : currents) {
+      assertEquals(MaintenanceState.OFF, current.getMaintenanceState());
+    }
+
+    // turn on HDFS MM
+    Service hdfs = m_clusters.getClusterById(cluster.getClusterId()).getService(
+        "HDFS");
+
+    hdfs.setMaintenanceState(MaintenanceState.ON);
+
+    currents = m_dao.findCurrent();
+    assertEquals(3, currents.size());
+    for (AlertCurrentEntity current : currents) {
+      assertEquals(MaintenanceState.ON, current.getMaintenanceState());
+    }
+
+    // turn HDFS MM off
+    hdfs.setMaintenanceState(MaintenanceState.OFF);
+
+    currents = m_dao.findCurrent();
+    assertEquals(3, currents.size());
+    for (AlertCurrentEntity current : currents) {
+      assertEquals(MaintenanceState.OFF, current.getMaintenanceState());
+    }
+
+    // turn on host MM
+    Host host = m_clusters.getHost(HOSTNAME);
+    host.setMaintenanceState(cluster.getClusterId(), MaintenanceState.ON);
+
+    // only NAMENODE and DATANODE should be in MM; the aggregate should not
+    // since the host is in MM
+    currents = m_dao.findCurrent();
+    assertEquals(3, currents.size());
+    for (AlertCurrentEntity current : currents) {
+      if (current.getAlertHistory().getComponentName() != null) {
+        assertEquals(MaintenanceState.ON, current.getMaintenanceState());
+      } else {
+        assertEquals(MaintenanceState.OFF, current.getMaintenanceState());
+      }
+    }
+
+    // turn host MM off
+    host.setMaintenanceState(cluster.getClusterId(), MaintenanceState.OFF);
+
+    currents = m_dao.findCurrent();
+    assertEquals(3, currents.size());
+    for (AlertCurrentEntity current : currents) {
+      assertEquals(MaintenanceState.OFF, current.getMaintenanceState());
+    }
+
+    // turn a component MM on
+    ServiceComponentHost nnComponent = null;
+    List<ServiceComponentHost> schs = cluster.getServiceComponentHosts(HOSTNAME);
+    for (ServiceComponentHost sch : schs) {
+      if ("NAMENODE".equals(sch.getServiceComponentName())) {
+        sch.setMaintenanceState(MaintenanceState.ON);
+        nnComponent = sch;
+      }
+    }
+
+    assertNotNull(nnComponent);
+
+    currents = m_dao.findCurrent();
+    assertEquals(3, currents.size());
+    for (AlertCurrentEntity current : currents) {
+      if ("NAMENODE".equals(current.getAlertHistory().getComponentName())) {
+        assertEquals(MaintenanceState.ON, current.getMaintenanceState());
+      } else {
+        assertEquals(MaintenanceState.OFF, current.getMaintenanceState());
+      }
+    }
+
+  }
+
+  private Cluster initializeNewCluster() throws Exception {
+    String clusterName = "cluster-" + System.currentTimeMillis();
+    m_clusters.addCluster(clusterName);
+
+    Cluster cluster = m_clusters.getCluster(clusterName);
+    cluster.setDesiredStackVersion(new StackId("HDP", "2.0.6"));
+
+    addHost();
+    m_clusters.mapHostToCluster(HOSTNAME, cluster.getClusterName());
+
+    installHdfsService(cluster);
+    return cluster;
+  }
+
+  /**
+   * @throws Exception
+   */
+  private void addHost() throws Exception {
+    m_clusters.addHost(HOSTNAME);
+
+    Host host = m_clusters.getHost(HOSTNAME);
+    Map<String, String> hostAttributes = new HashMap<String, String>();
+    hostAttributes.put("os_family", "redhat");
+    hostAttributes.put("os_release_version", "6.4");
+    host.setHostAttributes(hostAttributes);
+    host.setState(HostState.HEALTHY);
+    host.persist();
+  }
+
+  /**
+   * Calls {@link Service#persist()} to mock a service install along with
+   * creating a single {@link Host} and {@link ServiceComponentHost}.
+   */
+  private void installHdfsService(Cluster cluster) throws Exception {
+    String serviceName = "HDFS";
+    Service service = m_serviceFactory.createNew(cluster, serviceName);
+    cluster.addService(service);
+    service.persist();
+    service = cluster.getService(serviceName);
+    Assert.assertNotNull(service);
+
+    ServiceComponent datanode = m_componentFactory.createNew(service,
+        "DATANODE");
+
+    service.addServiceComponent(datanode);
+    datanode.setDesiredState(State.INSTALLED);
+    datanode.persist();
+
+    ServiceComponentHost sch = m_schFactory.createNew(datanode, HOSTNAME);
+
+    datanode.addServiceComponentHost(sch);
+    sch.setDesiredState(State.INSTALLED);
+    sch.setState(State.INSTALLED);
+    sch.setDesiredStackVersion(new StackId("HDP-2.0.6"));
+    sch.setStackVersion(new StackId("HDP-2.0.6"));
+
+    sch.persist();
+
+    ServiceComponent namenode = m_componentFactory.createNew(service,
+        "NAMENODE");
+
+    service.addServiceComponent(namenode);
+    namenode.setDesiredState(State.INSTALLED);
+    namenode.persist();
+
+    sch = m_schFactory.createNew(namenode, HOSTNAME);
+    namenode.addServiceComponentHost(sch);
+    sch.setDesiredState(State.INSTALLED);
+    sch.setState(State.INSTALLED);
+    sch.setDesiredStackVersion(new StackId("HDP-2.0.6"));
+    sch.setStackVersion(new StackId("HDP-2.0.6"));
+
+    sch.persist();
+  }
 }

+ 9 - 1
ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java

@@ -50,6 +50,7 @@ import org.apache.ambari.server.orm.entities.AlertNoticeEntity;
 import org.apache.ambari.server.orm.entities.AlertTargetEntity;
 import org.apache.ambari.server.state.Alert;
 import org.apache.ambari.server.state.AlertState;
+import org.apache.ambari.server.state.MaintenanceState;
 import org.apache.ambari.server.state.alert.AggregateDefinitionMapping;
 import org.apache.ambari.server.state.alert.AggregateSource;
 import org.apache.ambari.server.state.alert.AlertDefinition;
@@ -250,6 +251,13 @@ public class AlertDataManagerTest {
     List<AlertHistoryEntity> histories = dao.findAll(clusterId);
     assertEquals(1, histories.size());
 
+    AlertCurrentEntity currentAlert = new AlertCurrentEntity();
+    currentAlert.setAlertHistory(histories.get(0));
+    currentAlert.setMaintenanceState(MaintenanceState.OFF);
+    currentAlert.setOriginalTimestamp(System.currentTimeMillis());
+    currentAlert.setLatestTimestamp(System.currentTimeMillis());
+    dao.create(currentAlert);
+
     AlertTargetEntity target = helper.createAlertTarget();
     Set<AlertTargetEntity> targets = new HashSet<AlertTargetEntity>();
     targets.add(target);
@@ -262,7 +270,7 @@ public class AlertDataManagerTest {
         AlertState.OK);
 
     AlertStateChangeEvent event = new AlertStateChangeEvent(clusterId, alert1,
-        histories.get(0), AlertState.CRITICAL);
+        currentAlert, AlertState.CRITICAL);
 
     AlertStateChangedListener listener = injector.getInstance(AlertStateChangedListener.class);
     listener.onAlertEvent(event);