|
|
@@ -28,7 +28,6 @@ import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
-import org.apache.ambari.server.ClusterNotFoundException;
|
|
|
import org.apache.ambari.server.EagerSingleton;
|
|
|
import org.apache.ambari.server.Role;
|
|
|
import org.apache.ambari.server.actionmanager.HostRoleCommand;
|
|
|
@@ -123,7 +122,7 @@ public class TaskStatusListener {
|
|
|
* @param event Consumes {@link TaskUpdateEvent}.
|
|
|
*/
|
|
|
@Subscribe
|
|
|
- public void onTaskUpdateEvent(TaskUpdateEvent event) throws ClusterNotFoundException {
|
|
|
+ public void onTaskUpdateEvent(TaskUpdateEvent event) {
|
|
|
LOG.debug("Received task update event {}", event);
|
|
|
List<HostRoleCommand> hostRoleCommandListAll = event.getHostRoleCommands();
|
|
|
List<HostRoleCommand> hostRoleCommandWithReceivedStatus = new ArrayList<>();
|
|
|
@@ -145,13 +144,27 @@ public class TaskStatusListener {
|
|
|
requestIdsWithReceivedTaskStatus.add(hostRoleCommand.getRequestId());
|
|
|
|
|
|
if (!activeTasksMap.get(reportedTaskId).getStatus().equals(hostRoleCommand.getStatus())) {
|
|
|
- Set<RequestUpdateEvent.HostRoleCommand> hostRoleCommands = new HashSet<>();
|
|
|
- hostRoleCommands.add(new RequestUpdateEvent.HostRoleCommand(hostRoleCommand.getTaskId(),
|
|
|
- hostRoleCommand.getRequestId(),
|
|
|
- hostRoleCommand.getStatus(),
|
|
|
- hostRoleCommand.getHostName()));
|
|
|
- requestsToPublish.add(new RequestUpdateEvent(hostRoleCommand.getRequestId(),
|
|
|
- activeRequestMap.get(hostRoleCommand.getRequestId()).getStatus(), hostRoleCommands));
|
|
|
+ // Ignore requests not related to any cluster. "requests" topic is used for cluster requests only.
|
|
|
+ Long clusterId = activeRequestMap.get(hostRoleCommand.getRequestId()).getClusterId();
|
|
|
+ if (clusterId != null && clusterId != -1) {
|
|
|
+ Set<RequestUpdateEvent.HostRoleCommand> hostRoleCommands = new HashSet<>();
|
|
|
+ hostRoleCommands.add(new RequestUpdateEvent.HostRoleCommand(hostRoleCommand.getTaskId(),
|
|
|
+ hostRoleCommand.getRequestId(),
|
|
|
+ hostRoleCommand.getStatus(),
|
|
|
+ hostRoleCommand.getHostName()));
|
|
|
+ requestsToPublish.add(new RequestUpdateEvent(hostRoleCommand.getRequestId(),
|
|
|
+ activeRequestMap.get(hostRoleCommand.getRequestId()).getStatus(), hostRoleCommands));
|
|
|
+ } else {
|
|
|
+ LOG.debug("No STOMP request update event was fired for host component status change due no cluster related, " +
|
|
|
+ "request id: {}, role: {}, role command: {}, host: {}, task id: {}, old state: {}, new state: {}",
|
|
|
+ hostRoleCommand.getRequestId(),
|
|
|
+ hostRoleCommand.getRole(),
|
|
|
+ hostRoleCommand.getRoleCommand(),
|
|
|
+ hostRoleCommand.getHostName(),
|
|
|
+ hostRoleCommand.getTaskId(),
|
|
|
+ activeTasksMap.get(reportedTaskId).getStatus(),
|
|
|
+ hostRoleCommand.getStatus());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -264,7 +277,8 @@ public class TaskStatusListener {
|
|
|
// Request entity of the hostrolecommand should be persisted before publishing task create event
|
|
|
assert requestEntity != null;
|
|
|
Set<StageEntityPK> stageEntityPKs = Sets.newHashSet(stageEntityPK);
|
|
|
- ActiveRequest request = new ActiveRequest(requestEntity.getStatus(),requestEntity.getDisplayStatus(), stageEntityPKs);
|
|
|
+ ActiveRequest request = new ActiveRequest(requestEntity.getStatus(),requestEntity.getDisplayStatus(),
|
|
|
+ stageEntityPKs, requestEntity.getClusterId());
|
|
|
activeRequestMap.put(requestId, request);
|
|
|
}
|
|
|
}
|
|
|
@@ -524,11 +538,14 @@ public class TaskStatusListener {
|
|
|
private HostRoleStatus status;
|
|
|
private HostRoleStatus displayStatus;
|
|
|
private Set <StageEntityPK> stageEntityPks;
|
|
|
+ private Long clusterId;
|
|
|
|
|
|
- public ActiveRequest(HostRoleStatus status, HostRoleStatus displayStatus, Set<StageEntityPK> stageEntityPks) {
|
|
|
+ public ActiveRequest(HostRoleStatus status, HostRoleStatus displayStatus, Set<StageEntityPK> stageEntityPks,
|
|
|
+ Long clusterId) {
|
|
|
this.status = status;
|
|
|
this.displayStatus = displayStatus;
|
|
|
this.stageEntityPks = stageEntityPks;
|
|
|
+ this.clusterId = clusterId;
|
|
|
}
|
|
|
|
|
|
public HostRoleStatus getStatus() {
|
|
|
@@ -559,6 +576,13 @@ public class TaskStatusListener {
|
|
|
stageEntityPks.add(stageEntityPK);
|
|
|
}
|
|
|
|
|
|
+ public Long getClusterId() {
|
|
|
+ return clusterId;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setClusterId(Long clusterId) {
|
|
|
+ this.clusterId = clusterId;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|