|
@@ -20,21 +20,41 @@ package org.apache.ambari.server.actionmanager;
|
|
|
import com.google.common.cache.Cache;
|
|
|
import com.google.common.cache.CacheBuilder;
|
|
|
import com.google.inject.Inject;
|
|
|
-import com.google.inject.Injector;
|
|
|
import com.google.inject.Singleton;
|
|
|
import com.google.inject.name.Named;
|
|
|
import com.google.inject.persist.Transactional;
|
|
|
-import org.apache.ambari.server.AmbariException;
|
|
|
import org.apache.ambari.server.agent.CommandReport;
|
|
|
-import org.apache.ambari.server.controller.ExecuteActionRequest;
|
|
|
-import org.apache.ambari.server.orm.dao.*;
|
|
|
-import org.apache.ambari.server.orm.entities.*;
|
|
|
-import org.apache.ambari.server.state.Cluster;
|
|
|
+import org.apache.ambari.server.orm.dao.ClusterDAO;
|
|
|
+import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
|
|
|
+import org.apache.ambari.server.orm.dao.HostDAO;
|
|
|
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
|
|
|
+import org.apache.ambari.server.orm.dao.RequestDAO;
|
|
|
+import org.apache.ambari.server.orm.dao.RequestScheduleDAO;
|
|
|
+import org.apache.ambari.server.orm.dao.RoleSuccessCriteriaDAO;
|
|
|
+import org.apache.ambari.server.orm.dao.StageDAO;
|
|
|
+import org.apache.ambari.server.orm.entities.ClusterEntity;
|
|
|
+import org.apache.ambari.server.orm.entities.ExecutionCommandEntity;
|
|
|
+import org.apache.ambari.server.orm.entities.HostEntity;
|
|
|
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
|
|
|
+import org.apache.ambari.server.orm.entities.RequestEntity;
|
|
|
+import org.apache.ambari.server.orm.entities.RequestScheduleEntity;
|
|
|
+import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
|
|
|
+import org.apache.ambari.server.orm.entities.StageEntity;
|
|
|
import org.apache.ambari.server.state.Clusters;
|
|
|
+import org.apache.ambari.server.utils.StageUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
-import java.util.*;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.Comparator;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
@Singleton
|
|
@@ -107,20 +127,24 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
|
|
|
return stages;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public Request getRequest(long requestId) {
|
|
|
+ RequestEntity requestEntity = requestDAO.findByPK(requestId);
|
|
|
+ if (requestEntity != null) {
|
|
|
+ return requestFactory.createExisting(requestEntity);
|
|
|
+ } else {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/* (non-Javadoc)
|
|
|
* @see org.apache.ambari.server.actionmanager.ActionDBAccessor#abortOperation(long)
|
|
|
*/
|
|
|
@Override
|
|
|
- @Transactional
|
|
|
public void abortOperation(long requestId) {
|
|
|
long now = System.currentTimeMillis();
|
|
|
|
|
|
- //mark request as ended
|
|
|
- RequestEntity requestEntity = requestDAO.findByPK(requestId);
|
|
|
- if (requestEntity != null && requestEntity.getEndTime() == -1L) {
|
|
|
- requestEntity.setEndTime(now);
|
|
|
- requestDAO.merge(requestEntity);
|
|
|
- }
|
|
|
+ endRequest(requestId);
|
|
|
|
|
|
List<HostRoleCommandEntity> commands =
|
|
|
hostRoleCommandDAO.findByRequest(requestId);
|
|
@@ -130,7 +154,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
|
|
|
command.getStatus() == HostRoleStatus.PENDING) {
|
|
|
command.setStatus(HostRoleStatus.ABORTED);
|
|
|
command.setEndTime(now);
|
|
|
- hostRoleCommandDAO.merge(command);
|
|
|
LOG.info("Aborting command. Hostname " + command.getHostName()
|
|
|
+ " role " + command.getRole()
|
|
|
+ " requestId " + command.getRequestId()
|
|
@@ -138,13 +161,14 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
|
|
|
+ " stageId " + command.getStageId());
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ hostRoleCommandDAO.mergeAll(commands);
|
|
|
}
|
|
|
|
|
|
/* (non-Javadoc)
|
|
|
* @see org.apache.ambari.server.actionmanager.ActionDBAccessor#timeoutHostRole(long, long, org.apache.ambari.server.Role)
|
|
|
*/
|
|
|
@Override
|
|
|
- @Transactional
|
|
|
public void timeoutHostRole(String host, long requestId, long stageId,
|
|
|
String role) {
|
|
|
long now = System.currentTimeMillis();
|
|
@@ -153,8 +177,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
|
|
|
for (HostRoleCommandEntity command : commands) {
|
|
|
command.setStatus(HostRoleStatus.TIMEDOUT);
|
|
|
command.setEndTime(now);
|
|
|
- hostRoleCommandDAO.merge(command);
|
|
|
}
|
|
|
+ hostRoleCommandDAO.mergeAll(commands);
|
|
|
+ endRequestIfCompleted(requestId);
|
|
|
}
|
|
|
|
|
|
/* (non-Javadoc)
|
|
@@ -165,7 +190,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
|
|
|
List<Stage> stages = new ArrayList<Stage>();
|
|
|
List<HostRoleStatus> statuses =
|
|
|
Arrays.asList(HostRoleStatus.QUEUED, HostRoleStatus.IN_PROGRESS,
|
|
|
- HostRoleStatus.PENDING);
|
|
|
+ HostRoleStatus.PENDING);
|
|
|
for (StageEntity stageEntity : stageDAO.findByCommandStatuses(statuses)) {
|
|
|
stages.add(stageFactory.createExisting(stageEntity));
|
|
|
}
|
|
@@ -234,27 +259,30 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
|
|
|
}
|
|
|
requestEntity.setStages(stageEntities);
|
|
|
requestDAO.merge(requestEntity);
|
|
|
-// requestDAO.create(requestEntity);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- @Transactional
|
|
|
public void startRequest(long requestId) {
|
|
|
RequestEntity requestEntity = requestDAO.findByPK(requestId);
|
|
|
if (requestEntity != null && requestEntity.getStartTime() == -1L) {
|
|
|
requestEntity.setStartTime(System.currentTimeMillis());
|
|
|
+ requestDAO.merge(requestEntity);
|
|
|
}
|
|
|
- requestDAO.merge(requestEntity);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- @Transactional
|
|
|
public void endRequest(long requestId) {
|
|
|
RequestEntity requestEntity = requestDAO.findByPK(requestId);
|
|
|
if (requestEntity != null && requestEntity.getEndTime() == -1L) {
|
|
|
requestEntity.setEndTime(System.currentTimeMillis());
|
|
|
+ requestDAO.merge(requestEntity);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void endRequestIfCompleted(long requestId) {
|
|
|
+ if (requestDAO.isAllTasksCompleted(requestId)) {
|
|
|
+ endRequest(requestId);
|
|
|
}
|
|
|
- requestDAO.merge(requestEntity);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -285,30 +313,76 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- @Transactional
|
|
|
+ public void updateHostRoleStates(Collection<CommandReport> reports) {
|
|
|
+ Map<Long, CommandReport> taskReports = new HashMap<Long, CommandReport>();
|
|
|
+ for (CommandReport report : reports) {
|
|
|
+ taskReports.put(report.getTaskId(), report);
|
|
|
+ }
|
|
|
+
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+
|
|
|
+ List<Long> requestsToCheck = new ArrayList<Long>();
|
|
|
+
|
|
|
+ List<HostRoleCommandEntity> commandEntities = hostRoleCommandDAO.findByPKs(taskReports.keySet());
|
|
|
+ for (HostRoleCommandEntity commandEntity : commandEntities) {
|
|
|
+ CommandReport report = taskReports.get(commandEntity.getTaskId());
|
|
|
+ commandEntity.setStatus(HostRoleStatus.valueOf(report.getStatus()));
|
|
|
+ commandEntity.setStdOut(report.getStdOut().getBytes());
|
|
|
+ commandEntity.setStdError(report.getStdErr().getBytes());
|
|
|
+ commandEntity.setStructuredOut(report.getStructuredOut() == null ? null :
|
|
|
+ report.getStructuredOut().getBytes());
|
|
|
+ commandEntity.setExitcode(report.getExitCode());
|
|
|
+
|
|
|
+ if (HostRoleStatus.getCompletedStates().contains(commandEntity.getStatus())) {
|
|
|
+ commandEntity.setEndTime(now);
|
|
|
+
|
|
|
+ String actionId = report.getActionId();
|
|
|
+ long[] requestStageIds = StageUtils.getRequestStage(actionId);
|
|
|
+ long requestId = requestStageIds[0];
|
|
|
+ long stageId = requestStageIds[1];
|
|
|
+ if (requestDAO.getLastStageId(requestId).equals(stageId)) {
|
|
|
+ requestsToCheck.add(requestId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ hostRoleCommandDAO.mergeAll(commandEntities);
|
|
|
+
|
|
|
+ for (Long requestId : requestsToCheck) {
|
|
|
+ endRequestIfCompleted(requestId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
public void updateHostRoleState(String hostname, long requestId,
|
|
|
long stageId, String role, CommandReport report) {
|
|
|
+ boolean checkRequest = false;
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Update HostRoleState: "
|
|
|
- + "HostName " + hostname + " requestId " + requestId + " stageId "
|
|
|
- + stageId + " role " + role + " report " + report);
|
|
|
+ + "HostName " + hostname + " requestId " + requestId + " stageId "
|
|
|
+ + stageId + " role " + role + " report " + report);
|
|
|
}
|
|
|
long now = System.currentTimeMillis();
|
|
|
List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByHostRole(
|
|
|
- hostname, requestId, stageId, role);
|
|
|
+ hostname, requestId, stageId, role);
|
|
|
for (HostRoleCommandEntity command : commands) {
|
|
|
command.setStatus(HostRoleStatus.valueOf(report.getStatus()));
|
|
|
command.setStdOut(report.getStdOut().getBytes());
|
|
|
command.setStdError(report.getStdErr().getBytes());
|
|
|
command.setStructuredOut(report.getStructuredOut() == null ? null :
|
|
|
- report.getStructuredOut().getBytes()); // ===================================
|
|
|
- if (command.getStatus() == HostRoleStatus.COMPLETED ||
|
|
|
- command.getStatus() == HostRoleStatus.ABORTED ||
|
|
|
- command.getStatus() == HostRoleStatus.FAILED) {
|
|
|
+ report.getStructuredOut().getBytes());
|
|
|
+ if (HostRoleStatus.getCompletedStates().contains(command.getStatus())) {
|
|
|
command.setEndTime(now);
|
|
|
+ if (requestDAO.getLastStageId(requestId).equals(stageId)) {
|
|
|
+ checkRequest = true;
|
|
|
+ }
|
|
|
}
|
|
|
command.setExitcode(report.getExitCode());
|
|
|
- hostRoleCommandDAO.merge(command);
|
|
|
+ }
|
|
|
+ hostRoleCommandDAO.mergeAll(commands);
|
|
|
+
|
|
|
+ if (checkRequest) {
|
|
|
+ endRequestIfCompleted(requestId);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -473,7 +547,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- @Transactional
|
|
|
public List<Request> getRequests(Collection<Long> requestIds){
|
|
|
List<RequestEntity> requestEntities = requestDAO.findByPks(requestIds);
|
|
|
List<Request> requests = new ArrayList<Request>(requestEntities.size());
|