Selaa lähdekoodia

AMBARI-11528 AlertReceivedListener produces large number of db requests (dsen)

Dmytro Sen 10 vuotta sitten
vanhempi
commit
a48150ff2f

+ 14 - 3
ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java

@@ -379,6 +379,9 @@ public class Configuration {
    */
   private static final String ALERT_TEMPLATE_FILE = "alerts.template.file";
 
+  public static final String ALERTS_EXECUTION_SCHEDULER_THREADS_KEY = "alerts.execution.scheduler.maxThreads";
+  public static final String ALERTS_EXECUTION_SCHEDULER_THREADS_DEFAULT = "2";
+
   private static final Logger LOG = LoggerFactory.getLogger(
       Configuration.class);
 
@@ -1182,7 +1185,7 @@ public class Configuration {
 
   public int getOneWayAuthPort() {
     return Integer.parseInt(properties.getProperty(SRVR_ONE_WAY_SSL_PORT_KEY,
-                                                   String.valueOf(SRVR_ONE_WAY_SSL_PORT_DEFAULT)));
+        String.valueOf(SRVR_ONE_WAY_SSL_PORT_DEFAULT)));
   }
 
   public int getTwoWayAuthPort() {
@@ -1283,7 +1286,7 @@ public class Configuration {
 
   public Integer getRequestReadTimeout() {
     return Integer.parseInt(properties.getProperty(REQUEST_READ_TIMEOUT,
-                                                   REQUEST_READ_TIMEOUT_DEFAULT));
+        REQUEST_READ_TIMEOUT_DEFAULT));
   }
 
   public Integer getRequestConnectTimeout() {
@@ -1293,7 +1296,7 @@ public class Configuration {
 
   public String getExecutionSchedulerConnections() {
     return properties.getProperty(EXECUTION_SCHEDULER_CONNECTIONS,
-                                  DEFAULT_SCHEDULER_MAX_CONNECTIONS);
+        DEFAULT_SCHEDULER_MAX_CONNECTIONS);
   }
 
   public Long getExecutionSchedulerMisfireToleration() {
@@ -1433,6 +1436,14 @@ public class Configuration {
     return properties.getProperty(ALERT_TEMPLATE_FILE);
   }
 
+  /**
+   * Reads the {@code alerts.execution.scheduler.maxThreads} property and
+   * parses it as an integer, using the default value {@code "2"} when the
+   * property is not set.
+   *
+   * @return max thread pool size for AlertEventPublisher, default 2
+   * @throws NumberFormatException
+   *           if the configured value is not a parsable integer
+   */
+  public int getAlertEventPublisherPoolSize() {
+    return Integer.parseInt(properties.getProperty(
+        ALERTS_EXECUTION_SCHEDULER_THREADS_KEY, ALERTS_EXECUTION_SCHEDULER_THREADS_DEFAULT));
+  }
+
   /**
    * Get the node recovery type DEFAULT|AUTO_START|FULL
    * @return

+ 26 - 59
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java

@@ -19,6 +19,7 @@ package org.apache.ambari.server.events.listeners.alerts;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.EagerSingleton;
@@ -28,6 +29,7 @@ import org.apache.ambari.server.events.AlertReceivedEvent;
 import org.apache.ambari.server.events.AlertStateChangeEvent;
 import org.apache.ambari.server.events.InitialAlertEvent;
 import org.apache.ambari.server.events.publishers.AlertEventPublisher;
+import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
 import org.apache.ambari.server.orm.dao.AlertsDAO;
 import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
@@ -100,12 +102,20 @@ public class AlertReceivedListener {
    */
   @Subscribe
   @AllowConcurrentEvents
+  @RequiresSession
   public void onAlertEvent(AlertReceivedEvent event) {
     if (LOG.isDebugEnabled()) {
       LOG.debug(event.toString());
     }
 
     Alert alert = event.getAlert();
+
+    // jobs that were running when a service/component/host was changed
+    // which invalidate the alert should not be reported
+    if (!isValid(alert)) {
+      return;
+    }
+
     long clusterId = event.getClusterId();
 
     AlertDefinitionEntity definition = m_definitionDao.findByName(clusterId,
@@ -129,13 +139,7 @@ public class AlertReceivedListener {
       return;
     }
 
-    // jobs that were running when a service/component/host was changed
-    // which invalidate the alert should not be reported
-    if (!isValid(alert)) {
-      return;
-    }
-
-    AlertCurrentEntity current = null;
+    AlertCurrentEntity current;
 
     if (StringUtils.isBlank(alert.getHostName()) || definition.isHostIgnored()) {
       current = m_alertsDao.findCurrentByNameNoHost(clusterId, alert.getName());
@@ -151,7 +155,7 @@ public class AlertReceivedListener {
       current.setMaintenanceState(MaintenanceState.OFF);
       current.setAlertHistory(history);
       current.setLatestTimestamp(alert.getTimestamp());
-      current.setOriginalTimestamp(Long.valueOf(alert.getTimestamp()));
+      current.setOriginalTimestamp(alert.getTimestamp());
       m_alertsDao.create(current);
 
       // broadcast the initial alert being received
@@ -162,7 +166,7 @@ public class AlertReceivedListener {
     } else if (alert.getState() == current.getAlertHistory().getAlertState()) {
       current.setLatestTimestamp(alert.getTimestamp());
       current.setLatestText(alert.getText());
-      current = m_alertsDao.merge(current);
+      m_alertsDao.merge(current);
     } else {
       if (LOG.isDebugEnabled()) {
         LOG.debug(
@@ -179,16 +183,11 @@ public class AlertReceivedListener {
       AlertHistoryEntity history = createHistory(clusterId,
           oldHistory.getAlertDefinition(), alert);
 
-      // manually create the new history entity since we are merging into
-      // an existing current entity
-      m_alertsDao.create(history);
-
-      current.setAlertHistory(history);
-      current.setLatestTimestamp(Long.valueOf(alert.getTimestamp()));
-      current.setOriginalTimestamp(Long.valueOf(alert.getTimestamp()));
+      current.setLatestTimestamp(alert.getTimestamp());
+      current.setOriginalTimestamp(alert.getTimestamp());
       current.setLatestText(alert.getText());
 
-      current = m_alertsDao.merge(current);
+      current = m_alertsDao.mergeAlertCurrentWithAlertHistory(current, history);
 
       if (LOG.isDebugEnabled()) {
         LOG.debug(
@@ -252,58 +251,26 @@ public class AlertReceivedListener {
       return false;
     }
 
-    Map<String, Service> services = cluster.getServices();
-    Service service = services.get(serviceName);
-    if (null == service) {
-      LOG.error("Unable to process alert {} for an invalid service named {}",
-          alert.getName(), serviceName);
-
-      return false;
-    }
-
     if (StringUtils.isNotBlank(hostName)) {
-      List<Host> hosts = m_clusters.get().getHosts();
-      if (null == hosts) {
+      // reject the alert when its host does not exist
+      if (!m_clusters.get().hostExists(hostName)) {
         LOG.error("Unable to process alert {} for an invalid host named {}",
             alert.getName(), hostName);
-
         return false;
       }
-
-      boolean validHost = false;
-      for (Host host : hosts) {
-        if (hostName.equals(host.getHostName())) {
-          validHost = true;
-          break;
-        }
-      }
-
-      if (!validHost) {
-        LOG.error("Unable to process alert {} for an invalid host named {}",
-            alert.getName(), hostName);
+      if (!cluster.getServices().containsKey(serviceName)) {
+        LOG.error("Unable to process alert {} for an invalid service named {}",
+            alert.getName(), serviceName);
 
         return false;
       }
-    }
-
-    // if the alert is for a host/component then verify that the component
-    // is actually installed on that host
-    if (StringUtils.isNotBlank(hostName) && null != componentName) {
-      boolean validServiceComponentHost = false;
-      List<ServiceComponentHost> serviceComponentHosts = cluster.getServiceComponentHosts(hostName);
-
-      for (ServiceComponentHost serviceComponentHost : serviceComponentHosts) {
-        if (componentName.equals(serviceComponentHost.getServiceComponentName())) {
-          validServiceComponentHost = true;
-          break;
-        }
-      }
-
-      if (!validServiceComponentHost) {
-        LOG.warn(
+      // if the alert is for a host/component then verify that the component
+      // is actually installed on that host
+      if (null != componentName &&
+          !cluster.getHosts(serviceName, componentName).contains(hostName)) {
+        LOG.error(
             "Unable to process alert {} for an invalid service {} and component {} on host {}",
             alert.getName(), serviceName, componentName, hostName);
-
         return false;
       }
     }

+ 4 - 3
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertStateChangedListener.java

@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.events.listeners.alerts;
 
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -119,7 +120,7 @@ public class AlertStateChangedListener {
     }
 
     List<AlertGroupEntity> groups = m_alertsDispatchDao.findGroupsByDefinition(definition);
-
+    List<AlertNoticeEntity> notices = new LinkedList<AlertNoticeEntity>();
     // for each group, determine if there are any targets that need to receive
     // a notification about the alert state change event
     for (AlertGroupEntity group : groups) {
@@ -138,10 +139,10 @@ public class AlertStateChangedListener {
         notice.setAlertTarget(target);
         notice.setAlertHistory(event.getNewHistoricalEntry());
         notice.setNotifyState(NotificationState.PENDING);
-
-        m_alertsDispatchDao.create(notice);
+        notices.add(notice);
       }
     }
+    m_alertsDispatchDao.createNotices(notices);
   }
 
   /**

+ 6 - 2
ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java

@@ -23,6 +23,8 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.inject.Inject;
+import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.events.AlertEvent;
 
 import com.google.common.eventbus.AsyncEventBus;
@@ -49,10 +51,12 @@ public final class AlertEventPublisher {
   /**
    * Constructor.
    */
-  public AlertEventPublisher() {
+  @Inject
+  public AlertEventPublisher(Configuration config) {
     // create a fixed executor that is unbounded for now and will run rejected
     // requests in the calling thread to prevent loss of alert handling
-    ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 0L,
+    int poolsize = config.getAlertEventPublisherPoolSize();
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(2, poolsize, 0L,
         TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
         new AlertEventBusThreadFactory(),
         new ThreadPoolExecutor.CallerRunsPolicy());

+ 17 - 0
ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDispatchDAO.java

@@ -557,6 +557,23 @@ public class AlertDispatchDAO {
     }
   }
 
+  /**
+   * Persists new alert notices by calling {@code create} once for every
+   * element of the supplied list, in iteration order.
+   * <p/>
+   * NOTE(review): the method is annotated {@code @Transactional}, presumably
+   * so that all notices are written within one transaction - confirm against
+   * the persistence configuration.
+   *
+   * @param entities
+   *          the notices to persist; a {@code null} list is ignored and
+   *          nothing is persisted.
+   */
+  @Transactional
+  public void createNotices(List<AlertNoticeEntity> entities) {
+    if (null == entities) {
+      return;
+    }

+    for (AlertNoticeEntity entity : entities) {
+      create(entity);
+    }
+  }
+
   /**
    * Persists a new alert target.
    *

+ 21 - 19
ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java

@@ -716,6 +716,27 @@ public class AlertsDAO {
     return entityManagerProvider.get().merge(alert);
   }
 
+  /**
+   * Merge the specified current alert with the history and
+   * the existing alert in the database in a single transaction.
+   * <p/>
+   * The history is created first, then set on the current alert, and finally
+   * the current alert is merged.
+   *
+   * @param alert
+   *          the current alert to merge (not {@code null}).
+   * @param history
+   *          the history to set to alert (not {@code null}).
+   * @return the updated current alert with merged content (never
+   *         {@code null}).
+   */
+  @Transactional
+  public AlertCurrentEntity mergeAlertCurrentWithAlertHistory(
+      AlertCurrentEntity alert, AlertHistoryEntity history) {

+    // manually create the new history entity since we are merging into
+    // an existing current entity
+    create(history);
+    alert.setAlertHistory(history);
+    return merge(alert);
+  }
+
   /**
    * Removes the specified current alert from the database.
    *
@@ -773,25 +794,6 @@ public class AlertsDAO {
     return daoUtils.selectOne(query);
   }
 
-  /**
-   * Sets {@link QueryHints#REFRESH} on the specified query so that child
-   * entities are not stale.
-   * <p/>
-   * See <a
-   * href="https://bugs.eclipse.org/bugs/show_bug.cgi?id=398067">https://bugs
-   * .eclipse.org/bugs/show_bug.cgi?id=398067</a>
-   *
-   * @param query
-   * @return
-   */
-  private <T> TypedQuery<T> setQueryRefreshHint(TypedQuery<T> query) {
-    // !!! https://bugs.eclipse.org/bugs/show_bug.cgi?id=398067
-    // ensure that an associated entity with a JOIN is not stale; this causes
-    // the associated AlertHistoryEntity to be stale
-    query.setHint(QueryHints.REFRESH, HintValues.TRUE);
-    return query;
-  }
-
   /**
    * The {@link HistoryPredicateVisitor} is used to convert an Ambari
    * {@link Predicate} into a JPA {@link javax.persistence.criteria.Predicate}.

+ 7 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/Clusters.java

@@ -79,6 +79,13 @@ public interface Clusters {
    */
   public Host getHost(String hostname) throws AmbariException;
 
+  /**
+   * Checks whether a host with the given name exists.
+   *
+   * @param hostname Name of the host requested
+   * @return {@code true} if the host exists, {@code false} otherwise
+   */
+  public boolean hostExists(String hostname);
+
   /**
    * Get a Host object managed by this server
    * @param hostId Host Id from the {@link org.apache.ambari.server.orm.entities.HostEntity} objecty

+ 7 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java

@@ -342,6 +342,13 @@ public class ClustersImpl implements Clusters {
     return hosts.get(hostname);
   }
 
+  @Override
+  public boolean hostExists(String hostname){
+    checkLoaded(); // NOTE(review): same call the neighbouring getHostById makes first; presumably ensures host data is loaded - confirm

+    return hosts.containsKey(hostname); // true when the hosts map has an entry keyed by this hostname
+  }
+
   @Override
   public Host getHostById(Long hostId) throws AmbariException {
     checkLoaded();

+ 2 - 0
ambari-server/src/test/java/org/apache/ambari/server/alerts/AgentHeartbeatAlertRunnableTest.java

@@ -44,6 +44,7 @@ import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.HostState;
+import org.apache.ambari.server.state.stack.OsFamily;
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
@@ -219,6 +220,7 @@ public class AgentHeartbeatAlertRunnableTest {
       Cluster cluster = EasyMock.createNiceMock(Cluster.class);
 
       binder.bind(Clusters.class).toInstance(createNiceMock(Clusters.class));
+      binder.bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
       binder.bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
       binder.bind(Cluster.class).toInstance(cluster);
       binder.bind(AlertDefinitionDAO.class).toInstance(createNiceMock(AlertDefinitionDAO.class));

+ 2 - 0
ambari-server/src/test/java/org/apache/ambari/server/alerts/StaleAlertRunnableTest.java

@@ -47,6 +47,7 @@ import org.apache.ambari.server.state.AlertState;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.stack.OsFamily;
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
@@ -332,6 +333,7 @@ public class StaleAlertRunnableTest {
       Cluster cluster = EasyMock.createNiceMock(Cluster.class);
 
       binder.bind(Clusters.class).toInstance(createNiceMock(Clusters.class));
+      binder.bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
       binder.bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class));
       binder.bind(Cluster.class).toInstance(cluster);
       binder.bind(AlertDefinitionDAO.class).toInstance(createNiceMock(AlertDefinitionDAO.class));

+ 1 - 0
ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java

@@ -533,6 +533,7 @@ public class AlertResourceProviderTest {
       expect(configuration.getDatabaseDriver()).andReturn(JDBC_IN_MEMROY_DRIVER).anyTimes();
       expect(configuration.getDatabaseUser()).andReturn("test").anyTimes();
       expect(configuration.getDatabasePassword()).andReturn("test").anyTimes();
+      expect(configuration.getAlertEventPublisherPoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_DEFAULT)).anyTimes();
       replay(configuration);
     }
   }

+ 3 - 1
ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java

@@ -110,7 +110,7 @@ public class AlertStateChangedEventTest {
         dispatchDao.findGroupsByDefinition(EasyMock.anyObject(AlertDefinitionEntity.class))).andReturn(
         groups).once();
 
-    dispatchDao.create(EasyMock.anyObject(AlertNoticeEntity.class));
+    dispatchDao.createNotices(EasyMock.<List<AlertNoticeEntity>>anyObject());
     EasyMock.expectLastCall().once();
 
     EasyMock.replay(alertTarget, alertGroup, dispatchDao);
@@ -163,6 +163,8 @@ public class AlertStateChangedEventTest {
         dispatchDao.findGroupsByDefinition(EasyMock.anyObject(AlertDefinitionEntity.class))).andReturn(
         groups).once();
 
+    dispatchDao.createNotices(EasyMock.<List<AlertNoticeEntity>>anyObject());
+
     // dispatchDao should be strict enough to throw an exception on verify
     // that the create alert notice method was not called
     EasyMock.replay(alertTarget, alertGroup, dispatchDao);