|
@@ -18,7 +18,12 @@
|
|
|
package org.apache.ambari.server.actionmanager;
|
|
|
|
|
|
import java.util.*;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
+import com.google.common.cache.Cache;
|
|
|
+import com.google.common.cache.CacheBuilder;
|
|
|
+import com.google.common.collect.ImmutableMap;
|
|
|
+import com.google.inject.name.Named;
|
|
|
import org.apache.ambari.server.AmbariException;
|
|
|
import org.apache.ambari.server.Role;
|
|
|
import org.apache.ambari.server.agent.CommandReport;
|
|
@@ -67,13 +72,20 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
|
|
|
@Inject
|
|
|
private Clusters clusters;
|
|
|
|
|
|
+ private Cache<Long, HostRoleCommand> hostRoleCommandCache;
|
|
|
+ private long cacheLimit; //may be exceeded to store tasks from one request
|
|
|
+
|
|
|
private final long requestId;
|
|
|
|
|
|
@Inject
|
|
|
- public ActionDBAccessorImpl(Injector injector) {
|
|
|
+ public ActionDBAccessorImpl(Injector injector, @Named("executionCommandCacheSize") long cacheLimit) {
|
|
|
injector.injectMembers(this);
|
|
|
requestId = stageDAO.getLastRequestId();
|
|
|
|
|
|
+ this.cacheLimit = cacheLimit;
|
|
|
+ hostRoleCommandCache = CacheBuilder.newBuilder().
|
|
|
+ expireAfterAccess(5, TimeUnit.MINUTES).
|
|
|
+ build();
|
|
|
|
|
|
}
|
|
|
|
|
@@ -105,7 +117,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
|
|
|
List<HostRoleCommandEntity> commands =
|
|
|
hostRoleCommandDAO.findByRequest(requestId);
|
|
|
for (HostRoleCommandEntity command : commands) {
|
|
|
- if(command.getStatus() == HostRoleStatus.QUEUED ||
|
|
|
+ if (command.getStatus() == HostRoleStatus.QUEUED ||
|
|
|
command.getStatus() == HostRoleStatus.IN_PROGRESS ||
|
|
|
command.getStatus() == HostRoleStatus.PENDING) {
|
|
|
command.setStatus(HostRoleStatus.ABORTED);
|
|
@@ -125,7 +137,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
|
|
|
@Override
|
|
|
@Transactional
|
|
|
public void timeoutHostRole(String host, long requestId, long stageId,
|
|
|
- Role role) {
|
|
|
+ Role role) {
|
|
|
List<HostRoleCommandEntity> commands =
|
|
|
hostRoleCommandDAO.findByHostRole(host, requestId, stageId, role);
|
|
|
for (HostRoleCommandEntity command : commands) {
|
|
@@ -179,7 +191,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
|
|
|
HostEntity hostEntity = hostDAO.findByName(hostRoleCommandEntity.getHostName());
|
|
|
if (hostEntity == null) {
|
|
|
LOG.error("Host {} doesn't exists in database" + hostRoleCommandEntity.getHostName());
|
|
|
- throw new RuntimeException("Host '"+hostRoleCommandEntity.getHostName()+"' doesn't exists in database");
|
|
|
+ throw new RuntimeException("Host '" + hostRoleCommandEntity.getHostName() + "' doesn't exists in database");
|
|
|
}
|
|
|
hostRoleCommandEntity.setHost(hostEntity);
|
|
|
hostRoleCommandDAO.create(hostRoleCommandEntity);
|
|
@@ -208,7 +220,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
|
|
|
@Override
|
|
|
@Transactional
|
|
|
public void updateHostRoleState(String hostname, long requestId,
|
|
|
- long stageId, String role, CommandReport report) {
|
|
|
+ long stageId, String role, CommandReport report) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Update HostRoleState: "
|
|
|
+ "HostName " + hostname + " requestId " + requestId + " stageId "
|
|
@@ -260,10 +272,14 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
|
|
|
@Override
|
|
|
public List<HostRoleCommand> getRequestTasks(long requestId) {
|
|
|
List<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
|
|
|
- for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandDAO.findByRequest(requestId)) {
|
|
|
- tasks.add(hostRoleCommandFactory.createExisting(hostRoleCommandEntity));
|
|
|
- }
|
|
|
- return tasks;
|
|
|
+ return getTasks(
|
|
|
+ hostRoleCommandDAO.findTaskIdsByRequest(requestId)
|
|
|
+ );
|
|
|
+
|
|
|
+// for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandDAO.findByRequest(requestId)) {
|
|
|
+// tasks.add(hostRoleCommandFactory.createExisting(hostRoleCommandEntity));
|
|
|
+// }
|
|
|
+// return tasks;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -271,24 +287,31 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
|
|
|
if (requestIds.isEmpty()) {
|
|
|
return Collections.emptyList();
|
|
|
}
|
|
|
- List<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
|
|
|
- for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandDAO.findByRequestIds(requestIds)) {
|
|
|
- tasks.add(hostRoleCommandFactory.createExisting(hostRoleCommandEntity));
|
|
|
- }
|
|
|
- return tasks;
|
|
|
+
|
|
|
+ return getTasks(
|
|
|
+ hostRoleCommandDAO.findTaskIdsByRequestIds(requestIds)
|
|
|
+ );
|
|
|
+
|
|
|
+// List<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
|
|
|
+// for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandDAO.findByRequestIds(requestIds)) {
|
|
|
+// tasks.add(hostRoleCommandFactory.createExisting(hostRoleCommandEntity));
|
|
|
+// }
|
|
|
+// return tasks;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public List<HostRoleCommand> getTasksByRequestAndTaskIds(Collection<Long> requestIds, Collection<Long> taskIds) {
|
|
|
if (!requestIds.isEmpty() && !taskIds.isEmpty()) {
|
|
|
- List<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
|
|
|
- for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandDAO.findByRequestAndTaskIds(requestIds, taskIds)) {
|
|
|
- tasks.add(hostRoleCommandFactory.createExisting(hostRoleCommandEntity));
|
|
|
- }
|
|
|
- return tasks;
|
|
|
- }else if (requestIds.isEmpty()) {
|
|
|
+ return getTasks(hostRoleCommandDAO.findTaskIdsByRequestAndTaskIds(requestIds, taskIds));
|
|
|
+
|
|
|
+// List<HostRoleCommand> tasks = new ArrayList<HostRoleCommand>();
|
|
|
+// for (HostRoleCommandEntity hostRoleCommandEntity : hostRoleCommandDAO.findByRequestAndTaskIds(requestIds, taskIds)) {
|
|
|
+// tasks.add(hostRoleCommandFactory.createExisting(hostRoleCommandEntity));
|
|
|
+// }
|
|
|
+// return tasks;
|
|
|
+ } else if (requestIds.isEmpty()) {
|
|
|
return getTasks(taskIds);
|
|
|
- }else if (taskIds.isEmpty()) {
|
|
|
+ } else if (taskIds.isEmpty()) {
|
|
|
return getAllTasksByRequestIds(requestIds);
|
|
|
} else {
|
|
|
return Collections.emptyList();
|
|
@@ -300,10 +323,36 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
|
|
|
if (taskIds.isEmpty()) {
|
|
|
return Collections.emptyList();
|
|
|
}
|
|
|
+
|
|
|
List<HostRoleCommand> commands = new ArrayList<HostRoleCommand>();
|
|
|
- for (HostRoleCommandEntity commandEntity : hostRoleCommandDAO.findByPKs(taskIds)) {
|
|
|
- commands.add(hostRoleCommandFactory.createExisting(commandEntity));
|
|
|
+
|
|
|
+ Map<Long, HostRoleCommand> cached = hostRoleCommandCache.getAllPresent(taskIds);
|
|
|
+ commands.addAll(cached.values());
|
|
|
+
|
|
|
+ List<Long> absent = new ArrayList<Long>();
|
|
|
+ absent.addAll(taskIds);
|
|
|
+ absent.removeAll(cached.keySet());
|
|
|
+
|
|
|
+ if (!absent.isEmpty()) {
|
|
|
+ boolean allowStore = hostRoleCommandCache.size() <= cacheLimit;
|
|
|
+// LOG.info("Cache size {}, enable = {}", hostRoleCommandCache.size(), allowStore);
|
|
|
+
|
|
|
+ for (HostRoleCommandEntity commandEntity : hostRoleCommandDAO.findByPKs(absent)) {
|
|
|
+ HostRoleCommand hostRoleCommand = hostRoleCommandFactory.createExisting(commandEntity);
|
|
|
+ commands.add(hostRoleCommand);
|
|
|
+ if (allowStore) {
|
|
|
+ switch (hostRoleCommand.getStatus()) {
|
|
|
+ case ABORTED:
|
|
|
+ case COMPLETED:
|
|
|
+ case TIMEDOUT:
|
|
|
+ case FAILED:
|
|
|
+ hostRoleCommandCache.put(hostRoleCommand.getTaskId(), hostRoleCommand);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
+
|
|
|
return commands;
|
|
|
}
|
|
|
|
|
@@ -322,7 +371,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
|
|
|
}
|
|
|
|
|
|
public HostRoleCommand getTask(long taskId) {
|
|
|
- HostRoleCommandEntity commandEntity = hostRoleCommandDAO.findByPK((int)taskId);
|
|
|
+ HostRoleCommandEntity commandEntity = hostRoleCommandDAO.findByPK((int) taskId);
|
|
|
if (commandEntity == null) {
|
|
|
return null;
|
|
|
}
|
|
@@ -335,17 +384,17 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
|
|
|
boolean checkAllTasks = false;
|
|
|
Set<HostRoleStatus> statuses = new HashSet<HostRoleStatus>();
|
|
|
if (status == RequestStatus.IN_PROGRESS) {
|
|
|
- statuses.addAll( Arrays.asList(HostRoleStatus.PENDING,
|
|
|
+ statuses.addAll(Arrays.asList(HostRoleStatus.PENDING,
|
|
|
HostRoleStatus.IN_PROGRESS, HostRoleStatus.QUEUED));
|
|
|
} else if (status == RequestStatus.COMPLETED) {
|
|
|
match = false;
|
|
|
checkAllTasks = true;
|
|
|
- statuses.addAll( Arrays.asList(HostRoleStatus.PENDING,
|
|
|
+ statuses.addAll(Arrays.asList(HostRoleStatus.PENDING,
|
|
|
HostRoleStatus.IN_PROGRESS, HostRoleStatus.QUEUED,
|
|
|
HostRoleStatus.ABORTED, HostRoleStatus.FAILED,
|
|
|
HostRoleStatus.TIMEDOUT));
|
|
|
} else if (status == RequestStatus.FAILED) {
|
|
|
- statuses.addAll( Arrays.asList(HostRoleStatus.ABORTED,
|
|
|
+ statuses.addAll(Arrays.asList(HostRoleStatus.ABORTED,
|
|
|
HostRoleStatus.FAILED, HostRoleStatus.TIMEDOUT));
|
|
|
}
|
|
|
return hostRoleCommandDAO.getRequestsByTaskStatus(statuses, match, checkAllTasks);
|