|
|
@@ -29,6 +29,7 @@ import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.locks.ReadWriteLock;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
import javax.persistence.EntityManager;
|
|
|
import javax.persistence.TypedQuery;
|
|
|
@@ -62,6 +63,9 @@ import org.apache.ambari.server.orm.entities.HostEntity;
|
|
|
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
|
|
|
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity_;
|
|
|
import org.apache.ambari.server.orm.entities.StageEntity;
|
|
|
+import org.apache.ambari.server.orm.helpers.SQLConstants;
|
|
|
+import org.apache.ambari.server.orm.helpers.SQLOperations;
|
|
|
+import org.apache.commons.collections.CollectionUtils;
|
|
|
import org.eclipse.persistence.config.HintValues;
|
|
|
import org.eclipse.persistence.config.QueryHints;
|
|
|
import org.slf4j.Logger;
|
|
|
@@ -289,27 +293,48 @@ public class HostRoleCommandDAO {
|
|
|
|
|
|
@RequiresSession
|
|
|
public List<HostRoleCommandEntity> findByPKs(Collection<Long> taskIds) {
|
|
|
- if (taskIds == null || taskIds.isEmpty()) {
|
|
|
- return Collections.emptyList();
|
|
|
- }
|
|
|
-
|
|
|
TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery(
|
|
|
"SELECT task FROM HostRoleCommandEntity task WHERE task.taskId IN ?1 " +
|
|
|
"ORDER BY task.taskId",
|
|
|
HostRoleCommandEntity.class);
|
|
|
|
|
|
- if (taskIds.size() > configuration.getTaskIdListLimit()) {
|
|
|
- List<HostRoleCommandEntity> result = new ArrayList<>();
|
|
|
-
|
|
|
- List<List<Long>> lists = Lists.partition(new ArrayList<>(taskIds), configuration.getTaskIdListLimit());
|
|
|
- for (List<Long> list : lists) {
|
|
|
- result.addAll(daoUtils.selectList(query, list));
|
|
|
- }
|
|
|
+ List<HostRoleCommandEntity> result = new ArrayList<>();
|
|
|
+ SQLOperations.batch(taskIds, SQLConstants.IN_ARGUMENT_MAX_SIZE, (chunk, currentBatch, totalBatches, totalSize) -> {
|
|
|
+ result.addAll(daoUtils.selectList(query, chunk));
|
|
|
+ return 0;
|
|
|
+ });
|
|
|
|
|
|
- return result;
|
|
|
- }
|
|
|
+ return Lists.newArrayList(result);
|
|
|
+ }
|
|
|
|
|
|
- return daoUtils.selectList(query, taskIds);
|
|
|
+ /**
|
|
|
+ * Retrieves minimal host role command columns which are required to calculate stare state.
|
|
|
+ * @param taskIds collection of host role commands to process.
|
|
|
+ * @return minimized host role command entities.
|
|
|
+ */
|
|
|
+ @RequiresSession
|
|
|
+ public List<HostRoleCommandEntity> findStatusRolesByPKs(Collection<Long> taskIds) {
|
|
|
+ TypedQuery<Object[]> query = entityManagerProvider.get().createQuery(
|
|
|
+ "SELECT task.taskId, task.status, task.role FROM HostRoleCommandEntity task WHERE task.taskId IN ?1 " +
|
|
|
+ "ORDER BY task.taskId",
|
|
|
+ Object[].class);
|
|
|
+
|
|
|
+ List<HostRoleCommandEntity> result = new ArrayList<>();
|
|
|
+ SQLOperations.batch(taskIds, SQLConstants.IN_ARGUMENT_MAX_SIZE, (chunk, currentBatch, totalBatches, totalSize) -> {
|
|
|
+ List<Object[]> queryResult = daoUtils.selectList(query, chunk);
|
|
|
+ result.addAll(queryResult.stream().map(
|
|
|
+ o -> {
|
|
|
+ HostRoleCommandEntity e = new HostRoleCommandEntity();
|
|
|
+ e.setTaskId((Long) o[0]);
|
|
|
+ e.setStatus(HostRoleStatus.valueOf(o[1].toString()));
|
|
|
+ e.setRole(Role.valueOf(o[2].toString()));
|
|
|
+ return e;
|
|
|
+ }).collect(Collectors.toList()));
|
|
|
+
|
|
|
+ return 0;
|
|
|
+ });
|
|
|
+
|
|
|
+ return Lists.newArrayList(result);
|
|
|
}
|
|
|
|
|
|
@RequiresSession
|
|
|
@@ -328,7 +353,14 @@ public class HostRoleCommandDAO {
|
|
|
"SELECT task FROM HostRoleCommandEntity task " +
|
|
|
"WHERE task.requestId IN ?1 " +
|
|
|
"ORDER BY task.taskId", HostRoleCommandEntity.class);
|
|
|
- return daoUtils.selectList(query, requestIds);
|
|
|
+
|
|
|
+ List<HostRoleCommandEntity> result = new ArrayList<>();
|
|
|
+ SQLOperations.batch(requestIds, SQLConstants.IN_ARGUMENT_MAX_SIZE, (chunk, currentBatch, totalBatches, totalSize) -> {
|
|
|
+ result.addAll(daoUtils.selectList(query, chunk));
|
|
|
+ return 0;
|
|
|
+ });
|
|
|
+
|
|
|
+ return Lists.newArrayList(result);
|
|
|
}
|
|
|
|
|
|
@RequiresSession
|
|
|
@@ -347,38 +379,58 @@ public class HostRoleCommandDAO {
|
|
|
"SELECT task.taskId FROM HostRoleCommandEntity task " +
|
|
|
"WHERE task.requestId IN ?1 " +
|
|
|
"ORDER BY task.taskId", Long.class);
|
|
|
- return daoUtils.selectList(query, requestIds);
|
|
|
+
|
|
|
+ List<Long> result = new ArrayList<>();
|
|
|
+ SQLOperations.batch(requestIds, SQLConstants.IN_ARGUMENT_MAX_SIZE, (chunk, currentBatch, totalBatches, totalSize) -> {
|
|
|
+ result.addAll(daoUtils.selectList(query, chunk));
|
|
|
+ return 0;
|
|
|
+ });
|
|
|
+
|
|
|
+ return Lists.newArrayList(result);
|
|
|
}
|
|
|
|
|
|
@RequiresSession
|
|
|
public List<HostRoleCommandEntity> findByRequestAndTaskIds(Collection<Long> requestIds, Collection<Long> taskIds) {
|
|
|
+ if (CollectionUtils.isEmpty(requestIds) || CollectionUtils.isEmpty(taskIds)) {
|
|
|
+ return Collections.<HostRoleCommandEntity>emptyList();
|
|
|
+ }
|
|
|
+
|
|
|
TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery(
|
|
|
"SELECT DISTINCT task FROM HostRoleCommandEntity task " +
|
|
|
"WHERE task.requestId IN ?1 AND task.taskId IN ?2 " +
|
|
|
"ORDER BY task.taskId", HostRoleCommandEntity.class
|
|
|
);
|
|
|
- return daoUtils.selectList(query, requestIds, taskIds);
|
|
|
+
|
|
|
+ return runQueryForVastRequestsAndTaskIds(query, requestIds, taskIds);
|
|
|
}
|
|
|
|
|
|
@RequiresSession
|
|
|
public List<Long> findTaskIdsByRequestAndTaskIds(Collection<Long> requestIds, Collection<Long> taskIds) {
|
|
|
+ if (CollectionUtils.isEmpty(requestIds) || CollectionUtils.isEmpty(taskIds)) {
|
|
|
+ return Collections.<Long>emptyList();
|
|
|
+ }
|
|
|
+
|
|
|
TypedQuery<Long> query = entityManagerProvider.get().createQuery(
|
|
|
"SELECT DISTINCT task.taskId FROM HostRoleCommandEntity task " +
|
|
|
"WHERE task.requestId IN ?1 AND task.taskId IN ?2 " +
|
|
|
"ORDER BY task.taskId", Long.class
|
|
|
);
|
|
|
|
|
|
- if (taskIds.size() > configuration.getTaskIdListLimit()) {
|
|
|
- List<Long> result = new ArrayList<>();
|
|
|
-
|
|
|
- List<List<Long>> lists = Lists.partition(new ArrayList<>(taskIds), configuration.getTaskIdListLimit());
|
|
|
- for (List<Long> taskIdList : lists) {
|
|
|
- result.addAll(daoUtils.selectList(query, requestIds, taskIdList));
|
|
|
- }
|
|
|
+ return runQueryForVastRequestsAndTaskIds(query, requestIds, taskIds);
|
|
|
+ }
|
|
|
|
|
|
- return result;
|
|
|
- }
|
|
|
- return daoUtils.selectList(query, requestIds, taskIds);
|
|
|
+ private <T> List<T> runQueryForVastRequestsAndTaskIds(TypedQuery<T> query, Collection<Long> requestIds, Collection<Long> taskIds) {
|
|
|
+ final int batchSize = SQLConstants.IN_ARGUMENT_MAX_SIZE;
|
|
|
+ final List<T> result = new ArrayList<>();
|
|
|
+ SQLOperations.batch(taskIds, batchSize, (taskChunk, currentTaskBatch, totalTaskBatches, totalTaskSize) -> {
|
|
|
+ SQLOperations.batch(requestIds, batchSize, (requestChunk, currentRequestBatch, totalRequestBatches, totalRequestSize) -> {
|
|
|
+ result.addAll(daoUtils.selectList(query, requestChunk, taskChunk));
|
|
|
+ return 0;
|
|
|
+ });
|
|
|
+ return 0;
|
|
|
+ });
|
|
|
+
|
|
|
+ return Lists.newArrayList(result);
|
|
|
}
|
|
|
|
|
|
@RequiresSession
|