|
@@ -1,4 +1,4 @@
|
|
|
-/**
|
|
|
+/*
|
|
|
* Licensed to the Apache Software Foundation (ASF) under one
|
|
|
* or more contributor license agreements. See the NOTICE file
|
|
|
* distributed with this work for additional information
|
|
@@ -29,6 +29,7 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.Callable;
|
|
|
+import java.util.concurrent.Executor;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
|
|
@@ -38,6 +39,7 @@ import org.apache.ambari.server.AmbariException;
|
|
|
import org.apache.ambari.server.actionmanager.HostRoleCommand;
|
|
|
import org.apache.ambari.server.actionmanager.HostRoleStatus;
|
|
|
import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorBlueprintProcessor;
|
|
|
+import org.apache.ambari.server.configuration.Configuration;
|
|
|
import org.apache.ambari.server.controller.AmbariServer;
|
|
|
import org.apache.ambari.server.controller.RequestStatusResponse;
|
|
|
import org.apache.ambari.server.controller.ShortTaskStatus;
|
|
@@ -86,7 +88,9 @@ public class TopologyManager {
|
|
|
private static final String CLUSTER_CONFIG_TASK_MAX_TIME_IN_MILLIS_PROPERTY_NAME = "cluster_configure_task_timeout";
|
|
|
|
|
|
private PersistedState persistedState;
|
|
|
- private ExecutorService executor = Executors.newSingleThreadExecutor();
|
|
|
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
|
|
|
+ private final Executor taskExecutor; // executes TopologyTasks
|
|
|
+ private final boolean parallelTaskCreationEnabled;
|
|
|
private Collection<String> hostsToIgnore = new HashSet<String>();
|
|
|
private final List<HostImpl> availableHosts = new LinkedList<HostImpl>();
|
|
|
private final Map<String, LogicalRequest> reservedHosts = new HashMap<String, LogicalRequest>();
|
|
@@ -127,8 +131,18 @@ public class TopologyManager {
|
|
|
*/
|
|
|
private Map<Long, Boolean> clusterProvisionWithBlueprintCreationFinished = new HashMap<>();
|
|
|
|
|
|
- public TopologyManager(){
|
|
|
+ public TopologyManager() {
|
|
|
+ parallelTaskCreationEnabled = false;
|
|
|
+ taskExecutor = executor;
|
|
|
+ }
|
|
|
|
|
|
+ @Inject
|
|
|
+ public TopologyManager(Configuration configuration) {
|
|
|
+ int threadPoolSize = configuration.getParallelTopologyTaskCreationThreadPoolSize();
|
|
|
+ parallelTaskCreationEnabled = configuration.isParallelTopologyTaskCreationEnabled() && threadPoolSize > 1;
|
|
|
+ taskExecutor = parallelTaskCreationEnabled
|
|
|
+ ? Executors.newFixedThreadPool(threadPoolSize)
|
|
|
+ : executor;
|
|
|
}
|
|
|
|
|
|
@Inject
|
|
@@ -691,7 +705,7 @@ public class TopologyManager {
|
|
|
return logicalRequest;
|
|
|
}
|
|
|
|
|
|
- private void processAcceptedHostOffer(ClusterTopology topology, final HostOfferResponse response, final HostImpl host) {
|
|
|
+ private void processAcceptedHostOffer(final ClusterTopology topology, final HostOfferResponse response, final HostImpl host) {
|
|
|
final String hostName = host.getHostName();
|
|
|
try {
|
|
|
topology.addHostToTopology(response.getHostGroupName(), hostName);
|
|
@@ -722,19 +736,24 @@ public class TopologyManager {
|
|
|
throw new RuntimeException(e);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- LOG.info("TopologyManager.processAcceptedHostOffer: about to execute tasks for host = {}",
|
|
|
- hostName);
|
|
|
-
|
|
|
- for (TopologyTask task : response.getTasks()) {
|
|
|
- LOG.info("Processing accepted host offer for {} which responded {} and task {}",
|
|
|
- hostName, response.getAnswer(), task.getType());
|
|
|
-
|
|
|
- task.init(topology, ambariContext);
|
|
|
- executor.execute(task);
|
|
|
+ LOG.info("TopologyManager.processAcceptedHostOffer: queue tasks for host = {} which responded {}", hostName, response.getAnswer());
|
|
|
+ if (parallelTaskCreationEnabled) {
|
|
|
+ executor.execute(new Runnable() { // do not start until cluster config done
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ queueHostTasks(topology, response, hostName);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ queueHostTasks(topology, response, hostName);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void queueHostTasks(ClusterTopology topology, HostOfferResponse response, String hostName) {
|
|
|
+ LOG.info("TopologyManager.processAcceptedHostOffer: queueing tasks for host = {}", hostName);
|
|
|
+ response.executeTasks(taskExecutor, hostName, topology, ambariContext);
|
|
|
+ }
|
|
|
+
|
|
|
private void updateHostWithRackInfo(ClusterTopology topology, HostOfferResponse response, HostImpl host) {
|
|
|
// the rack info from the cluster creation template
|
|
|
String rackInfoFromTemplate = topology.getHostGroupInfo().get(response.getHostGroupName()).getHostRackInfo().get
|
|
@@ -878,7 +897,7 @@ public class TopologyManager {
|
|
|
}
|
|
|
|
|
|
ConfigureClusterTask configureClusterTask = new ConfigureClusterTask(topology, configurationRequest);
|
|
|
- AsyncCallableService<Boolean> asyncCallableService = new AsyncCallableService(configureClusterTask, timeout, delay,
|
|
|
+ AsyncCallableService<Boolean> asyncCallableService = new AsyncCallableService<>(configureClusterTask, timeout, delay,
|
|
|
Executors.newScheduledThreadPool(1));
|
|
|
|
|
|
executor.submit(asyncCallableService);
|