|
@@ -18,14 +18,18 @@
|
|
package org.apache.ambari.server.agent;
|
|
package org.apache.ambari.server.agent;
|
|
|
|
|
|
|
|
|
|
-import com.google.common.util.concurrent.AbstractScheduledService;
|
|
|
|
-import com.google.common.util.concurrent.AbstractService;
|
|
|
|
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
|
-import com.google.gson.Gson;
|
|
|
|
-import com.google.gson.JsonSyntaxException;
|
|
|
|
-import com.google.gson.annotations.SerializedName;
|
|
|
|
-import com.google.inject.Inject;
|
|
|
|
-import com.google.inject.Injector;
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
|
+import java.util.Collection;
|
|
|
|
+import java.util.Iterator;
|
|
|
|
+import java.util.List;
|
|
|
|
+import java.util.Map;
|
|
|
|
+import java.util.Set;
|
|
|
|
+import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
|
+import java.util.concurrent.Executors;
|
|
|
|
+import java.util.concurrent.ScheduledExecutorService;
|
|
|
|
+import java.util.concurrent.ThreadFactory;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
+
|
|
import org.apache.ambari.server.AmbariException;
|
|
import org.apache.ambari.server.AmbariException;
|
|
import org.apache.ambari.server.Role;
|
|
import org.apache.ambari.server.Role;
|
|
import org.apache.ambari.server.RoleCommand;
|
|
import org.apache.ambari.server.RoleCommand;
|
|
@@ -40,7 +44,7 @@ import org.apache.ambari.server.controller.MaintenanceStateHelper;
|
|
import org.apache.ambari.server.events.ActionFinalReportReceivedEvent;
|
|
import org.apache.ambari.server.events.ActionFinalReportReceivedEvent;
|
|
import org.apache.ambari.server.events.AlertEvent;
|
|
import org.apache.ambari.server.events.AlertEvent;
|
|
import org.apache.ambari.server.events.AlertReceivedEvent;
|
|
import org.apache.ambari.server.events.AlertReceivedEvent;
|
|
-import org.apache.ambari.server.events.HostComponentVersionEvent;
|
|
|
|
|
|
+import org.apache.ambari.server.events.HostComponentVersionAdvertisedEvent;
|
|
import org.apache.ambari.server.events.publishers.AlertEventPublisher;
|
|
import org.apache.ambari.server.events.publishers.AlertEventPublisher;
|
|
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
|
|
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
|
|
import org.apache.ambari.server.events.publishers.VersionEventPublisher;
|
|
import org.apache.ambari.server.events.publishers.VersionEventPublisher;
|
|
@@ -58,7 +62,6 @@ import org.apache.ambari.server.state.Service;
|
|
import org.apache.ambari.server.state.ServiceComponent;
|
|
import org.apache.ambari.server.state.ServiceComponent;
|
|
import org.apache.ambari.server.state.ServiceComponentHost;
|
|
import org.apache.ambari.server.state.ServiceComponentHost;
|
|
import org.apache.ambari.server.state.StackId;
|
|
import org.apache.ambari.server.state.StackId;
|
|
-import org.apache.ambari.server.state.State;
|
|
|
|
import org.apache.ambari.server.state.UpgradeState;
|
|
import org.apache.ambari.server.state.UpgradeState;
|
|
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
|
|
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
|
|
import org.apache.ambari.server.state.scheduler.RequestExecution;
|
|
import org.apache.ambari.server.state.scheduler.RequestExecution;
|
|
@@ -73,17 +76,13 @@ import org.apache.commons.lang.StringUtils;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
-import java.util.ArrayList;
|
|
|
|
-import java.util.Collection;
|
|
|
|
-import java.util.Iterator;
|
|
|
|
-import java.util.List;
|
|
|
|
-import java.util.Map;
|
|
|
|
-import java.util.Set;
|
|
|
|
-import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
|
-import java.util.concurrent.Executors;
|
|
|
|
-import java.util.concurrent.ScheduledExecutorService;
|
|
|
|
-import java.util.concurrent.ThreadFactory;
|
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
+import com.google.common.util.concurrent.AbstractService;
|
|
|
|
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
|
+import com.google.gson.Gson;
|
|
|
|
+import com.google.gson.JsonSyntaxException;
|
|
|
|
+import com.google.gson.annotations.SerializedName;
|
|
|
|
+import com.google.inject.Inject;
|
|
|
|
+import com.google.inject.Injector;
|
|
|
|
|
|
/**
|
|
/**
|
|
* HeartbeatProcessor class is used for bulk processing data retrieved from agents in background
|
|
* HeartbeatProcessor class is used for bulk processing data retrieved from agents in background
|
|
@@ -479,10 +478,9 @@ public class HeartbeatProcessor extends AbstractService{
|
|
|
|
|
|
String newVersion = structuredOutput == null ? null : structuredOutput.version;
|
|
String newVersion = structuredOutput == null ? null : structuredOutput.version;
|
|
|
|
|
|
- // Pass true to always publish a version event. It is safer to recalculate the version even if we don't
|
|
|
|
- // detect a difference in the value. This is useful in case that a manual database edit is done while
|
|
|
|
- // ambari-server is stopped.
|
|
|
|
- handleComponentVersionReceived(cl, scHost, newVersion, true);
|
|
|
|
|
|
+ HostComponentVersionAdvertisedEvent event = new HostComponentVersionAdvertisedEvent(cl, scHost, newVersion);
|
|
|
|
+
|
|
|
|
+ versionEventPublisher.publish(event);
|
|
}
|
|
}
|
|
|
|
|
|
// Updating stack version, if needed (this is not actually for express/rolling upgrades!)
|
|
// Updating stack version, if needed (this is not actually for express/rolling upgrades!)
|
|
@@ -535,7 +533,7 @@ public class HeartbeatProcessor extends AbstractService{
|
|
try {
|
|
try {
|
|
ComponentVersionStructuredOut structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);
|
|
ComponentVersionStructuredOut structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);
|
|
|
|
|
|
- if (null != structuredOutput.upgradeDirection && structuredOutput.upgradeDirection.isUpgrade()) {
|
|
|
|
|
|
+ if (null != structuredOutput.upgradeDirection) {
|
|
scHost.setUpgradeState(UpgradeState.FAILED);
|
|
scHost.setUpgradeState(UpgradeState.FAILED);
|
|
}
|
|
}
|
|
} catch (JsonSyntaxException ex) {
|
|
} catch (JsonSyntaxException ex) {
|
|
@@ -648,7 +646,8 @@ public class HeartbeatProcessor extends AbstractService{
|
|
if (extra.containsKey("version")) {
|
|
if (extra.containsKey("version")) {
|
|
String version = extra.get("version").toString();
|
|
String version = extra.get("version").toString();
|
|
|
|
|
|
- handleComponentVersionReceived(cl, scHost, version, false);
|
|
|
|
|
|
+ HostComponentVersionAdvertisedEvent event = new HostComponentVersionAdvertisedEvent(cl, scHost, version);
|
|
|
|
+ versionEventPublisher.publish(event);
|
|
}
|
|
}
|
|
|
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
@@ -703,42 +702,6 @@ public class HeartbeatProcessor extends AbstractService{
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Updates the version of the given service component, sets the upgrade state (if needed)
|
|
|
|
- * and publishes a version event through the version event publisher.
|
|
|
|
- *
|
|
|
|
- * @param cluster the cluster
|
|
|
|
- * @param scHost service component host
|
|
|
|
- * @param newVersion new version of service component
|
|
|
|
- * @param alwaysPublish if true, always publish a version event; if false,
|
|
|
|
- * only publish if the component version was updated
|
|
|
|
- */
|
|
|
|
- private void handleComponentVersionReceived(Cluster cluster, ServiceComponentHost scHost,
|
|
|
|
- String newVersion, boolean alwaysPublish) {
|
|
|
|
-
|
|
|
|
- boolean updated = false;
|
|
|
|
-
|
|
|
|
- if (StringUtils.isNotBlank(newVersion)) {
|
|
|
|
- final String previousVersion = scHost.getVersion();
|
|
|
|
- if (!StringUtils.equals(previousVersion, newVersion)) {
|
|
|
|
- scHost.setVersion(newVersion);
|
|
|
|
- scHost.setStackVersion(cluster.getDesiredStackVersion());
|
|
|
|
- if (previousVersion != null && !previousVersion.equalsIgnoreCase(
|
|
|
|
- org.apache.ambari.server.state.State.UNKNOWN.toString())) {
|
|
|
|
- scHost.setUpgradeState(UpgradeState.COMPLETE);
|
|
|
|
- }
|
|
|
|
- updated = true;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (updated || alwaysPublish) {
|
|
|
|
- HostComponentVersionEvent event = new HostComponentVersionEvent(cluster, scHost);
|
|
|
|
- versionEventPublisher.publish(event);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* This class is used for mapping json of structured output for keytab distribution actions.
|
|
* This class is used for mapping json of structured output for keytab distribution actions.
|
|
*/
|
|
*/
|