|
@@ -94,11 +94,7 @@ public class HeartBeatHandler {
|
|
|
if (currentResponseId == null) {
|
|
|
//Server restarted, or unknown host.
|
|
|
LOG.error("CurrentResponseId unknown - send register command");
|
|
|
- response = new HeartBeatResponse();
|
|
|
- RegistrationCommand regCmd = new RegistrationCommand();
|
|
|
- response.setResponseId(0);
|
|
|
- response.setRegistrationCommand(regCmd);
|
|
|
- return response;
|
|
|
+ return createRegisterCommand();
|
|
|
}
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -113,10 +109,7 @@ public class HeartBeatHandler {
|
|
|
return hostResponses.get(hostname);
|
|
|
}else if (heartbeat.getResponseId() != currentResponseId) {
|
|
|
LOG.error("Error in responseId sequence - sending agent restart command");
|
|
|
- response = new HeartBeatResponse();
|
|
|
- response.setRestartAgent(true);
|
|
|
- response.setResponseId(currentResponseId);
|
|
|
- return response;
|
|
|
+ return createRestartCommand(currentResponseId);
|
|
|
}
|
|
|
|
|
|
response = new HeartBeatResponse();
|
|
@@ -127,11 +120,7 @@ public class HeartBeatHandler {
|
|
|
if (hostObject.getState().equals(HostState.HEARTBEAT_LOST)) {
|
|
|
// After loosing heartbeat agent should reregister
|
|
|
LOG.warn("Host is in HEARTBEAT_LOST state - sending register command");
|
|
|
- response = new HeartBeatResponse();
|
|
|
- RegistrationCommand regCmd = new RegistrationCommand();
|
|
|
- response.setResponseId(0);
|
|
|
- response.setRegistrationCommand(regCmd);
|
|
|
- return response;
|
|
|
+ return createRegisterCommand();
|
|
|
}
|
|
|
|
|
|
hostResponseIds.put(hostname, currentResponseId);
|
|
@@ -161,18 +150,28 @@ public class HeartBeatHandler {
|
|
|
} catch (InvalidStateTransitionException ex) {
|
|
|
LOG.warn("Asking agent to reregister due to " + ex.getMessage(), ex);
|
|
|
hostObject.setState(HostState.INIT);
|
|
|
- RegistrationCommand regCmd = new RegistrationCommand();
|
|
|
- response.setRegistrationCommand(regCmd);
|
|
|
- return response;
|
|
|
+ return createRegisterCommand();
|
|
|
}
|
|
|
|
|
|
//Examine heartbeat for command reports
|
|
|
+ processCommandReports(heartbeat, hostname, clusterFsm, now);
|
|
|
+
|
|
|
+ // Examine heartbeat for component live status reports
|
|
|
+ processStatusReports(heartbeat, hostname, clusterFsm);
|
|
|
+
|
|
|
+ // Send commands if node is active
|
|
|
+ if (hostObject.getState().equals(HostState.HEALTHY)) {
|
|
|
+ sendCommands(hostname, response);
|
|
|
+ }
|
|
|
+ return response;
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void processCommandReports(HeartBeat heartbeat,
|
|
|
+ String hostname,
|
|
|
+ Clusters clusterFsm, long now) throws
|
|
|
+ AmbariException {
|
|
|
List<CommandReport> reports = heartbeat.getReports();
|
|
|
for (CommandReport report : reports) {
|
|
|
- String clusterName = report.getClusterName();
|
|
|
- if ((clusterName == null) || "".equals(clusterName)) {
|
|
|
- clusterName = "cluster1";
|
|
|
- }
|
|
|
Cluster cl = clusterFsm.getCluster(report.getClusterName());
|
|
|
String service = report.getServiceName();
|
|
|
if (service == null || "".equals(service)) {
|
|
@@ -185,18 +184,23 @@ public class HeartBeatHandler {
|
|
|
Service svc = cl.getService(service);
|
|
|
ServiceComponent svcComp = svc.getServiceComponent(report.getRole());
|
|
|
ServiceComponentHost scHost = svcComp.getServiceComponentHost(hostname);
|
|
|
+ String schName = scHost.getServiceComponentName();
|
|
|
if (report.getStatus().equals("COMPLETED")) {
|
|
|
- scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(scHost
|
|
|
- .getServiceComponentName(), hostname, now));
|
|
|
+ // Updating stack version, if needed
|
|
|
+ if (scHost.getState().equals(State.UPGRADING)) {
|
|
|
+ scHost.setStackVersion(scHost.getDesiredStackVersion());
|
|
|
+ }
|
|
|
+ scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(schName,
|
|
|
+ hostname, now));
|
|
|
} else if (report.getStatus().equals("FAILED")) {
|
|
|
- scHost.handleEvent(new ServiceComponentHostOpFailedEvent(scHost
|
|
|
- .getServiceComponentName(), hostname, now));
|
|
|
+ scHost.handleEvent(new ServiceComponentHostOpFailedEvent(schName,
|
|
|
+ hostname, now));
|
|
|
} else if (report.getStatus().equals("IN_PROGRESS")) {
|
|
|
- scHost.handleEvent(new ServiceComponentHostOpInProgressEvent(scHost
|
|
|
- .getServiceComponentName(), hostname, now));
|
|
|
+ scHost.handleEvent(new ServiceComponentHostOpInProgressEvent(schName,
|
|
|
+ hostname, now));
|
|
|
}
|
|
|
} catch (ServiceComponentNotFoundException scnex) {
|
|
|
- LOG.info("Service component not found ", scnex);
|
|
|
+ LOG.warn("Service component not found ", scnex);
|
|
|
} catch (InvalidStateTransitionException ex) {
|
|
|
LOG.warn("State machine exception", ex);
|
|
|
}
|
|
@@ -204,8 +208,12 @@ public class HeartBeatHandler {
|
|
|
}
|
|
|
//Update state machines from reports
|
|
|
actionManager.processTaskResponse(hostname, reports);
|
|
|
+ }
|
|
|
|
|
|
- // Examine heartbeart for component live status reports
|
|
|
+ protected void processStatusReports(HeartBeat heartbeat,
|
|
|
+ String hostname,
|
|
|
+ Clusters clusterFsm) throws
|
|
|
+ AmbariException {
|
|
|
Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
|
|
|
for (Cluster cl : clusters) {
|
|
|
for (ComponentStatus status : heartbeat.componentStatus) {
|
|
@@ -221,16 +229,16 @@ public class HeartBeatHandler {
|
|
|
State prevState = scHost.getState();
|
|
|
State liveState = State.valueOf(State.class, status.getStatus());
|
|
|
if (prevState.equals(State.INSTALLED)
|
|
|
- || prevState.equals(State.START_FAILED)
|
|
|
- || prevState.equals(State.STARTED)
|
|
|
- || prevState.equals(State.STOP_FAILED)) {
|
|
|
+ || prevState.equals(State.START_FAILED)
|
|
|
+ || prevState.equals(State.STARTED)
|
|
|
+ || prevState.equals(State.STOP_FAILED)) {
|
|
|
scHost.setState(liveState);
|
|
|
if (!prevState.equals(liveState)) {
|
|
|
LOG.info("State of service component " + componentName
|
|
|
- + " of service " + status.getServiceName()
|
|
|
- + " of cluster " + status.getClusterName()
|
|
|
- + " has changed from " + prevState + " to " + liveState
|
|
|
- + " at host " + hostname);
|
|
|
+ + " of service " + status.getServiceName()
|
|
|
+ + " of cluster " + status.getClusterName()
|
|
|
+ + " has changed from " + prevState + " to " + liveState
|
|
|
+ + " at host " + hostname);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -246,73 +254,76 @@ public class HeartBeatHandler {
|
|
|
}
|
|
|
catch (ServiceNotFoundException e) {
|
|
|
LOG.warn("Received a live status update for a non-initialized"
|
|
|
- + " service"
|
|
|
- + ", clusterName=" + status.getClusterName()
|
|
|
- + ", serviceName=" + status.getServiceName());
|
|
|
+ + " service"
|
|
|
+ + ", clusterName=" + status.getClusterName()
|
|
|
+ + ", serviceName=" + status.getServiceName());
|
|
|
// FIXME ignore invalid live update and continue for now?
|
|
|
continue;
|
|
|
}
|
|
|
catch (ServiceComponentNotFoundException e) {
|
|
|
LOG.warn("Received a live status update for a non-initialized"
|
|
|
- + " servicecomponent"
|
|
|
- + ", clusterName=" + status.getClusterName()
|
|
|
- + ", serviceName=" + status.getServiceName()
|
|
|
- + ", componentName=" + status.getComponentName());
|
|
|
+ + " servicecomponent"
|
|
|
+ + ", clusterName=" + status.getClusterName()
|
|
|
+ + ", serviceName=" + status.getServiceName()
|
|
|
+ + ", componentName=" + status.getComponentName());
|
|
|
// FIXME ignore invalid live update and continue for now?
|
|
|
continue;
|
|
|
}
|
|
|
catch (ServiceComponentHostNotFoundException e) {
|
|
|
LOG.warn("Received a live status update for a non-initialized"
|
|
|
- + " service"
|
|
|
- + ", clusterName=" + status.getClusterName()
|
|
|
- + ", serviceName=" + status.getServiceName()
|
|
|
- + ", componentName=" + status.getComponentName()
|
|
|
- + ", hostname=" + hostname);
|
|
|
+ + " service"
|
|
|
+ + ", clusterName=" + status.getClusterName()
|
|
|
+ + ", serviceName=" + status.getServiceName()
|
|
|
+ + ", componentName=" + status.getComponentName()
|
|
|
+ + ", hostname=" + hostname);
|
|
|
// FIXME ignore invalid live update and continue for now?
|
|
|
continue;
|
|
|
}
|
|
|
catch (RuntimeException e) {
|
|
|
LOG.warn("Received a live status with invalid payload"
|
|
|
- + " service"
|
|
|
- + ", clusterName=" + status.getClusterName()
|
|
|
- + ", serviceName=" + status.getServiceName()
|
|
|
- + ", componentName=" + status.getComponentName()
|
|
|
- + ", hostname=" + hostname
|
|
|
- + ", error=" + e.getMessage());
|
|
|
+ + " service"
|
|
|
+ + ", clusterName=" + status.getClusterName()
|
|
|
+ + ", serviceName=" + status.getServiceName()
|
|
|
+ + ", componentName=" + status.getComponentName()
|
|
|
+ + ", hostname=" + hostname
|
|
|
+ + ", error=" + e.getMessage());
|
|
|
continue;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- // Send commands if node is active
|
|
|
- if (hostObject.getState().equals(HostState.HEALTHY)) {
|
|
|
- List<AgentCommand> cmds = actionQueue.dequeueAll(heartbeat.getHostname());
|
|
|
- if (cmds != null && !cmds.isEmpty()) {
|
|
|
- for (AgentCommand ac : cmds) {
|
|
|
- try {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Sending command string = " + StageUtils.jaxbToString(ac));
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- throw new AmbariException("Could not get jaxb string for command", e);
|
|
|
+ /**
|
|
|
+ * Adds the commands queued for {@code hostname} in the action queue to the given heartbeat response.
|
|
|
+ */
|
|
|
+ protected void sendCommands(String hostname, HeartBeatResponse response)
|
|
|
+ throws AmbariException {
|
|
|
+ List<AgentCommand> cmds = actionQueue.dequeueAll(hostname); // NOTE(review): dequeueAll presumably removes the commands from the queue -- confirm.
|
|
|
+ if (cmds != null && !cmds.isEmpty()) {
|
|
|
+ for (AgentCommand ac : cmds) {
|
|
|
+ try { // Guards only the debug-log serialization below.
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Sending command string = " + StageUtils.jaxbToString(ac));
|
|
|
}
|
|
|
- switch (ac.getCommandType()) {
|
|
|
- case EXECUTION_COMMAND: {
|
|
|
- response.addExecutionCommand((ExecutionCommand) ac);
|
|
|
- break;
|
|
|
- }
|
|
|
- case STATUS_COMMAND: {
|
|
|
- response.addStatusCommand((StatusCommand) ac);
|
|
|
- break;
|
|
|
- }
|
|
|
- default:
|
|
|
- LOG.error("There is no action for agent command ="+ ac.getCommandType().name() );
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new AmbariException("Could not get jaxb string for command", e); // A serialization failure aborts the whole call, although the string is only needed for the debug log.
|
|
|
+ }
|
|
|
+ switch (ac.getCommandType()) { // Route each command into the response according to its command type.
|
|
|
+ case EXECUTION_COMMAND: {
|
|
|
+ response.addExecutionCommand((ExecutionCommand) ac);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case STATUS_COMMAND: {
|
|
|
+ response.addStatusCommand((StatusCommand) ac);
|
|
|
+ break;
|
|
|
}
|
|
|
+ default: // Any other command type is only logged; nothing is added to the response for it.
|
|
|
+ LOG.error("There is no action for agent command ="+
|
|
|
+ ac.getCommandType().name() );
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- return response;
|
|
|
}
|
|
|
|
|
|
public String getOsType(String os, String osRelease) {
|
|
@@ -329,6 +340,21 @@ public class HeartBeatHandler {
|
|
|
return osType.toLowerCase();
|
|
|
}
|
|
|
|
|
|
+ protected HeartBeatResponse createRegisterCommand() { // Builds a fresh heartbeat response that carries a registration command.
|
|
|
+ HeartBeatResponse response = new HeartBeatResponse();
|
|
|
+ RegistrationCommand regCmd = new RegistrationCommand();
|
|
|
+ response.setResponseId(0); // Register responses always carry response id 0.
|
|
|
+ response.setRegistrationCommand(regCmd); // Callers log this as "send register command"; presumably the agent re-registers on seeing it -- confirm against agent code.
|
|
|
+ return response;
|
|
|
+ }
|
|
|
+
|
|
|
+ protected HeartBeatResponse createRestartCommand(Long currentResponseId) { // Builds a fresh heartbeat response with the restart-agent flag set.
|
|
|
+ HeartBeatResponse response = new HeartBeatResponse();
|
|
|
+ response.setRestartAgent(true);
|
|
|
+ response.setResponseId(currentResponseId); // Passes the caller's id through unchanged. NOTE(review): boxed Long -- a null would fail here if setResponseId takes a primitive; confirm.
|
|
|
+ return response;
|
|
|
+ }
|
|
|
+
|
|
|
public RegistrationResponse handleRegistration(Register register)
|
|
|
throws InvalidStateTransitionException, AmbariException {
|
|
|
String hostname = register.getHostname();
|