|
@@ -44,6 +44,7 @@ import org.apache.ambari.server.controller.AmbariServer;
|
|
|
import org.apache.ambari.server.controller.RequestStatusResponse;
|
|
|
import org.apache.ambari.server.controller.ShortTaskStatus;
|
|
|
import org.apache.ambari.server.controller.internal.ArtifactResourceProvider;
|
|
|
+import org.apache.ambari.server.controller.internal.BaseClusterRequest;
|
|
|
import org.apache.ambari.server.controller.internal.CalculatedStatus;
|
|
|
import org.apache.ambari.server.controller.internal.CredentialResourceProvider;
|
|
|
import org.apache.ambari.server.controller.internal.ProvisionClusterRequest;
|
|
@@ -77,6 +78,7 @@ import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import com.google.common.eventbus.Subscribe;
|
|
|
import com.google.inject.Singleton;
|
|
|
+import com.google.inject.persist.Transactional;
|
|
|
|
|
|
/**
|
|
|
* Manages all cluster provisioning actions on the cluster topology.
|
|
@@ -246,20 +248,18 @@ public class TopologyManager {
|
|
|
public RequestStatusResponse provisionCluster(final ProvisionClusterRequest request) throws InvalidTopologyException, AmbariException {
|
|
|
ensureInitialized();
|
|
|
|
|
|
- if (null != request.getQuickLinksProfileJson()) {
|
|
|
- saveOrUpdateQuickLinksProfile(request.getQuickLinksProfileJson());
|
|
|
- }
|
|
|
-
|
|
|
- ClusterTopology topology = new ClusterTopologyImpl(ambariContext, request);
|
|
|
+ final ClusterTopology topology = new ClusterTopologyImpl(ambariContext, request);
|
|
|
final String clusterName = request.getClusterName();
|
|
|
+ final Stack stack = topology.getBlueprint().getStack();
|
|
|
final String repoVersion = request.getRepositoryVersion();
|
|
|
|
|
|
// get the id prior to creating ambari resources which increments the counter
|
|
|
- Long provisionId = ambariContext.getNextRequestId();
|
|
|
+ final Long provisionId = ambariContext.getNextRequestId();
|
|
|
|
|
|
- final Stack stack = topology.getBlueprint().getStack();
|
|
|
boolean configureSecurity = false;
|
|
|
+
|
|
|
SecurityConfiguration securityConfiguration = processSecurityConfiguration(request);
|
|
|
+
|
|
|
if (securityConfiguration != null && securityConfiguration.getType() == SecurityType.KERBEROS) {
|
|
|
configureSecurity = true;
|
|
|
addKerberosClient(topology);
|
|
@@ -291,18 +291,25 @@ public class TopologyManager {
|
|
|
topology.setConfigRecommendationStrategy(request.getConfigRecommendationStrategy());
|
|
|
// set provision action requested
|
|
|
topology.setProvisionAction(request.getProvisionAction());
|
|
|
- // persist request after it has successfully validated
|
|
|
- PersistedTopologyRequest persistedRequest = RetryHelper.executeWithRetry(new Callable<PersistedTopologyRequest>() {
|
|
|
- @Override
|
|
|
- public PersistedTopologyRequest call() throws Exception {
|
|
|
- return persistedState.persistTopologyRequest(request);
|
|
|
+
|
|
|
+ // persist request
|
|
|
+ LogicalRequest logicalRequest = RetryHelper.executeWithRetry(new Callable<LogicalRequest>() {
|
|
|
+ @Override
|
|
|
+ public LogicalRequest call() throws Exception {
|
|
|
+ LogicalRequest logicalRequest = processAndPersistProvisionClusterTopologyRequest(request, topology, provisionId);
|
|
|
+ return logicalRequest;
|
|
|
+ }
|
|
|
}
|
|
|
- });
|
|
|
+ );
|
|
|
+
|
|
|
|
|
|
clusterTopologyMap.put(clusterId, topology);
|
|
|
|
|
|
addClusterConfigRequest(topology, new ClusterConfigurationRequest(
|
|
|
ambariContext, topology, true, stackAdvisorBlueprintProcessor, configureSecurity));
|
|
|
+
|
|
|
+
|
|
|
+ // Notify listeners that cluster configuration finished
|
|
|
executor.submit(new Callable<Boolean>() {
|
|
|
@Override
|
|
|
public Boolean call() throws Exception {
|
|
@@ -310,7 +317,9 @@ public class TopologyManager {
|
|
|
return Boolean.TRUE;
|
|
|
}
|
|
|
});
|
|
|
- LogicalRequest logicalRequest = processRequest(persistedRequest, topology, provisionId);
|
|
|
+
|
|
|
+ // Process the logical request
|
|
|
+ processRequest(request, topology, logicalRequest);
|
|
|
|
|
|
//todo: this should be invoked as part of a generic lifecycle event which could possibly
|
|
|
//todo: be tied to cluster state
|
|
@@ -320,6 +329,7 @@ public class TopologyManager {
|
|
|
return getRequestStatus(logicalRequest.getRequestId());
|
|
|
}
|
|
|
|
|
|
+
|
|
|
/**
|
|
|
* Saves the quick links profile to the DB as an Ambari setting. Creates a new setting entity or updates the existing
|
|
|
* one.
|
|
@@ -435,14 +445,14 @@ public class TopologyManager {
|
|
|
|
|
|
}
|
|
|
|
|
|
- public RequestStatusResponse scaleHosts(ScaleClusterRequest request)
|
|
|
+ public RequestStatusResponse scaleHosts(final ScaleClusterRequest request)
|
|
|
throws InvalidTopologyException, AmbariException {
|
|
|
|
|
|
ensureInitialized();
|
|
|
LOG.info("TopologyManager.scaleHosts: Entering");
|
|
|
String clusterName = request.getClusterName();
|
|
|
long clusterId = ambariContext.getClusterId(clusterName);
|
|
|
- ClusterTopology topology = clusterTopologyMap.get(clusterId);
|
|
|
+ final ClusterTopology topology = clusterTopologyMap.get(clusterId);
|
|
|
if (topology == null) {
|
|
|
throw new InvalidTopologyException("Unable to retrieve cluster topology for cluster. This is most likely a " +
|
|
|
"result of trying to scale a cluster via the API which was created using " +
|
|
@@ -453,11 +463,64 @@ public class TopologyManager {
|
|
|
|
|
|
hostNameCheck(request, topology);
|
|
|
request.setClusterId(clusterId);
|
|
|
- PersistedTopologyRequest persistedRequest = persistedState.persistTopologyRequest(request);
|
|
|
+
|
|
|
// this registers/updates all request host groups
|
|
|
topology.update(request);
|
|
|
- return getRequestStatus(processRequest(persistedRequest, topology,
|
|
|
- ambariContext.getNextRequestId()).getRequestId());
|
|
|
+
|
|
|
+ final Long requestId = ambariContext.getNextRequestId();
|
|
|
+ LogicalRequest logicalRequest = RetryHelper.executeWithRetry(new Callable<LogicalRequest>() {
|
|
|
+ @Override
|
|
|
+ public LogicalRequest call() throws Exception {
|
|
|
+ LogicalRequest logicalRequest = processAndPersistTopologyRequest(request, topology, requestId);
|
|
|
+
|
|
|
+ return logicalRequest;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ );
|
|
|
+
|
|
|
+ processRequest(request, topology, logicalRequest);
|
|
|
+
|
|
|
+ return getRequestStatus(logicalRequest.getRequestId());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Creates and persists a {@see PersistedTopologyRequest} and a {@see LogicalRequest} for the provided
|
|
|
+ * provision cluster request and topology.
|
|
|
+ * @param request Provision cluster request to create a logical request for.
|
|
|
+ * @param topology Cluster topology
|
|
|
+ * @param logicalRequestId The Id for the created logical request
|
|
|
+ * @return Logical request created.
|
|
|
+ */
|
|
|
+ @Transactional
|
|
|
+ protected LogicalRequest processAndPersistProvisionClusterTopologyRequest(ProvisionClusterRequest request, ClusterTopology topology, Long logicalRequestId)
|
|
|
+ throws InvalidTopologyException, AmbariException {
|
|
|
+
|
|
|
+ if (null != request.getQuickLinksProfileJson()) {
|
|
|
+ saveOrUpdateQuickLinksProfile(request.getQuickLinksProfileJson());
|
|
|
+ }
|
|
|
+
|
|
|
+ LogicalRequest logicalRequest = processAndPersistTopologyRequest(request, topology, logicalRequestId);
|
|
|
+
|
|
|
+ return logicalRequest;
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Creates and persists a {@see PersistedTopologyRequest} and a {@see LogicalRequest} for the provided request and topology.
|
|
|
+ * @param request {@see ProvisionClusterRequest} or {@see ScaleClusterRequest} to create a logical request for.
|
|
|
+ * @param topology Cluster topology
|
|
|
+ * @param logicalRequestId The Id for the created logical request
|
|
|
+ * @return Logical request created.
|
|
|
+ */
|
|
|
+ @Transactional
|
|
|
+ protected LogicalRequest processAndPersistTopologyRequest(BaseClusterRequest request, ClusterTopology topology, Long logicalRequestId)
|
|
|
+ throws InvalidTopologyException, AmbariException {
|
|
|
+ PersistedTopologyRequest persistedRequest = persistedState.persistTopologyRequest(request);
|
|
|
+
|
|
|
+ LogicalRequest logicalRequest = createLogicalRequest(persistedRequest, topology, logicalRequestId);
|
|
|
+
|
|
|
+ return logicalRequest;
|
|
|
}
|
|
|
|
|
|
private void hostNameCheck(ScaleClusterRequest request, ClusterTopology topology) throws InvalidTopologyException {
|
|
@@ -680,13 +743,12 @@ public class TopologyManager {
|
|
|
return hostComponentMap;
|
|
|
}
|
|
|
|
|
|
- private LogicalRequest processRequest(PersistedTopologyRequest request, ClusterTopology topology, Long requestId)
|
|
|
+ private void processRequest(TopologyRequest request, ClusterTopology topology, final LogicalRequest logicalRequest)
|
|
|
throws AmbariException {
|
|
|
|
|
|
LOG.info("TopologyManager.processRequest: Entering");
|
|
|
|
|
|
- finalizeTopology(request.getRequest(), topology);
|
|
|
- LogicalRequest logicalRequest = createLogicalRequest(request, topology, requestId);
|
|
|
+ finalizeTopology(request, topology);
|
|
|
|
|
|
boolean requestHostComplete = false;
|
|
|
//todo: overall synchronization. Currently we have nested synchronization here
|
|
@@ -750,22 +812,16 @@ public class TopologyManager {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- return logicalRequest;
|
|
|
}
|
|
|
|
|
|
- private LogicalRequest createLogicalRequest(final PersistedTopologyRequest request, ClusterTopology topology, Long requestId)
|
|
|
+ @Transactional
|
|
|
+ protected LogicalRequest createLogicalRequest(final PersistedTopologyRequest request, ClusterTopology topology, Long requestId)
|
|
|
throws AmbariException {
|
|
|
|
|
|
final LogicalRequest logicalRequest = logicalRequestFactory.createRequest(
|
|
|
requestId, request.getRequest(), topology);
|
|
|
|
|
|
- RetryHelper.executeWithRetry(new Callable<Object>() {
|
|
|
- @Override
|
|
|
- public Object call() throws Exception {
|
|
|
- persistedState.persistLogicalRequest(logicalRequest, request.getId());
|
|
|
- return null;
|
|
|
- }
|
|
|
- });
|
|
|
+ persistedState.persistLogicalRequest(logicalRequest, request.getId());
|
|
|
|
|
|
allRequests.put(logicalRequest.getRequestId(), logicalRequest);
|
|
|
LOG.info("TopologyManager.createLogicalRequest: created LogicalRequest with ID = {} and completed persistence of this request.",
|
|
@@ -796,11 +852,10 @@ public class TopologyManager {
|
|
|
|
|
|
// persist the host request -> hostName association
|
|
|
try {
|
|
|
- RetryHelper.executeWithRetry(new Callable<Object>() {
|
|
|
+ RetryHelper.executeWithRetry(new Callable<Void>() {
|
|
|
@Override
|
|
|
- public Object call() throws Exception {
|
|
|
- persistedState.registerHostName(response.getHostRequestId(), hostName);
|
|
|
- persistedState.registerInTopologyHostInfo(host);
|
|
|
+ public Void call() throws Exception {
|
|
|
+ persistTopologyHostRegistration(response.getHostRequestId(), host);
|
|
|
return null;
|
|
|
}
|
|
|
});
|
|
@@ -822,6 +877,12 @@ public class TopologyManager {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Transactional
|
|
|
+ protected void persistTopologyHostRegistration(long hostRequestId, final HostImpl host) {
|
|
|
+ persistedState.registerHostName(hostRequestId, host.getHostName());
|
|
|
+ persistedState.registerInTopologyHostInfo(host);
|
|
|
+ }
|
|
|
+
|
|
|
private void queueHostTasks(ClusterTopology topology, HostOfferResponse response, String hostName) {
|
|
|
LOG.info("TopologyManager.processAcceptedHostOffer: queueing tasks for host = {}", hostName);
|
|
|
response.executeTasks(taskExecutor, hostName, topology, ambariContext);
|