|
@@ -17,30 +17,22 @@
|
|
|
*/
|
|
|
package org.apache.ambari.server.actionmanager;
|
|
|
|
|
|
-import java.net.InetAddress;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.TreeMap;
|
|
|
|
|
|
-import javax.xml.bind.JAXBException;
|
|
|
-
|
|
|
import org.apache.ambari.server.AmbariException;
|
|
|
-import org.apache.ambari.server.Role;
|
|
|
import org.apache.ambari.server.agent.ActionQueue;
|
|
|
-import org.apache.ambari.server.agent.AgentCommand;
|
|
|
+import org.apache.ambari.server.agent.ExecutionCommand;
|
|
|
import org.apache.ambari.server.state.Cluster;
|
|
|
import org.apache.ambari.server.state.Clusters;
|
|
|
import org.apache.ambari.server.state.Service;
|
|
|
import org.apache.ambari.server.state.ServiceComponent;
|
|
|
import org.apache.ambari.server.state.ServiceComponentHost;
|
|
|
-import org.apache.ambari.server.state.ServiceComponentImpl;
|
|
|
-import org.apache.ambari.server.state.ServiceImpl;
|
|
|
import org.apache.ambari.server.state.fsm.InvalidStateTransitonException;
|
|
|
-
|
|
|
-import org.apache.ambari.server.state.svccomphost.ServiceComponentHostImpl;
|
|
|
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
|
|
|
import org.apache.ambari.server.utils.StageUtils;
|
|
|
-
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
|
|
@@ -94,123 +86,173 @@ class ActionScheduler implements Runnable {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private void doWork() throws AmbariException {
|
|
|
- LOG.info("Scheduler wakes up");
|
|
|
List<Stage> stages = db.getStagesInProgress();
|
|
|
+ LOG.info("Scheduler wakes up, number of stages to look at:"+stages.size());
|
|
|
if (stages == null || stages.isEmpty()) {
|
|
|
//Nothing to do
|
|
|
LOG.info("No stage in progress..nothing to do");
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
- //First discover completions and timeouts.
|
|
|
- boolean operationFailure = false;
|
|
|
+
|
|
|
for (Stage s : stages) {
|
|
|
- Map<Role, Map<String, HostRoleCommand>> roleToHrcMap = getInvertedRoleMap(s);
|
|
|
-
|
|
|
- //Iterate for completion
|
|
|
- boolean moveToNextStage = true;
|
|
|
- for (Role r: roleToHrcMap.keySet()) {
|
|
|
- processPendingsAndReschedule(s, roleToHrcMap.get(r));
|
|
|
- RoleStatus roleStatus = getRoleStatus(roleToHrcMap.get(r), s.getSuccessFactor(r));
|
|
|
- if (!roleStatus.isRoleSuccessful()) {
|
|
|
- if (!roleStatus.isRoleInProgress()) {
|
|
|
- //The role has completely failed
|
|
|
- //Mark the entire operation as failed
|
|
|
- operationFailure = true;
|
|
|
- break;
|
|
|
- }
|
|
|
- moveToNextStage = false;
|
|
|
+ List<ExecutionCommand> commandsToSchedule = new ArrayList<ExecutionCommand>();
|
|
|
+ Map<String, RoleStats> roleStats = processInProgressStage(s, commandsToSchedule);
|
|
|
+ //Check if stage is failed
|
|
|
+ boolean failed = false;
|
|
|
+ for (String role : roleStats.keySet()) {
|
|
|
+ RoleStats stats = roleStats.get(role);
|
|
|
+ LOG.info("Stats for role:"+role+", stats="+stats);
|
|
|
+ if (stats.isRoleFailed()) {
|
|
|
+ failed = true;
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
- if (operationFailure) {
|
|
|
+ if (failed) {
|
|
|
+ LOG.warn("Operation completely failed, borting request id:"
|
|
|
+ + s.getRequestId());
|
|
|
db.abortOperation(s.getRequestId());
|
|
|
+ return;
|
|
|
}
|
|
|
- if (operationFailure || !moveToNextStage) {
|
|
|
- break;
|
|
|
+
|
|
|
+ //Schedule what we have so far
|
|
|
+ for (ExecutionCommand cmd : commandsToSchedule) {
|
|
|
+ try {
|
|
|
+ scheduleHostRole(s, cmd);
|
|
|
+ } catch (InvalidStateTransitonException e) {
|
|
|
+ LOG.warn("Could not schedule host role "+cmd.toString(), e);
|
|
|
+ db.abortHostRole(cmd.getHostname(), s.getRequestId(), s.getStageId(),
|
|
|
+ cmd.getRole());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //Check if ready to go to next stage
|
|
|
+ boolean goToNextStage = true;
|
|
|
+ for (String role: roleStats.keySet()) {
|
|
|
+ RoleStats stats = roleStats.get(role);
|
|
|
+ if (!stats.isSuccessFactorMet()) {
|
|
|
+ goToNextStage = false;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!goToNextStage) {
|
|
|
+ return;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- private void processPendingsAndReschedule(Stage stage,
|
|
|
- Map<String, HostRoleCommand> hrcMap) throws AmbariException {
|
|
|
- LOG.info("Processing pending and queued actions");
|
|
|
- for (String host : hrcMap.keySet()) {
|
|
|
- HostRoleCommand hrc = hrcMap.get(host);
|
|
|
- if ( (hrc.getStatus() != HostRoleStatus.PENDING) &&
|
|
|
- (hrc.getStatus() != HostRoleStatus.QUEUED) ) {
|
|
|
- //This task has been executed
|
|
|
- continue;
|
|
|
- }
|
|
|
- long now = System.currentTimeMillis();
|
|
|
- LOG.info("Last attempt time =" + stage.getLastAttemptTime(host)
|
|
|
- + ", actiontimeout =" + this.actionTimeout + ", current time=" + now);
|
|
|
- if (now > stage.getLastAttemptTime(host)+actionTimeout) {
|
|
|
- LOG.info("Host:"+host+", role:"+hrc.getRole()+", actionId:"+stage.getActionId()+" timed out");
|
|
|
- if (stage.getAttemptCount(host) >= maxAttempts) {
|
|
|
- LOG.warn("Host:"+host+", role:"+hrc.getRole()+", actionId:"+stage.getActionId()+" expired");
|
|
|
- // final expired
|
|
|
- ServiceComponentHostOpFailedEvent timeoutEvent =
|
|
|
- new ServiceComponentHostOpFailedEvent(hrc.getRole().toString(),
|
|
|
- host, now);
|
|
|
- try {
|
|
|
- Cluster c = fsmObject.getCluster(stage.getClusterName());
|
|
|
- Service svc = c.getService(hrc.getServiceName());
|
|
|
- ServiceComponent svcComp = svc.getServiceComponent(
|
|
|
- hrc.getRole().toString());
|
|
|
- ServiceComponentHost svcCompHost =
|
|
|
- svcComp.getServiceComponentHost(host);
|
|
|
- svcCompHost.handleEvent(timeoutEvent);
|
|
|
- } catch (InvalidStateTransitonException e) {
|
|
|
- LOG.info("Transition failed for host: "+host+", role: "+hrc.getRole(), e);
|
|
|
- }
|
|
|
- db.timeoutHostRole(host, stage.getRequestId(), stage.getStageId(),
|
|
|
- hrc.getRole());
|
|
|
- } else {
|
|
|
- try {
|
|
|
- scheduleHostRole(stage, host, hrc);
|
|
|
- } catch (InvalidStateTransitonException ex) {
|
|
|
- LOG.info("Cannot make this transition..aborting host role", ex);
|
|
|
- db.abortHostRole(host, stage.getRequestId(), stage.getStageId(),
|
|
|
- hrc.getRole());
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @param commandsToSchedule
|
|
|
+ * @return Stats for the roles in the stage. It is used to determine whether stage
|
|
|
+ * has succeeded or failed.
|
|
|
+ */
|
|
|
+ private Map<String, RoleStats> processInProgressStage(Stage s,
|
|
|
+ List<ExecutionCommand> commandsToSchedule) {
|
|
|
+ // Map to track role status
|
|
|
+ Map<String, RoleStats> roleStats = new TreeMap<String, RoleStats>();
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+ for (String host : s.getHosts()) {
|
|
|
+ List<ExecutionCommand> commands = s.getExecutionCommands(host);
|
|
|
+ for(ExecutionCommand c : commands) {
|
|
|
+ String roleStr = c.getRole().toString();
|
|
|
+ RoleStats stats = roleStats.get(roleStr);
|
|
|
+ if (stats == null) {
|
|
|
+ stats = new RoleStats(s.getHosts().size(), 1);
|
|
|
+ roleStats.put(roleStr, stats);
|
|
|
+ }
|
|
|
+ HostRoleStatus status = s.getHostRoleStatus(host, roleStr);
|
|
|
+ LOG.info("Last attempt time =" + s.getLastAttemptTime(host, roleStr)
|
|
|
+ + ", actiontimeout =" + this.actionTimeout + ", current time="
|
|
|
+ + now);
|
|
|
+ if (timeOutActionNeeded(status, s, host, roleStr, now)) {
|
|
|
+ LOG.info("Host:" + host + ", role:" + roleStr + ", actionId:"
|
|
|
+ + s.getActionId() + " timed out");
|
|
|
+ if (s.getAttemptCount(host, roleStr) >= maxAttempts) {
|
|
|
+ LOG.warn("Host:" + host + ", role:" + roleStr + ", actionId:"
|
|
|
+ + s.getActionId() + " expired");
|
|
|
+ db.timeoutHostRole(host, s.getRequestId(), s.getStageId(),
|
|
|
+ c.getRole());
|
|
|
+ //Reinitialize status
|
|
|
+ status = s.getHostRoleStatus(host, roleStr);
|
|
|
+ ServiceComponentHostOpFailedEvent timeoutEvent =
|
|
|
+ new ServiceComponentHostOpFailedEvent(roleStr,
|
|
|
+ host, now);
|
|
|
+ try {
|
|
|
+ Cluster cluster = fsmObject.getCluster(s.getClusterName());
|
|
|
+ Service svc = cluster.getService(c.getServiceName());
|
|
|
+ ServiceComponent svcComp = svc.getServiceComponent(
|
|
|
+ roleStr);
|
|
|
+ ServiceComponentHost svcCompHost =
|
|
|
+ svcComp.getServiceComponentHost(host);
|
|
|
+ svcCompHost.handleEvent(timeoutEvent);
|
|
|
+ } catch (InvalidStateTransitonException e) {
|
|
|
+ LOG.info("Transition failed for host: "+host+", role: "+roleStr, e);
|
|
|
+ } catch (AmbariException ex) {
|
|
|
+ LOG.info("Invalid live state", ex);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ commandsToSchedule.add(c);
|
|
|
}
|
|
|
+ } else if (status.equals(HostRoleStatus.PENDING)) {
|
|
|
+ //Need to schedule first time
|
|
|
+ commandsToSchedule.add(c);
|
|
|
}
|
|
|
+ this.updateRoleStats(status, stats);
|
|
|
}
|
|
|
}
|
|
|
+ return roleStats;
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean timeOutActionNeeded(HostRoleStatus status, Stage stage,
|
|
|
+ String host, String role, long currentTime) {
|
|
|
+ LOG.info("Last attempt time =" + stage.getLastAttemptTime(host, role)
|
|
|
+ + ", actiontimeout =" + this.actionTimeout + ", current time="
|
|
|
+ + currentTime+", role="+role+", status="+status);
|
|
|
+
|
|
|
+ if (( !status.equals(HostRoleStatus.QUEUED) ) &&
|
|
|
+ ( ! status.equals(HostRoleStatus.IN_PROGRESS) )) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ if (currentTime > stage.getLastAttemptTime(host, role)+actionTimeout) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
- private void scheduleHostRole(Stage s, String hostname, HostRoleCommand hrc)
|
|
|
+ private void scheduleHostRole(Stage s, ExecutionCommand cmd)
|
|
|
throws InvalidStateTransitonException, AmbariException {
|
|
|
long now = System.currentTimeMillis();
|
|
|
- LOG.info("Host:" + hostname + ", role:" + hrc.getRole() + ", actionId:"
|
|
|
+ String roleStr = cmd.getRole().toString();
|
|
|
+ String hostname = cmd.getHostname();
|
|
|
+ LOG.info("Host:" + hostname + ", role:" + cmd.getRole() + ", actionId:"
|
|
|
+ s.getActionId() + " being scheduled"+", current time: "+now+", start time: "+
|
|
|
- s.getStartTime(hostname));
|
|
|
- if (s.getStartTime(hostname) < 0) {
|
|
|
+ s.getStartTime(hostname, roleStr));
|
|
|
+ if (s.getStartTime(hostname, roleStr) < 0) {
|
|
|
LOG.info("Update state machine for first attempt");
|
|
|
try {
|
|
|
Cluster c = fsmObject.getCluster(s.getClusterName());
|
|
|
- Service svc = c.getService(hrc.getServiceName());
|
|
|
- ServiceComponent svcComp = svc.getServiceComponent(hrc.getRole().toString());
|
|
|
+ Service svc = c.getService(cmd.getServiceName());
|
|
|
+ ServiceComponent svcComp = svc.getServiceComponent(roleStr);
|
|
|
ServiceComponentHost svcCompHost =
|
|
|
svcComp.getServiceComponentHost(hostname);
|
|
|
- svcCompHost.handleEvent(hrc.getEvent());
|
|
|
- s.setStartTime(hostname, now);
|
|
|
+ svcCompHost.handleEvent(s.getFsmEvent(hostname, roleStr));
|
|
|
+ s.setStartTime(hostname,roleStr, now);
|
|
|
+ s.setHostRoleStatus(hostname, roleStr, HostRoleStatus.QUEUED);
|
|
|
} catch (InvalidStateTransitonException e) {
|
|
|
LOG.info(
|
|
|
"Transition failed for host: " + hostname + ", role: "
|
|
|
- + hrc.getRole(), e);
|
|
|
+ + roleStr, e);
|
|
|
throw e;
|
|
|
} catch (AmbariException e) {
|
|
|
- LOG.info("Exception in fsm: " + hostname + ", role: " + hrc.getRole(),
|
|
|
+ LOG.info("Exception in fsm: " + hostname + ", role: " + roleStr,
|
|
|
e);
|
|
|
throw e;
|
|
|
}
|
|
|
}
|
|
|
- s.setLastAttemptTime(hostname, now);
|
|
|
- s.incrementAttemptCount(hostname);
|
|
|
+ s.setLastAttemptTime(hostname, roleStr, now);
|
|
|
+ s.incrementAttemptCount(hostname, roleStr);
|
|
|
LOG.info("Enqueueing in action queue for host: "+hostname);
|
|
|
- AgentCommand cmd = s.getExecutionCommand(hostname);
|
|
|
try {
|
|
|
LOG.info("Command string = " + StageUtils.jaxbToString(cmd));
|
|
|
} catch (Exception e) {
|
|
@@ -219,54 +261,29 @@ class ActionScheduler implements Runnable {
|
|
|
actionQueue.enqueue(hostname, cmd);
|
|
|
}
|
|
|
|
|
|
- private RoleStatus getRoleStatus(
|
|
|
- Map<String, HostRoleCommand> hostRoleCmdForRole, float successFactor) {
|
|
|
- RoleStatus rs = new RoleStatus(hostRoleCmdForRole.size(), successFactor);
|
|
|
- for (String h : hostRoleCmdForRole.keySet()) {
|
|
|
- HostRoleCommand hrc = hostRoleCmdForRole.get(h);
|
|
|
- switch (hrc.getStatus()) {
|
|
|
- case COMPLETED:
|
|
|
- rs.numSucceeded++;
|
|
|
- break;
|
|
|
- case FAILED:
|
|
|
- rs.numFailed++;
|
|
|
- break;
|
|
|
- case QUEUED:
|
|
|
- rs.numQueued++;
|
|
|
- break;
|
|
|
- case PENDING:
|
|
|
- rs.numPending++;
|
|
|
- break;
|
|
|
- case TIMEDOUT:
|
|
|
- rs.numTimedOut++;
|
|
|
- break;
|
|
|
- case ABORTED:
|
|
|
- rs.numAborted++;
|
|
|
- }
|
|
|
+ private void updateRoleStats(HostRoleStatus status, RoleStats rs) {
|
|
|
+ switch (status) {
|
|
|
+ case COMPLETED:
|
|
|
+ rs.numSucceeded++;
|
|
|
+ break;
|
|
|
+ case FAILED:
|
|
|
+ rs.numFailed++;
|
|
|
+ break;
|
|
|
+ case QUEUED:
|
|
|
+ rs.numQueued++;
|
|
|
+ break;
|
|
|
+ case PENDING:
|
|
|
+ rs.numPending++;
|
|
|
+ break;
|
|
|
+ case TIMEDOUT:
|
|
|
+ rs.numTimedOut++;
|
|
|
+ break;
|
|
|
+ case ABORTED:
|
|
|
+ rs.numAborted++;
|
|
|
}
|
|
|
- return rs;
|
|
|
}
|
|
|
|
|
|
- private Map<Role, Map<String, HostRoleCommand>> getInvertedRoleMap(Stage s) {
|
|
|
- // Temporary to store role to host
|
|
|
- Map<Role, Map<String, HostRoleCommand>> roleToHrcMap = new TreeMap<Role, Map<String, HostRoleCommand>>();
|
|
|
- Map<String, HostAction> hostActions = s.getHostActions();
|
|
|
- for (String h : hostActions.keySet()) {
|
|
|
- HostAction ha = hostActions.get(h);
|
|
|
- List<HostRoleCommand> roleCommands = ha.getRoleCommands();
|
|
|
- for (HostRoleCommand hrc : roleCommands) {
|
|
|
- Map<String, HostRoleCommand> hrcMap = roleToHrcMap.get(hrc.getRole());
|
|
|
- if (hrcMap == null) {
|
|
|
- hrcMap = new TreeMap<String, HostRoleCommand>();
|
|
|
- roleToHrcMap.put(hrc.getRole(), hrcMap);
|
|
|
- }
|
|
|
- hrcMap.put(h, hrc);
|
|
|
- }
|
|
|
- }
|
|
|
- return roleToHrcMap;
|
|
|
- }
|
|
|
-
|
|
|
- static class RoleStatus {
|
|
|
+ static class RoleStats {
|
|
|
int numQueued = 0;
|
|
|
int numSucceeded = 0;
|
|
|
int numFailed = 0;
|
|
@@ -276,12 +293,15 @@ class ActionScheduler implements Runnable {
|
|
|
final int totalHosts;
|
|
|
final float successFactor;
|
|
|
|
|
|
- RoleStatus(int total, float successFactor) {
|
|
|
+ RoleStats(int total, float successFactor) {
|
|
|
this.totalHosts = total;
|
|
|
this.successFactor = successFactor;
|
|
|
}
|
|
|
|
|
|
- boolean isRoleSuccessful() {
|
|
|
+ /**
|
|
|
+ * Role successful means the role is successful enough to
|
|
|
+ */
|
|
|
+ boolean isSuccessFactorMet() {
|
|
|
if (successFactor <= (1.0*numSucceeded)/totalHosts) {
|
|
|
return true;
|
|
|
} else {
|
|
@@ -289,16 +309,33 @@ class ActionScheduler implements Runnable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- boolean isRoleInProgress() {
|
|
|
+ private boolean isRoleInProgress() {
|
|
|
return (numPending+numQueued > 0);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Role failure means role is no longer in progress and success factor is
|
|
|
+ * not met.
|
|
|
+ */
|
|
|
boolean isRoleFailed() {
|
|
|
- if ((!isRoleInProgress()) && (!isRoleSuccessful())) {
|
|
|
+ if (isRoleInProgress() || isSuccessFactorMet()) {
|
|
|
return false;
|
|
|
} else {
|
|
|
return true;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ public String toString() {
|
|
|
+ StringBuilder builder = new StringBuilder();
|
|
|
+ builder.append("numQueued="+numQueued);
|
|
|
+ builder.append(", numSucceeded="+numSucceeded);
|
|
|
+ builder.append(", numFailed="+numFailed);
|
|
|
+ builder.append(", numTimedOut="+numTimedOut);
|
|
|
+ builder.append(", numPending="+numPending);
|
|
|
+ builder.append(", numAborted="+numAborted);
|
|
|
+ builder.append(", totalHosts="+totalHosts);
|
|
|
+ builder.append(", successFactor="+successFactor);
|
|
|
+ return builder.toString();
|
|
|
+ }
|
|
|
}
|
|
|
}
|