|
@@ -35,6 +35,8 @@ import org.apache.ambari.server.state.host.HostUnhealthyHeartbeatEvent;
|
|
|
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
|
|
|
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpInProgressEvent;
|
|
|
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpSucceededEvent;
|
|
|
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartedEvent;
|
|
|
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStoppedEvent;
|
|
|
import org.apache.ambari.server.utils.StageUtils;
|
|
|
import org.apache.ambari.server.utils.VersionUtils;
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -57,8 +59,6 @@ public class HeartBeatHandler {
|
|
|
private final Clusters clusterFsm;
|
|
|
private final ActionQueue actionQueue;
|
|
|
private final ActionManager actionManager;
|
|
|
- private HeartbeatMonitor heartbeatMonitor;
|
|
|
-
|
|
|
@Inject
|
|
|
Injector injector;
|
|
|
@Inject
|
|
@@ -69,15 +69,15 @@ public class HeartBeatHandler {
|
|
|
ActionMetadata actionMetadata;
|
|
|
@Inject
|
|
|
HBaseMasterPortScanner scanner;
|
|
|
+ private HeartbeatMonitor heartbeatMonitor;
|
|
|
@Inject
|
|
|
private Gson gson;
|
|
|
-
|
|
|
private Map<String, Long> hostResponseIds = new HashMap<String, Long>();
|
|
|
private Map<String, HeartBeatResponse> hostResponses = new HashMap<String, HeartBeatResponse>();
|
|
|
|
|
|
@Inject
|
|
|
public HeartBeatHandler(Clusters fsm, ActionQueue aq, ActionManager am,
|
|
|
- Injector injector) {
|
|
|
+ Injector injector) {
|
|
|
this.clusterFsm = fsm;
|
|
|
this.actionQueue = aq;
|
|
|
this.actionManager = am;
|
|
@@ -107,7 +107,7 @@ public class HeartBeatHandler {
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Received heartbeat from host"
|
|
|
- + ", hostname=" + hostname
|
|
|
+ + ", hostname=" + hostname
|
|
|
+ ", currentResponseId=" + currentResponseId
|
|
|
+ ", receivedResponseId=" + heartbeat.getResponseId());
|
|
|
}
|
|
@@ -115,7 +115,7 @@ public class HeartBeatHandler {
|
|
|
if (heartbeat.getResponseId() == currentResponseId - 1) {
|
|
|
LOG.warn("Old responseId received - response was lost - returning cached response");
|
|
|
return hostResponses.get(hostname);
|
|
|
- }else if (heartbeat.getResponseId() != currentResponseId) {
|
|
|
+ } else if (heartbeat.getResponseId() != currentResponseId) {
|
|
|
LOG.error("Error in responseId sequence - sending agent restart command");
|
|
|
return createRestartCommand(currentResponseId);
|
|
|
}
|
|
@@ -138,7 +138,7 @@ public class HeartBeatHandler {
|
|
|
HostState hostState = hostObject.getState();
|
|
|
// If the host is waiting for component status updates, notify it
|
|
|
if (heartbeat.componentStatus.size() > 0
|
|
|
- && hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) {
|
|
|
+ && hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) {
|
|
|
try {
|
|
|
LOG.debug("Got component status updates");
|
|
|
hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname, now));
|
|
@@ -155,9 +155,9 @@ public class HeartBeatHandler {
|
|
|
hostObject.handleEvent(new HostUnhealthyHeartbeatEvent(hostname, now,
|
|
|
null));
|
|
|
}
|
|
|
- if(hostState != hostObject.getState()) scanner.updateHBaseMaster(hostObject);
|
|
|
+ if (hostState != hostObject.getState()) scanner.updateHBaseMaster(hostObject);
|
|
|
} catch (InvalidStateTransitionException ex) {
|
|
|
- LOG.warn("Asking agent to reregister due to " + ex.getMessage(), ex);
|
|
|
+ LOG.warn("Asking agent to reregister due to " + ex.getMessage(), ex);
|
|
|
hostObject.setState(HostState.INIT);
|
|
|
return createRegisterCommand();
|
|
|
}
|
|
@@ -177,8 +177,8 @@ public class HeartBeatHandler {
|
|
|
|
|
|
protected void processCommandReports(HeartBeat heartbeat,
|
|
|
String hostname,
|
|
|
- Clusters clusterFsm, long now) throws
|
|
|
- AmbariException {
|
|
|
+ Clusters clusterFsm, long now)
|
|
|
+ throws AmbariException {
|
|
|
List<CommandReport> reports = heartbeat.getReports();
|
|
|
for (CommandReport report : reports) {
|
|
|
Cluster cl = clusterFsm.getCluster(report.getClusterName());
|
|
@@ -190,35 +190,45 @@ public class HeartBeatHandler {
|
|
|
LOG.info(report.getRole() + " is an action - skip component lookup");
|
|
|
} else {
|
|
|
try {
|
|
|
- if (null != report.getConfigTags() && !report.getConfigTags().isEmpty()) {
|
|
|
- cl.updateActualConfigs(hostname, report.getConfigTags());
|
|
|
+ if (null != report.getConfigurationTags() && !report.getConfigurationTags().isEmpty()) {
|
|
|
+ cl.updateActualConfigs(hostname, report.getConfigurationTags());
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
Service svc = cl.getService(service);
|
|
|
ServiceComponent svcComp = svc.getServiceComponent(report.getRole());
|
|
|
ServiceComponentHost scHost = svcComp.getServiceComponentHost(hostname);
|
|
|
String schName = scHost.getServiceComponentName();
|
|
|
State state = scHost.getState();
|
|
|
-
|
|
|
+
|
|
|
if (report.getStatus().equals("COMPLETED")) {
|
|
|
// Updating stack version, if needed
|
|
|
if (scHost.getState().equals(State.UPGRADING)) {
|
|
|
scHost.setStackVersion(scHost.getDesiredStackVersion());
|
|
|
+ } else if (report.getRoleCommand().equals(RoleCommand.START.toString())
|
|
|
+ && null != report.getConfigurationTags()) {
|
|
|
+ LOG.info("Updating applied config on service " + scHost.getServiceName() +
|
|
|
+ ", component " + scHost.getServiceComponentName() + ", host " + scHost.getHostName());
|
|
|
+ scHost.updateActualConfigs(report.getConfigurationTags());
|
|
|
}
|
|
|
- else if (scHost.getState().equals(State.STARTING) && null != report.getConfigTags()) {
|
|
|
- scHost.updateActualConfigs(report.getConfigTags());
|
|
|
+
|
|
|
+ if (RoleCommand.START.toString().equals(report.getRoleCommand())) {
|
|
|
+ scHost.handleEvent(new ServiceComponentHostStartedEvent(schName,
|
|
|
+ hostname, now));
|
|
|
+ } else if (RoleCommand.STOP.toString().equals(report.getRoleCommand())) {
|
|
|
+ scHost.handleEvent(new ServiceComponentHostStoppedEvent(schName,
|
|
|
+ hostname, now));
|
|
|
+ } else {
|
|
|
+ scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(schName,
|
|
|
+ hostname, now));
|
|
|
}
|
|
|
-
|
|
|
- scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(schName,
|
|
|
- hostname, now));
|
|
|
} else if (report.getStatus().equals("FAILED")) {
|
|
|
scHost.handleEvent(new ServiceComponentHostOpFailedEvent(schName,
|
|
|
- hostname, now));
|
|
|
+ hostname, now));
|
|
|
} else if (report.getStatus().equals("IN_PROGRESS")) {
|
|
|
scHost.handleEvent(new ServiceComponentHostOpInProgressEvent(schName,
|
|
|
- hostname, now));
|
|
|
+ hostname, now));
|
|
|
}
|
|
|
- if(state != scHost.getState() && schName.equals(Role.HBASE_MASTER.toString())) {
|
|
|
+ if (state != scHost.getState() && schName.equals(Role.HBASE_MASTER.toString())) {
|
|
|
scanner.updateHBaseMaster(cl);
|
|
|
}
|
|
|
} catch (ServiceComponentNotFoundException scnex) {
|
|
@@ -234,8 +244,8 @@ public class HeartBeatHandler {
|
|
|
|
|
|
protected void processStatusReports(HeartBeat heartbeat,
|
|
|
String hostname,
|
|
|
- Clusters clusterFsm) throws
|
|
|
- AmbariException {
|
|
|
+ Clusters clusterFsm)
|
|
|
+ throws AmbariException {
|
|
|
Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
|
|
|
for (Cluster cl : clusters) {
|
|
|
for (ComponentStatus status : heartbeat.componentStatus) {
|
|
@@ -272,9 +282,10 @@ public class HeartBeatHandler {
|
|
|
if (null != status.getStackVersion() && !status.getStackVersion().isEmpty()) {
|
|
|
scHost.setStackVersion(gson.fromJson(status.getStackVersion(), StackId.class));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if (null != status.getConfigTags()) {
|
|
|
scHost.updateActualConfigs(status.getConfigTags());
|
|
|
+ cl.updateActualConfigs(hostname, status.getConfigTags());
|
|
|
}
|
|
|
|
|
|
} else {
|
|
@@ -323,7 +334,7 @@ public class HeartBeatHandler {
|
|
|
* Adds commands from action queue to a heartbeat responce
|
|
|
*/
|
|
|
protected void sendCommands(String hostname, HeartBeatResponse response)
|
|
|
- throws AmbariException {
|
|
|
+ throws AmbariException {
|
|
|
List<AgentCommand> cmds = actionQueue.dequeueAll(hostname);
|
|
|
if (cmds != null && !cmds.isEmpty()) {
|
|
|
for (AgentCommand ac : cmds) {
|
|
@@ -344,8 +355,8 @@ public class HeartBeatHandler {
|
|
|
break;
|
|
|
}
|
|
|
default:
|
|
|
- LOG.error("There is no action for agent command ="+
|
|
|
- ac.getCommandType().name() );
|
|
|
+ LOG.error("There is no action for agent command =" +
|
|
|
+ ac.getCommandType().name());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -381,7 +392,7 @@ public class HeartBeatHandler {
|
|
|
}
|
|
|
|
|
|
public RegistrationResponse handleRegistration(Register register)
|
|
|
- throws InvalidStateTransitionException, AmbariException {
|
|
|
+ throws InvalidStateTransitionException, AmbariException {
|
|
|
String hostname = register.getHostname();
|
|
|
long now = System.currentTimeMillis();
|
|
|
|
|
@@ -389,15 +400,15 @@ public class HeartBeatHandler {
|
|
|
String serverVersion = ambariMetaInfo.getServerVersion();
|
|
|
if (!VersionUtils.areVersionsCompatible(serverVersion, agentVersion)) {
|
|
|
LOG.warn("Received registration request from host with non compatible"
|
|
|
- + " agent version"
|
|
|
- + ", hostname=" + hostname
|
|
|
- + ", agentVersion=" + agentVersion
|
|
|
- + ", serverVersion=" + serverVersion);
|
|
|
+ + " agent version"
|
|
|
+ + ", hostname=" + hostname
|
|
|
+ + ", agentVersion=" + agentVersion
|
|
|
+ + ", serverVersion=" + serverVersion);
|
|
|
throw new AmbariException("Cannot register host with non compatible"
|
|
|
- + " agent version"
|
|
|
- + ", hostname=" + hostname
|
|
|
- + ", agentVersion=" + agentVersion
|
|
|
- + ", serverVersion=" + serverVersion);
|
|
|
+ + " agent version"
|
|
|
+ + ", hostname=" + hostname
|
|
|
+ + ", agentVersion=" + agentVersion
|
|
|
+ + ", serverVersion=" + serverVersion);
|
|
|
}
|
|
|
|
|
|
String agentOsType = getOsType(register.getHardwareProfile().getOS(),
|