|
@@ -31,7 +31,6 @@ import java.util.Set;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.regex.Pattern;
|
|
|
|
|
|
-import com.google.common.reflect.TypeToken;
|
|
|
import org.apache.ambari.server.AmbariException;
|
|
|
import org.apache.ambari.server.HostNotFoundException;
|
|
|
import org.apache.ambari.server.Role;
|
|
@@ -51,6 +50,7 @@ import org.apache.ambari.server.events.AlertReceivedEvent;
|
|
|
import org.apache.ambari.server.events.HostComponentVersionEvent;
|
|
|
import org.apache.ambari.server.events.publishers.AlertEventPublisher;
|
|
|
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
|
|
|
+import org.apache.ambari.server.events.publishers.VersionEventPublisher;
|
|
|
import org.apache.ambari.server.metadata.ActionMetadata;
|
|
|
import org.apache.ambari.server.orm.dao.KerberosPrincipalHostDAO;
|
|
|
import org.apache.ambari.server.serveraction.kerberos.KerberosActionDataFile;
|
|
@@ -153,6 +153,9 @@ public class HeartBeatHandler {
|
|
|
@Inject
|
|
|
private AmbariEventPublisher ambariEventPublisher;
|
|
|
|
|
|
+ @Inject
|
|
|
+ private VersionEventPublisher versionEventPublisher;
|
|
|
+
|
|
|
/**
|
|
|
* KerberosPrincipalHostDAO used to set and get Kerberos principal details
|
|
|
*/
|
|
@@ -523,14 +526,13 @@ public class HeartBeatHandler {
|
|
|
//Json structure for component version was incorrect
|
|
|
//do nothing, pass this data further for processing
|
|
|
}
|
|
|
- if (structuredOutput != null && StringUtils.isNotBlank(structuredOutput.getVersion())) {
|
|
|
- handleComponentVersionReceived(scHost, structuredOutput.getVersion());
|
|
|
- }
|
|
|
- // Safer to recalculate the version even if we don't detect a difference in the value.
|
|
|
- // This is useful in case that a manual database edit is done while ambari-server is stopped.
|
|
|
- // TODO should be included into handleComponentVersionReceived() after RU becomes stable
|
|
|
- HostComponentVersionEvent event = new HostComponentVersionEvent(cl, scHost);
|
|
|
- ambariEventPublisher.publish(event);
|
|
|
+
|
|
|
+ String newVersion = structuredOutput == null ? null : structuredOutput.getVersion();
|
|
|
+
|
|
|
+ // Pass true to always publish a version event. It is safer to recalculate the version even if we don't
|
|
|
+ // detect a difference in the value. This is useful in case that a manual database edit is done while
|
|
|
+ // ambari-server is stopped.
|
|
|
+ handleComponentVersionReceived(cl, scHost, newVersion, true);
|
|
|
}
|
|
|
|
|
|
// Updating stack version, if needed
|
|
@@ -676,12 +678,9 @@ public class HeartBeatHandler {
|
|
|
scHost.setProcesses(list);
|
|
|
}
|
|
|
if (extra.containsKey("version")) {
|
|
|
- boolean versionWasUpdated = handleComponentVersionReceived(scHost, extra.get("version").toString());
|
|
|
- if (versionWasUpdated) {
|
|
|
- // TODO should be included into handleComponentVersionReceived() after RU becomes stable
|
|
|
- HostComponentVersionEvent event = new HostComponentVersionEvent(cl, scHost);
|
|
|
- ambariEventPublisher.publish(event);
|
|
|
- }
|
|
|
+ String version = extra.get("version").toString();
|
|
|
+
|
|
|
+ handleComponentVersionReceived(cl, scHost, version, false);
|
|
|
}
|
|
|
|
|
|
} catch (Exception e) {
|
|
@@ -734,27 +733,39 @@ public class HeartBeatHandler {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Updates version of service component and sets upgrade state if needed.
|
|
|
+ * Updates the version of the given service component, sets the upgrade state (if needed)
|
|
|
+ * and publishes a version event through the version event publisher.
|
|
|
*
|
|
|
- * @param scHost service component host
|
|
|
- * @param newVersion new version of service component
|
|
|
- *
|
|
|
- * @return true if component version was updated to new one
|
|
|
+ * @param cluster the cluster
|
|
|
+ * @param scHost service component host
|
|
|
+ * @param newVersion new version of service component
|
|
|
+ * @param alwaysPublish if true, always publish a version event; if false,
|
|
|
+ * only publish if the component version was updated
|
|
|
*/
|
|
|
- private boolean handleComponentVersionReceived(ServiceComponentHost scHost, String newVersion) {
|
|
|
- final String previousVersion = scHost.getVersion();
|
|
|
- if (!StringUtils.equals(previousVersion, newVersion)) {
|
|
|
- scHost.setVersion(newVersion);
|
|
|
- if (previousVersion != null && !previousVersion.equalsIgnoreCase(State.UNKNOWN.toString())) {
|
|
|
- scHost.setUpgradeState(UpgradeState.COMPLETE);
|
|
|
+ private void handleComponentVersionReceived(Cluster cluster, ServiceComponentHost scHost,
|
|
|
+ String newVersion, boolean alwaysPublish) {
|
|
|
+
|
|
|
+ boolean updated = false;
|
|
|
+
|
|
|
+ if (StringUtils.isNotBlank(newVersion)) {
|
|
|
+ final String previousVersion = scHost.getVersion();
|
|
|
+ if (!StringUtils.equals(previousVersion, newVersion)) {
|
|
|
+ scHost.setVersion(newVersion);
|
|
|
+ if (previousVersion != null && !previousVersion.equalsIgnoreCase(State.UNKNOWN.toString())) {
|
|
|
+ scHost.setUpgradeState(UpgradeState.COMPLETE);
|
|
|
+ }
|
|
|
+ updated = true;
|
|
|
}
|
|
|
- return true;
|
|
|
}
|
|
|
- return false;
|
|
|
+
|
|
|
+ if (updated || alwaysPublish) {
|
|
|
+ HostComponentVersionEvent event = new HostComponentVersionEvent(cluster, scHost);
|
|
|
+ versionEventPublisher.publish(event);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Adds commands from action queue to a heartbeat responce
|
|
|
+ * Adds commands from action queue to a heartbeat response.
|
|
|
*/
|
|
|
protected void sendCommands(String hostname, HeartBeatResponse response)
|
|
|
throws AmbariException {
|