|
@@ -27,21 +27,14 @@ import com.google.inject.Injector;
|
|
|
import org.apache.ambari.server.AmbariException;
|
|
|
import org.apache.ambari.server.actionmanager.ActionManager;
|
|
|
import org.apache.ambari.server.api.services.AmbariMetaInfo;
|
|
|
-import org.apache.ambari.server.state.Cluster;
|
|
|
-import org.apache.ambari.server.state.Clusters;
|
|
|
-import org.apache.ambari.server.state.Config;
|
|
|
-import org.apache.ambari.server.state.ConfigHelper;
|
|
|
-import org.apache.ambari.server.state.Host;
|
|
|
-import org.apache.ambari.server.state.HostState;
|
|
|
-import org.apache.ambari.server.state.Service;
|
|
|
-import org.apache.ambari.server.state.ServiceComponent;
|
|
|
-import org.apache.ambari.server.state.ServiceComponentHost;
|
|
|
-import org.apache.ambari.server.state.State;
|
|
|
+import org.apache.ambari.server.state.*;
|
|
|
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
|
|
|
import org.apache.ambari.server.state.host.HostHeartbeatLostEvent;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
|
|
|
+import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.*;
|
|
|
+
|
|
|
/**
|
|
|
* Monitors the node state and heartbeats.
|
|
|
*/
|
|
@@ -54,6 +47,7 @@ public class HeartbeatMonitor implements Runnable {
|
|
|
private volatile boolean shouldRun = true;
|
|
|
private Thread monitorThread = null;
|
|
|
private final ConfigHelper configHelper;
|
|
|
+ private final AmbariMetaInfo ambariMetaInfo;
|
|
|
|
|
|
public HeartbeatMonitor(Clusters clusters, ActionQueue aq, ActionManager am,
|
|
|
int threadWakeupInterval, Injector injector) {
|
|
@@ -62,6 +56,7 @@ public class HeartbeatMonitor implements Runnable {
|
|
|
this.actionManager = am;
|
|
|
this.threadWakeupInterval = threadWakeupInterval;
|
|
|
this.configHelper = injector.getInstance(ConfigHelper.class);
|
|
|
+ this.ambariMetaInfo = injector.getInstance(AmbariMetaInfo.class);
|
|
|
}
|
|
|
|
|
|
public void shutdown() {
|
|
@@ -176,58 +171,98 @@ public class HeartbeatMonitor implements Runnable {
|
|
|
|
|
|
for (Cluster cl : clusters.getClustersForHost(hostname)) {
|
|
|
for (ServiceComponentHost sch : cl.getServiceComponentHosts(hostname)) {
|
|
|
- String serviceName = sch.getServiceName();
|
|
|
- Service service = cl.getService(sch.getServiceName());
|
|
|
- ServiceComponent sc = service.getServiceComponent(sch
|
|
|
- .getServiceComponentName());
|
|
|
- // Send status commands for any components
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Live status will include status of service " + serviceName + " of cluster " + cl.getClusterName());
|
|
|
- }
|
|
|
+ StatusCommand statusCmd = createStatusCommand(hostname, cl, sch);
|
|
|
+ cmds.add(statusCmd);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return cmds;
|
|
|
+ }
|
|
|
|
|
|
- Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>();
|
|
|
|
|
|
- // get the cluster config for type 'global'
|
|
|
- // apply config group overrides
|
|
|
+ /**
|
|
|
+ * Generates status command and fills all apropriate fields.
|
|
|
+ * @throws AmbariException
|
|
|
+ */
|
|
|
+ private StatusCommand createStatusCommand(String hostname, Cluster cluster,
|
|
|
+ ServiceComponentHost sch) throws AmbariException {
|
|
|
+ String serviceName = sch.getServiceName();
|
|
|
+ String componentName = sch.getServiceComponentName();
|
|
|
+ Service service = cluster.getService(sch.getServiceName());
|
|
|
+ ServiceComponent sc = service.getServiceComponent(componentName);
|
|
|
+ StackId stackId = cluster.getDesiredStackVersion();
|
|
|
+ ServiceInfo serviceInfo = ambariMetaInfo.getServiceInfo(stackId.getStackName(),
|
|
|
+ stackId.getStackVersion(), serviceName);
|
|
|
+ ComponentInfo componentInfo = ambariMetaInfo.getComponent(
|
|
|
+ stackId.getStackName(), stackId.getStackVersion(),
|
|
|
+ serviceName, componentName);
|
|
|
|
|
|
- Config clusterConfig = cl.getDesiredConfigByType("global");
|
|
|
- if (clusterConfig != null) {
|
|
|
- // cluster config for 'global'
|
|
|
- Map<String, String> props = new HashMap<String, String>(clusterConfig.getProperties());
|
|
|
+ Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>();
|
|
|
|
|
|
- // Apply global properties for this host from all config groups
|
|
|
- Map<String, Map<String, String>> allConfigTags = configHelper
|
|
|
- .getEffectiveDesiredTags(cl, hostname);
|
|
|
+ // get the cluster config for type 'global'
|
|
|
+ // apply config group overrides
|
|
|
|
|
|
- Map<String, Map<String, String>> configTags = new HashMap<String,
|
|
|
- Map<String, String>>();
|
|
|
+ Config clusterConfig = cluster.getDesiredConfigByType(GLOBAL);
|
|
|
+ if (clusterConfig != null) {
|
|
|
+ // cluster config for 'global'
|
|
|
+ Map<String, String> props = new HashMap<String, String>(clusterConfig.getProperties());
|
|
|
|
|
|
- for (Map.Entry<String, Map<String, String>> entry : allConfigTags.entrySet()) {
|
|
|
- if (entry.getKey().equals("global")) {
|
|
|
- configTags.put("global", entry.getValue());
|
|
|
- }
|
|
|
- }
|
|
|
+ // Apply global properties for this host from all config groups
|
|
|
+ Map<String, Map<String, String>> allConfigTags = configHelper
|
|
|
+ .getEffectiveDesiredTags(cluster, hostname);
|
|
|
|
|
|
- Map<String, Map<String, String>> properties = configHelper
|
|
|
- .getEffectiveConfigProperties(cl, configTags);
|
|
|
+ Map<String, Map<String, String>> configTags = new HashMap<String,
|
|
|
+ Map<String, String>>();
|
|
|
|
|
|
- if (!properties.isEmpty()) {
|
|
|
- for (Map<String, String> propertyMap : properties.values()) {
|
|
|
- props.putAll(propertyMap);
|
|
|
- }
|
|
|
- }
|
|
|
+ for (Map.Entry<String, Map<String, String>> entry : allConfigTags.entrySet()) {
|
|
|
+ if (entry.getKey().equals(GLOBAL)) {
|
|
|
+ configTags.put(GLOBAL, entry.getValue());
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- configurations.put("global", props);
|
|
|
+ Map<String, Map<String, String>> properties = configHelper
|
|
|
+ .getEffectiveConfigProperties(cluster, configTags);
|
|
|
+
|
|
|
+ if (!properties.isEmpty()) {
|
|
|
+ for (Map<String, String> propertyMap : properties.values()) {
|
|
|
+ props.putAll(propertyMap);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- StatusCommand statusCmd = new StatusCommand();
|
|
|
- statusCmd.setClusterName(cl.getClusterName());
|
|
|
- statusCmd.setServiceName(serviceName);
|
|
|
- statusCmd.setComponentName(sch.getServiceComponentName());
|
|
|
- statusCmd.setConfigurations(configurations);
|
|
|
- cmds.add(statusCmd);
|
|
|
+ configurations.put(GLOBAL, props);
|
|
|
+ }
|
|
|
+
|
|
|
+ StatusCommand statusCmd = new StatusCommand();
|
|
|
+ statusCmd.setClusterName(cluster.getClusterName());
|
|
|
+ statusCmd.setServiceName(serviceName);
|
|
|
+ statusCmd.setComponentName(componentName);
|
|
|
+ statusCmd.setConfigurations(configurations);
|
|
|
+
|
|
|
+ // Fill command params
|
|
|
+ Map<String, String> commandParams = statusCmd.getCommandParams();
|
|
|
+ commandParams.put(SCHEMA_VERSION, serviceInfo.getSchemaVersion());
|
|
|
+
|
|
|
+ String commandTimeout = ExecutionCommand.KeyNames.COMMAND_TIMEOUT_DEFAULT;
|
|
|
+ CommandScriptDefinition script = componentInfo.getCommandScript();
|
|
|
+ if (serviceInfo.getSchemaVersion().equals(AmbariMetaInfo.SCHEMA_VERSION_2)) {
|
|
|
+ if (script != null) {
|
|
|
+ commandParams.put(SCRIPT, script.getScript());
|
|
|
+ commandParams.put(SCRIPT_TYPE, script.getScriptType().toString());
|
|
|
+ commandTimeout = String.valueOf(script.getTimeout());
|
|
|
+ } else {
|
|
|
+ String message = String.format("Component %s of service %s has not " +
|
|
|
+ "command script defined", componentName, serviceName);
|
|
|
+ throw new AmbariException(message);
|
|
|
}
|
|
|
}
|
|
|
- return cmds;
|
|
|
+ commandParams.put(COMMAND_TIMEOUT, commandTimeout);
|
|
|
+ commandParams.put(SERVICE_METADATA_FOLDER,
|
|
|
+ serviceInfo.getServiceMetadataFolder());
|
|
|
+ // Fill host level params
|
|
|
+ Map<String, String> hostLevelParams = statusCmd.getHostLevelParams();
|
|
|
+ hostLevelParams.put(STACK_NAME, stackId.getStackName());
|
|
|
+ hostLevelParams.put(STACK_VERSION, stackId.getStackVersion());
|
|
|
+
|
|
|
+ return statusCmd;
|
|
|
}
|
|
|
+
|
|
|
}
|