|
@@ -18,9 +18,6 @@
|
|
|
|
|
|
package org.apache.ambari.server.orm.dao;
|
|
|
|
|
|
-import static org.apache.ambari.server.orm.DBAccessor.DbType.ORACLE;
|
|
|
-import static org.apache.ambari.server.orm.dao.DaoUtils.ORACLE_LIST_LIMIT;
|
|
|
-
|
|
|
import java.text.MessageFormat;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
@@ -28,6 +25,7 @@ import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import javax.persistence.EntityManager;
|
|
|
import javax.persistence.TypedQuery;
|
|
@@ -49,16 +47,27 @@ import org.apache.ambari.server.orm.entities.HostEntity;
|
|
|
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
|
|
|
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity_;
|
|
|
import org.apache.ambari.server.orm.entities.StageEntity;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import com.google.common.cache.CacheBuilder;
|
|
|
+import com.google.common.cache.CacheLoader;
|
|
|
+import com.google.common.cache.LoadingCache;
|
|
|
import com.google.common.collect.Lists;
|
|
|
import com.google.inject.Inject;
|
|
|
import com.google.inject.Provider;
|
|
|
import com.google.inject.Singleton;
|
|
|
+import com.google.inject.name.Named;
|
|
|
import com.google.inject.persist.Transactional;
|
|
|
|
|
|
+import static org.apache.ambari.server.orm.DBAccessor.DbType.ORACLE;
|
|
|
+import static org.apache.ambari.server.orm.dao.DaoUtils.ORACLE_LIST_LIMIT;
|
|
|
+
|
|
|
@Singleton
|
|
|
public class HostRoleCommandDAO {
|
|
|
|
|
|
+ private static final Logger LOG = LoggerFactory.getLogger(HostRoleCommandDAO.class);
|
|
|
+
|
|
|
private static final String SUMMARY_DTO = String.format(
|
|
|
"SELECT NEW %s(" +
|
|
|
"MAX(hrc.stage.skippable), " +
|
|
@@ -92,12 +101,122 @@ public class HostRoleCommandDAO {
|
|
|
*/
|
|
|
private static final String COMPLETED_REQUESTS_SQL = "SELECT DISTINCT task.requestId FROM HostRoleCommandEntity task WHERE task.requestId NOT IN (SELECT task.requestId FROM HostRoleCommandEntity task WHERE task.status IN :notCompletedStatuses) ORDER BY task.requestId {0}";
|
|
|
|
|
|
+ /**
|
|
|
+ * A cache that holds {@link HostRoleCommandStatusSummaryDTO} grouped by stage id for requests by request id.
|
|
|
+ * The JPQL computing the host role command status summary for a request is rather expensive
|
|
|
+ * thus this cache helps reducing the load on the database
|
|
|
+ */
|
|
|
+ private final LoadingCache<Long, Map<Long, HostRoleCommandStatusSummaryDTO>> hrcStatusSummaryCache;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Specifies whether caching for {@link HostRoleCommandStatusSummaryDTO} grouped by stage id for requests
|
|
|
+ * is enabled.
|
|
|
+ */
|
|
|
+ private final boolean hostRoleCommandStatusSummaryCacheEnabled;
|
|
|
+
|
|
|
+
|
|
|
@Inject
|
|
|
Provider<EntityManager> entityManagerProvider;
|
|
|
|
|
|
@Inject
|
|
|
DaoUtils daoUtils;
|
|
|
|
|
|
+ public final static String HRC_STATUS_SUMMARY_CACHE_SIZE = "hostRoleCommandStatusSummaryCacheSize";
|
|
|
+ public final static String HRC_STATUS_SUMMARY_CACHE_EXPIRY_DURATION_MINUTES = "hostRoleCommandStatusCacheExpiryDurationMins";
|
|
|
+ public final static String HRC_STATUS_SUMMARY_CACHE_ENABLED = "hostRoleCommandStatusSummaryCacheEnabled";
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Invalidates the host role command status summary cache entry that corresponds to the given request.
|
|
|
+ * @param requestId the key of the cache entry to be invalidated.
|
|
|
+ */
|
|
|
+ protected void invalidateHostRoleCommandStatusSummaryCache(Long requestId) {
|
|
|
+ if (!hostRoleCommandStatusSummaryCacheEnabled )
|
|
|
+ return;
|
|
|
+
|
|
|
+ LOG.debug("Invalidating host role command status summary cache for request {} !", requestId);
|
|
|
+ hrcStatusSummaryCache.invalidate(requestId);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Invalidates those entries in host role command status cache which are dependent on the passed {@link org.apache.ambari.server.orm.entities.HostRoleCommandEntity}
|
|
|
+ * entity.
|
|
|
+ * @param hostRoleCommandEntity
|
|
|
+ */
|
|
|
+ protected void invalidateHostRoleCommandStatusCache(HostRoleCommandEntity hostRoleCommandEntity) {
|
|
|
+ if ( !hostRoleCommandStatusSummaryCacheEnabled )
|
|
|
+ return;
|
|
|
+
|
|
|
+ if (hostRoleCommandEntity != null) {
|
|
|
+ Long requestId = hostRoleCommandEntity.getRequestId();
|
|
|
+ if (requestId == null) {
|
|
|
+ StageEntity stageEntity = hostRoleCommandEntity.getStage();
|
|
|
+ if (stageEntity != null)
|
|
|
+ requestId = stageEntity.getRequestId();
|
|
|
+ }
|
|
|
+
|
|
|
+ if (requestId != null)
|
|
|
+ invalidateHostRoleCommandStatusSummaryCache(requestId.longValue());
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Loads the counts of tasks for a request and groups them by stage id.
|
|
|
+ * This allows for very efficient loading when there are a huge number of stages
|
|
|
+ * and tasks to iterate (for example, during a Stack Upgrade).
|
|
|
+ * @param requestId the request id
|
|
|
+ * @return the map of stage-to-summary objects
|
|
|
+ */
|
|
|
+ @RequiresSession
|
|
|
+ protected Map<Long, HostRoleCommandStatusSummaryDTO> loadAggregateCounts(Long requestId) {
|
|
|
+
|
|
|
+ TypedQuery<HostRoleCommandStatusSummaryDTO> query = entityManagerProvider.get().createQuery(
|
|
|
+ SUMMARY_DTO, HostRoleCommandStatusSummaryDTO.class);
|
|
|
+
|
|
|
+ query.setParameter("requestId", requestId);
|
|
|
+ query.setParameter("aborted", HostRoleStatus.ABORTED);
|
|
|
+ query.setParameter("completed", HostRoleStatus.COMPLETED);
|
|
|
+ query.setParameter("failed", HostRoleStatus.FAILED);
|
|
|
+ query.setParameter("holding", HostRoleStatus.HOLDING);
|
|
|
+ query.setParameter("holding_failed", HostRoleStatus.HOLDING_FAILED);
|
|
|
+ query.setParameter("holding_timedout", HostRoleStatus.HOLDING_TIMEDOUT);
|
|
|
+ query.setParameter("in_progress", HostRoleStatus.IN_PROGRESS);
|
|
|
+ query.setParameter("pending", HostRoleStatus.PENDING);
|
|
|
+ query.setParameter("queued", HostRoleStatus.QUEUED);
|
|
|
+ query.setParameter("timedout", HostRoleStatus.TIMEDOUT);
|
|
|
+ query.setParameter("skipped_failed", HostRoleStatus.SKIPPED_FAILED);
|
|
|
+
|
|
|
+ Map<Long, HostRoleCommandStatusSummaryDTO> map = new HashMap<Long, HostRoleCommandStatusSummaryDTO>();
|
|
|
+
|
|
|
+ for (HostRoleCommandStatusSummaryDTO dto : daoUtils.selectList(query)) {
|
|
|
+ map.put(dto.getStageId(), dto);
|
|
|
+ }
|
|
|
+
|
|
|
+ return map;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Inject
|
|
|
+ public HostRoleCommandDAO(@Named(HRC_STATUS_SUMMARY_CACHE_ENABLED) boolean hostRoleCommandStatusSummaryCacheEnabled, @Named(HRC_STATUS_SUMMARY_CACHE_SIZE) long hostRoleCommandStatusSummaryCacheLimit, @Named(HRC_STATUS_SUMMARY_CACHE_EXPIRY_DURATION_MINUTES) long hostRoleCommandStatusSummaryCacheExpiryDurationMins) {
|
|
|
+ this.hostRoleCommandStatusSummaryCacheEnabled = hostRoleCommandStatusSummaryCacheEnabled;
|
|
|
+
|
|
|
+ LOG.info("Host role command status summary cache {} !", hostRoleCommandStatusSummaryCacheEnabled ? "enabled" : "disabled");
|
|
|
+
|
|
|
+
|
|
|
+ hrcStatusSummaryCache = CacheBuilder.newBuilder()
|
|
|
+ .maximumSize(hostRoleCommandStatusSummaryCacheLimit)
|
|
|
+ .expireAfterAccess(hostRoleCommandStatusSummaryCacheExpiryDurationMins, TimeUnit.MINUTES)
|
|
|
+ .build(new CacheLoader<Long, Map<Long, HostRoleCommandStatusSummaryDTO>>() {
|
|
|
+ @Override
|
|
|
+ public Map<Long, HostRoleCommandStatusSummaryDTO> load(Long requestId) throws Exception {
|
|
|
+ LOG.debug("Cache miss for host role command status summary object for request {}, fetching from JPA", requestId);
|
|
|
+ Map<Long, HostRoleCommandStatusSummaryDTO> hrcCommandStatusByStageId = loadAggregateCounts(requestId);
|
|
|
+
|
|
|
+ return hrcCommandStatusByStageId;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
@RequiresSession
|
|
|
public HostRoleCommandEntity findByPK(long taskId) {
|
|
|
return entityManagerProvider.get().find(HostRoleCommandEntity.class, taskId);
|
|
@@ -425,11 +544,16 @@ public class HostRoleCommandDAO {
|
|
|
@Transactional
|
|
|
public void create(HostRoleCommandEntity stageEntity) {
|
|
|
entityManagerProvider.get().persist(stageEntity);
|
|
|
+
|
|
|
+ invalidateHostRoleCommandStatusCache(stageEntity);
|
|
|
}
|
|
|
|
|
|
@Transactional
|
|
|
public HostRoleCommandEntity merge(HostRoleCommandEntity stageEntity) {
|
|
|
HostRoleCommandEntity entity = entityManagerProvider.get().merge(stageEntity);
|
|
|
+
|
|
|
+ invalidateHostRoleCommandStatusCache(entity);
|
|
|
+
|
|
|
return entity;
|
|
|
}
|
|
|
|
|
@@ -446,6 +570,8 @@ public class HostRoleCommandDAO {
|
|
|
List<HostRoleCommandEntity> managedList = new ArrayList<HostRoleCommandEntity>(entities.size());
|
|
|
for (HostRoleCommandEntity entity : entities) {
|
|
|
managedList.add(entityManagerProvider.get().merge(entity));
|
|
|
+
|
|
|
+ invalidateHostRoleCommandStatusCache(entity);
|
|
|
}
|
|
|
return managedList;
|
|
|
}
|
|
@@ -453,6 +579,8 @@ public class HostRoleCommandDAO {
|
|
|
@Transactional
|
|
|
public void remove(HostRoleCommandEntity stageEntity) {
|
|
|
entityManagerProvider.get().remove(merge(stageEntity));
|
|
|
+
|
|
|
+ invalidateHostRoleCommandStatusCache(stageEntity);
|
|
|
}
|
|
|
|
|
|
@Transactional
|
|
@@ -463,39 +591,17 @@ public class HostRoleCommandDAO {
|
|
|
|
|
|
/**
|
|
|
* Finds the counts of tasks for a request and groups them by stage id.
|
|
|
- * This allows for very efficient loading when there are a huge number of stages
|
|
|
- * and tasks to iterate (for example, during a Stack Upgrade).
|
|
|
* @param requestId the request id
|
|
|
* @return the map of stage-to-summary objects
|
|
|
*/
|
|
|
- @RequiresSession
|
|
|
public Map<Long, HostRoleCommandStatusSummaryDTO> findAggregateCounts(Long requestId) {
|
|
|
-
|
|
|
- TypedQuery<HostRoleCommandStatusSummaryDTO> query = entityManagerProvider.get().createQuery(
|
|
|
- SUMMARY_DTO, HostRoleCommandStatusSummaryDTO.class);
|
|
|
-
|
|
|
- query.setParameter("requestId", requestId);
|
|
|
- query.setParameter("aborted", HostRoleStatus.ABORTED);
|
|
|
- query.setParameter("completed", HostRoleStatus.COMPLETED);
|
|
|
- query.setParameter("failed", HostRoleStatus.FAILED);
|
|
|
- query.setParameter("holding", HostRoleStatus.HOLDING);
|
|
|
- query.setParameter("holding_failed", HostRoleStatus.HOLDING_FAILED);
|
|
|
- query.setParameter("holding_timedout", HostRoleStatus.HOLDING_TIMEDOUT);
|
|
|
- query.setParameter("in_progress", HostRoleStatus.IN_PROGRESS);
|
|
|
- query.setParameter("pending", HostRoleStatus.PENDING);
|
|
|
- query.setParameter("queued", HostRoleStatus.QUEUED);
|
|
|
- query.setParameter("timedout", HostRoleStatus.TIMEDOUT);
|
|
|
- query.setParameter("skipped_failed", HostRoleStatus.SKIPPED_FAILED);
|
|
|
-
|
|
|
- Map<Long, HostRoleCommandStatusSummaryDTO> map = new HashMap<Long, HostRoleCommandStatusSummaryDTO>();
|
|
|
-
|
|
|
- for (HostRoleCommandStatusSummaryDTO dto : daoUtils.selectList(query)) {
|
|
|
- map.put(dto.getStageId(), dto);
|
|
|
- }
|
|
|
-
|
|
|
- return map;
|
|
|
+ if (hostRoleCommandStatusSummaryCacheEnabled)
|
|
|
+ return hrcStatusSummaryCache.getUnchecked(requestId);
|
|
|
+ else
|
|
|
+ return loadAggregateCounts(requestId); // if caching not enabled fall back to fetching through JPA
|
|
|
}
|
|
|
|
|
|
+
|
|
|
/**
|
|
|
* Updates the {@link HostRoleCommandEntity#isFailureAutoSkipped()} flag for
|
|
|
* all commands for the given request.
|