|
|
@@ -29,9 +29,10 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.Callable;
|
|
|
-import java.util.concurrent.Executor;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import org.apache.ambari.server.AmbariException;
|
|
|
import org.apache.ambari.server.actionmanager.HostRoleCommand;
|
|
|
@@ -73,6 +74,7 @@ import org.apache.ambari.server.state.quicklinksprofile.QuickLinksProfile;
|
|
|
import org.apache.ambari.server.topology.tasks.ConfigureClusterTask;
|
|
|
import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory;
|
|
|
import org.apache.ambari.server.topology.validators.TopologyValidatorService;
|
|
|
+import org.apache.ambari.server.utils.ManagedThreadPoolExecutor;
|
|
|
import org.apache.ambari.server.utils.RetryHelper;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
@@ -102,9 +104,23 @@ public class TopologyManager {
|
|
|
private static final String CLUSTER_CONFIG_TASK_MAX_TIME_IN_MILLIS_PROPERTY_NAME = "cluster_configure_task_timeout";
|
|
|
|
|
|
private PersistedState persistedState;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Single threaded executor to execute async tasks. At the moment it's only used to execute ConfigureClusterTask.
|
|
|
+ */
|
|
|
private final ExecutorService executor = Executors.newSingleThreadExecutor();
|
|
|
- private final Executor taskExecutor; // executes TopologyTasks
|
|
|
- private final boolean parallelTaskCreationEnabled;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Thread pool size for topology task executors.
|
|
|
+ */
|
|
|
+ private int topologyTaskExecutorThreadPoolSize;
|
|
|
+ /**
|
|
|
+ * There is one ExecutorService for each cluster to execute TopologyTasks.
|
|
|
+ * TopologyTasks are submitted into ExecutorService for each cluster,
|
|
|
+ * however the ExecutorService is started only after cluster configuration is finished.
|
|
|
+ */
|
|
|
+ private final Map<Long, ManagedThreadPoolExecutor> topologyTaskExecutorServiceMap = new HashMap<>();
|
|
|
+
|
|
|
private Collection<String> hostsToIgnore = new HashSet<>();
|
|
|
private final List<HostImpl> availableHosts = new LinkedList<>();
|
|
|
private final Map<String, LogicalRequest> reservedHosts = new HashMap<>();
|
|
|
@@ -158,17 +174,15 @@ public class TopologyManager {
|
|
|
private Map<Long, Boolean> clusterProvisionWithBlueprintCreationFinished = new HashMap<>();
|
|
|
|
|
|
public TopologyManager() {
|
|
|
- parallelTaskCreationEnabled = false;
|
|
|
- taskExecutor = executor;
|
|
|
+ topologyTaskExecutorThreadPoolSize = 1;
|
|
|
}
|
|
|
|
|
|
@Inject
|
|
|
public TopologyManager(Configuration configuration) {
|
|
|
- int threadPoolSize = configuration.getParallelTopologyTaskCreationThreadPoolSize();
|
|
|
- parallelTaskCreationEnabled = configuration.isParallelTopologyTaskCreationEnabled() && threadPoolSize > 1;
|
|
|
- taskExecutor = parallelTaskCreationEnabled
|
|
|
- ? Executors.newFixedThreadPool(threadPoolSize)
|
|
|
- : executor;
|
|
|
+ topologyTaskExecutorThreadPoolSize = configuration.getParallelTopologyTaskCreationThreadPoolSize();
|
|
|
+ if (!configuration.isParallelTopologyTaskCreationEnabled()) {
|
|
|
+ topologyTaskExecutorThreadPoolSize = 1;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// executed by the IoC framework after creating the object (guice)
|
|
|
@@ -310,6 +324,10 @@ public class TopologyManager {
|
|
|
// set provision action requested
|
|
|
topology.setProvisionAction(request.getProvisionAction());
|
|
|
|
|
|
+
|
|
|
+ // create task executor for TopologyTasks
|
|
|
+ getOrCreateTopologyTaskExecutor(clusterId);
|
|
|
+
|
|
|
// persist request
|
|
|
LogicalRequest logicalRequest = RetryHelper.executeWithRetry(new Callable<LogicalRequest>() {
|
|
|
@Override
|
|
|
@@ -325,15 +343,6 @@ public class TopologyManager {
|
|
|
addClusterConfigRequest(topology, new ClusterConfigurationRequest(ambariContext, topology, true,
|
|
|
stackAdvisorBlueprintProcessor, securityType == SecurityType.KERBEROS));
|
|
|
|
|
|
- // Notify listeners that cluster configuration finished
|
|
|
- executor.submit(new Callable<Boolean>() {
|
|
|
- @Override
|
|
|
- public Boolean call() throws Exception {
|
|
|
- ambariEventPublisher.publish(new ClusterConfigFinishedEvent(clusterName));
|
|
|
- return Boolean.TRUE;
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
// Process the logical request
|
|
|
processRequest(request, topology, logicalRequest);
|
|
|
|
|
|
@@ -345,6 +354,17 @@ public class TopologyManager {
|
|
|
return getRequestStatus(logicalRequest.getRequestId());
|
|
|
}
|
|
|
|
|
|
+ @Subscribe
|
|
|
+ public void onClusterConfigFinishedEvent(ClusterConfigFinishedEvent event) {
|
|
|
+ ManagedThreadPoolExecutor taskExecutor = topologyTaskExecutorServiceMap.get(event.getClusterId());
|
|
|
+ if (taskExecutor == null) {
|
|
|
+ LOG.error("Can't find executor service taskQueue not found for cluster: {} ", event.getClusterName());
|
|
|
+ } else {
|
|
|
+ LOG.info("Starting topology task ExecutorService for cluster: {}", event.getClusterName());
|
|
|
+ taskExecutor.start();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
/**
|
|
|
* Saves the quick links profile to the DB as an Ambari setting. Creates a new setting entity or updates the existing
|
|
|
@@ -941,16 +961,8 @@ public class TopologyManager {
|
|
|
}
|
|
|
|
|
|
LOG.info("TopologyManager.processAcceptedHostOffer: queue tasks for host = {} which responded {}", hostName, response.getAnswer());
|
|
|
- if (parallelTaskCreationEnabled) {
|
|
|
- executor.execute(new Runnable() { // do not start until cluster config done
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- queueHostTasks(topology, response, hostName);
|
|
|
- }
|
|
|
- });
|
|
|
- } else {
|
|
|
- queueHostTasks(topology, response, hostName);
|
|
|
- }
|
|
|
+ queueHostTasks(topology, response, hostName);
|
|
|
+
|
|
|
}
|
|
|
|
|
|
@Transactional
|
|
|
@@ -959,9 +971,23 @@ public class TopologyManager {
|
|
|
persistedState.registerInTopologyHostInfo(host);
|
|
|
}
|
|
|
|
|
|
+ private ExecutorService getOrCreateTopologyTaskExecutor(Long clusterId) {
|
|
|
+ ManagedThreadPoolExecutor topologyTaskExecutor = this.topologyTaskExecutorServiceMap.get(clusterId);
|
|
|
+ if (topologyTaskExecutor == null) {
|
|
|
+ LOG.info("Creating TopologyTaskExecutorService for clusterId: {}", clusterId);
|
|
|
+
|
|
|
+ topologyTaskExecutor = new ManagedThreadPoolExecutor(topologyTaskExecutorThreadPoolSize,
|
|
|
+ topologyTaskExecutorThreadPoolSize, 0L, TimeUnit.MILLISECONDS,
|
|
|
+ new LinkedBlockingQueue<Runnable>());
|
|
|
+ topologyTaskExecutorServiceMap.put(clusterId, topologyTaskExecutor);
|
|
|
+ }
|
|
|
+ return topologyTaskExecutor;
|
|
|
+ }
|
|
|
+
|
|
|
private void queueHostTasks(ClusterTopology topology, HostOfferResponse response, String hostName) {
|
|
|
LOG.info("TopologyManager.processAcceptedHostOffer: queueing tasks for host = {}", hostName);
|
|
|
- response.executeTasks(taskExecutor, hostName, topology, ambariContext);
|
|
|
+ ExecutorService executorService = getOrCreateTopologyTaskExecutor(topology.getClusterId());
|
|
|
+ response.executeTasks(executorService, hostName, topology, ambariContext);
|
|
|
}
|
|
|
|
|
|
private void updateHostWithRackInfo(ClusterTopology topology, HostOfferResponse response, HostImpl host) {
|
|
|
@@ -1108,7 +1134,7 @@ public class TopologyManager {
|
|
|
}
|
|
|
|
|
|
ConfigureClusterTask configureClusterTask = configureClusterTaskFactory.createConfigureClusterTask(topology,
|
|
|
- configurationRequest);
|
|
|
+ configurationRequest, ambariEventPublisher);
|
|
|
|
|
|
AsyncCallableService<Boolean> asyncCallableService = new AsyncCallableService<>(configureClusterTask, timeout, delay,
|
|
|
Executors.newScheduledThreadPool(1));
|