|
@@ -465,10 +465,6 @@ class ActionScheduler implements Runnable {
|
|
|
// Map to track role status
|
|
|
Map<String, RoleStats> roleStats = initRoleStats(s);
|
|
|
long now = System.currentTimeMillis();
|
|
|
- long taskTimeout = actionTimeout;
|
|
|
- if (taskTimeoutAdjustment) {
|
|
|
- taskTimeout = actionTimeout + s.getStageTimeout();
|
|
|
- }
|
|
|
|
|
|
Cluster cluster = null;
|
|
|
if (null != s.getClusterName()) {
|
|
@@ -522,6 +518,20 @@ class ActionScheduler implements Runnable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ //basic timeout for stage
|
|
|
+ long commandTimeout = actionTimeout;
|
|
|
+ if (taskTimeoutAdjustment) {
|
|
|
+ Map<String, String> commandParams = c.getCommandParams();
|
|
|
+ String timeoutKey = ExecutionCommand.KeyNames.COMMAND_TIMEOUT;
|
|
|
+ if (commandParams != null && commandParams.containsKey(timeoutKey)) {
|
|
|
+ String timeoutStr = commandParams.get(timeoutKey);
|
|
|
+ commandTimeout += Long.parseLong(timeoutStr) * 1000; // Converting to milliseconds
|
|
|
+ } else {
|
|
|
+ LOG.error("Execution command has no timeout parameter" +
|
|
|
+ c.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// Check that service host component is not deleted
|
|
|
if (hostDeleted) {
|
|
|
|
|
@@ -537,7 +547,7 @@ class ActionScheduler implements Runnable {
|
|
|
// We don't need to send CANCEL_COMMANDs in this case
|
|
|
db.abortHostRole(host, s.getRequestId(), s.getStageId(), c.getRole(), message);
|
|
|
status = HostRoleStatus.ABORTED;
|
|
|
- } else if (timeOutActionNeeded(status, s, hostObj, roleStr, now, taskTimeout)) {
|
|
|
+ } else if (timeOutActionNeeded(status, s, hostObj, roleStr, now, commandTimeout)) {
|
|
|
// Process command timeouts
|
|
|
LOG.info("Host:" + host + ", role:" + roleStr + ", actionId:" + s.getActionId() + " timed out");
|
|
|
if (s.getAttemptCount(host, roleStr) >= maxAttempts) {
|
|
@@ -677,6 +687,11 @@ class ActionScheduler implements Runnable {
|
|
|
LOG.debug("Timing out action since agent is not heartbeating.");
|
|
|
return true;
|
|
|
}
|
|
|
+ // If we have other command in progress for this stage do not timeout this one
|
|
|
+ if (hasCommandInProgress(stage, host.getHostName())
|
|
|
+ && !status.equals(HostRoleStatus.IN_PROGRESS)) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
if (currentTime > stage.getLastAttemptTime(host.getHostName(), role)
|
|
|
+ taskTimeout) {
|
|
|
return true;
|
|
@@ -684,6 +699,19 @@ class ActionScheduler implements Runnable {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
+ private boolean hasCommandInProgress(Stage stage, String host) {
|
|
|
+ List<ExecutionCommandWrapper> commandWrappers = stage.getExecutionCommands(host);
|
|
|
+ for (ExecutionCommandWrapper wrapper : commandWrappers) {
|
|
|
+ ExecutionCommand c = wrapper.getExecutionCommand();
|
|
|
+ String roleStr = c.getRole();
|
|
|
+ HostRoleStatus status = stage.getHostRoleStatus(host, roleStr);
|
|
|
+ if (status == HostRoleStatus.IN_PROGRESS) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
private ListMultimap<String, ServiceComponentHostEvent> formEventMap(Stage s, List<ExecutionCommand> commands) {
|
|
|
ListMultimap<String, ServiceComponentHostEvent> serviceEventMap = ArrayListMultimap.create();
|
|
|
for (ExecutionCommand cmd : commands) {
|