|
@@ -26,8 +26,10 @@ import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.LinkedHashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.locks.ReadWriteLock;
|
|
|
|
|
@@ -161,11 +163,29 @@ public class HostRoleCommandDAO {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Invalidates those entries in host role command status cache which are dependent on the passed {@link org.apache.ambari.server.orm.entities.HostRoleCommandEntity}
|
|
|
- * entity.
|
|
|
+ * Invalidates the host role command status summary cache entry that
|
|
|
+ * corresponds to each request.
|
|
|
+ *
|
|
|
+ * @param requestIds
|
|
|
+ * the requests to invalidate
|
|
|
+ */
|
|
|
+ protected void invalidateHostRoleCommandStatusSummaryCache(Set<Long> requestIds) {
|
|
|
+ for (Long requestId : requestIds) {
|
|
|
+ if (null != requestId) {
|
|
|
+ invalidateHostRoleCommandStatusSummaryCache(requestId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Invalidates those entries in host role command status cache which are
|
|
|
+ * dependent on the passed
|
|
|
+ * {@link org.apache.ambari.server.orm.entities.HostRoleCommandEntity} entity.
|
|
|
+ *
|
|
|
* @param hostRoleCommandEntity
|
|
|
*/
|
|
|
- protected void invalidateHostRoleCommandStatusCache(HostRoleCommandEntity hostRoleCommandEntity) {
|
|
|
+ protected void invalidateHostRoleCommandStatusSummaryCache(
|
|
|
+ HostRoleCommandEntity hostRoleCommandEntity) {
|
|
|
if ( !hostRoleCommandStatusSummaryCacheEnabled ) {
|
|
|
return;
|
|
|
}
|
|
@@ -193,36 +213,28 @@ public class HostRoleCommandDAO {
|
|
|
* @return the map of stage-to-summary objects
|
|
|
*/
|
|
|
@RequiresSession
|
|
|
- protected Map<Long, HostRoleCommandStatusSummaryDTO> loadAggregateCounts(Long requestId) {
|
|
|
+ private Map<Long, HostRoleCommandStatusSummaryDTO> loadAggregateCounts(Long requestId) {
|
|
|
Map<Long, HostRoleCommandStatusSummaryDTO> map = new HashMap<Long, HostRoleCommandStatusSummaryDTO>();
|
|
|
|
|
|
- // ensure that we wait for any running transactions working on this cache to
|
|
|
- // complete
|
|
|
- ReadWriteLock lock = transactionLocks.getLock(LockArea.HRC_STATUS_CACHE);
|
|
|
- lock.readLock().lock();
|
|
|
-
|
|
|
- try {
|
|
|
- TypedQuery<HostRoleCommandStatusSummaryDTO> query = entityManagerProvider.get().createQuery(
|
|
|
- SUMMARY_DTO, HostRoleCommandStatusSummaryDTO.class);
|
|
|
-
|
|
|
- query.setParameter("requestId", requestId);
|
|
|
- query.setParameter("aborted", HostRoleStatus.ABORTED);
|
|
|
- query.setParameter("completed", HostRoleStatus.COMPLETED);
|
|
|
- query.setParameter("failed", HostRoleStatus.FAILED);
|
|
|
- query.setParameter("holding", HostRoleStatus.HOLDING);
|
|
|
- query.setParameter("holding_failed", HostRoleStatus.HOLDING_FAILED);
|
|
|
- query.setParameter("holding_timedout", HostRoleStatus.HOLDING_TIMEDOUT);
|
|
|
- query.setParameter("in_progress", HostRoleStatus.IN_PROGRESS);
|
|
|
- query.setParameter("pending", HostRoleStatus.PENDING);
|
|
|
- query.setParameter("queued", HostRoleStatus.QUEUED);
|
|
|
- query.setParameter("timedout", HostRoleStatus.TIMEDOUT);
|
|
|
- query.setParameter("skipped_failed", HostRoleStatus.SKIPPED_FAILED);
|
|
|
-
|
|
|
- for (HostRoleCommandStatusSummaryDTO dto : daoUtils.selectList(query)) {
|
|
|
- map.put(dto.getStageId(), dto);
|
|
|
- }
|
|
|
- } finally {
|
|
|
- lock.readLock().unlock();
|
|
|
+ EntityManager entityManager = entityManagerProvider.get();
|
|
|
+ TypedQuery<HostRoleCommandStatusSummaryDTO> query = entityManager.createQuery(SUMMARY_DTO,
|
|
|
+ HostRoleCommandStatusSummaryDTO.class);
|
|
|
+
|
|
|
+ query.setParameter("requestId", requestId);
|
|
|
+ query.setParameter("aborted", HostRoleStatus.ABORTED);
|
|
|
+ query.setParameter("completed", HostRoleStatus.COMPLETED);
|
|
|
+ query.setParameter("failed", HostRoleStatus.FAILED);
|
|
|
+ query.setParameter("holding", HostRoleStatus.HOLDING);
|
|
|
+ query.setParameter("holding_failed", HostRoleStatus.HOLDING_FAILED);
|
|
|
+ query.setParameter("holding_timedout", HostRoleStatus.HOLDING_TIMEDOUT);
|
|
|
+ query.setParameter("in_progress", HostRoleStatus.IN_PROGRESS);
|
|
|
+ query.setParameter("pending", HostRoleStatus.PENDING);
|
|
|
+ query.setParameter("queued", HostRoleStatus.QUEUED);
|
|
|
+ query.setParameter("timedout", HostRoleStatus.TIMEDOUT);
|
|
|
+ query.setParameter("skipped_failed", HostRoleStatus.SKIPPED_FAILED);
|
|
|
+
|
|
|
+ for (HostRoleCommandStatusSummaryDTO dto : daoUtils.selectList(query)) {
|
|
|
+ map.put(dto.getStageId(), dto);
|
|
|
}
|
|
|
|
|
|
return map;
|
|
@@ -244,9 +256,18 @@ public class HostRoleCommandDAO {
|
|
|
@Override
|
|
|
public Map<Long, HostRoleCommandStatusSummaryDTO> load(Long requestId) throws Exception {
|
|
|
LOG.debug("Cache miss for host role command status summary object for request {}, fetching from JPA", requestId);
|
|
|
- Map<Long, HostRoleCommandStatusSummaryDTO> hrcCommandStatusByStageId = loadAggregateCounts(requestId);
|
|
|
|
|
|
- return hrcCommandStatusByStageId;
|
|
|
+ // ensure that we wait for any running transactions working on this cache to
|
|
|
+ // complete
|
|
|
+ ReadWriteLock lock = transactionLocks.getLock(LockArea.HRC_STATUS_CACHE);
|
|
|
+ lock.readLock().lock();
|
|
|
+
|
|
|
+ try{
|
|
|
+ Map<Long, HostRoleCommandStatusSummaryDTO> hrcCommandStatusByStageId = loadAggregateCounts(requestId);
|
|
|
+ return hrcCommandStatusByStageId;
|
|
|
+ } finally {
|
|
|
+ lock.readLock().unlock();
|
|
|
+ }
|
|
|
}
|
|
|
});
|
|
|
}
|
|
@@ -581,7 +602,7 @@ public class HostRoleCommandDAO {
|
|
|
EntityManager entityManager = entityManagerProvider.get();
|
|
|
entityManager.persist(entity);
|
|
|
|
|
|
- invalidateHostRoleCommandStatusCache(entity);
|
|
|
+ invalidateHostRoleCommandStatusSummaryCache(entity);
|
|
|
}
|
|
|
|
|
|
@Transactional
|
|
@@ -590,7 +611,7 @@ public class HostRoleCommandDAO {
|
|
|
EntityManager entityManager = entityManagerProvider.get();
|
|
|
entity = entityManager.merge(entity);
|
|
|
|
|
|
- invalidateHostRoleCommandStatusCache(entity);
|
|
|
+ invalidateHostRoleCommandStatusSummaryCache(entity);
|
|
|
|
|
|
return entity;
|
|
|
}
|
|
@@ -606,13 +627,18 @@ public class HostRoleCommandDAO {
|
|
|
@Transactional
|
|
|
@TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE)
|
|
|
public List<HostRoleCommandEntity> mergeAll(Collection<HostRoleCommandEntity> entities) {
|
|
|
+ Set<Long> requestsToInvalidate = new LinkedHashSet<>();
|
|
|
List<HostRoleCommandEntity> managedList = new ArrayList<HostRoleCommandEntity>(entities.size());
|
|
|
for (HostRoleCommandEntity entity : entities) {
|
|
|
EntityManager entityManager = entityManagerProvider.get();
|
|
|
- managedList.add(entityManager.merge(entity));
|
|
|
- invalidateHostRoleCommandStatusCache(entity);
|
|
|
+ entity = entityManager.merge(entity);
|
|
|
+ managedList.add(entity);
|
|
|
+
|
|
|
+ requestsToInvalidate.add(entity.getRequestId());
|
|
|
}
|
|
|
|
|
|
+ invalidateHostRoleCommandStatusSummaryCache(requestsToInvalidate);
|
|
|
+
|
|
|
return managedList;
|
|
|
}
|
|
|
|
|
@@ -621,7 +647,7 @@ public class HostRoleCommandDAO {
|
|
|
public void remove(HostRoleCommandEntity entity) {
|
|
|
EntityManager entityManager = entityManagerProvider.get();
|
|
|
entityManager.remove(merge(entity));
|
|
|
- invalidateHostRoleCommandStatusCache(entity);
|
|
|
+ invalidateHostRoleCommandStatusSummaryCache(entity);
|
|
|
}
|
|
|
|
|
|
@Transactional
|