|
@@ -53,7 +53,6 @@ public class TopologyManager {
|
|
|
public static final String TOPOLOGY_RESOLVED_TAG = "TOPOLOGY_RESOLVED";
|
|
|
|
|
|
private PersistedState persistedState;
|
|
|
- //private ExecutorService executor = getExecutorService();
|
|
|
private ExecutorService executor = Executors.newSingleThreadExecutor();
|
|
|
private Collection<String> hostsToIgnore = new HashSet<String>();
|
|
|
private final List<HostImpl> availableHosts = new LinkedList<HostImpl>();
|
|
@@ -83,8 +82,8 @@ public class TopologyManager {
|
|
|
private void ensureInitialized() {
|
|
|
synchronized(initializationLock) {
|
|
|
if (! isInitialized) {
|
|
|
- isInitialized = true;
|
|
|
replayRequests(persistedState.getAllRequests());
|
|
|
+ isInitialized = true;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -99,7 +98,7 @@ public class TopologyManager {
|
|
|
String clusterName = topology.getClusterName();
|
|
|
clusterTopologyMap.put(clusterName, topology);
|
|
|
|
|
|
- addClusterConfigRequest(new ClusterConfigurationRequest(ambariContext, topology, true));
|
|
|
+ addClusterConfigRequest(topology, new ClusterConfigurationRequest(ambariContext, topology, true));
|
|
|
LogicalRequest logicalRequest = processRequest(persistedRequest, topology);
|
|
|
|
|
|
//todo: this should be invoked as part of a generic lifecycle event which could possibly
|
|
@@ -394,7 +393,7 @@ public class TopologyManager {
|
|
|
if (! configChecked) {
|
|
|
configChecked = true;
|
|
|
if (! ambariContext.doesConfigurationWithTagExist(topology.getClusterName(), TOPOLOGY_RESOLVED_TAG)) {
|
|
|
- addClusterConfigRequest(new ClusterConfigurationRequest(ambariContext, topology, false));
|
|
|
+ addClusterConfigRequest(topology, new ClusterConfigurationRequest(ambariContext, topology, false));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -423,28 +422,26 @@ public class TopologyManager {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Making it a synchronous call as resolution of initial configurations need to happen before
|
|
|
- * any tasks are created
|
|
|
- * TODO - needs further review
|
|
|
+ * Register the configuration task which is responsible for configuration topology resolution
|
|
|
+ * and setting the updated configuration on the cluster. This task needs to be submitted to the
|
|
|
+ * executor before any host requests to ensure that no install or start tasks are executed prior
|
|
|
+ * to configuration being set on the cluster.
|
|
|
+ *
|
|
|
+ * @param topology cluster topology
|
|
|
+ * @param configurationRequest configuration request to be executed
|
|
|
*/
|
|
|
- private void addClusterConfigRequest(ClusterConfigurationRequest configurationRequest) {
|
|
|
- ConfigureClusterTask cct = new ConfigureClusterTask(configurationRequest);
|
|
|
- cct.run();
|
|
|
- //executor.execute(new ConfigureClusterTask(configurationRequest));
|
|
|
- //try {
|
|
|
- // executor.awaitTermination(10, TimeUnit.SECONDS);
|
|
|
- // LOG.info("Resolved cluster topology configuration.");
|
|
|
- //}catch(InterruptedException ex) {
|
|
|
- // LOG.warn("Failed to resolve topology configuration");
|
|
|
- //}
|
|
|
+ private void addClusterConfigRequest(ClusterTopology topology, ClusterConfigurationRequest configurationRequest) {
|
|
|
+ executor.execute(new ConfigureClusterTask(topology, configurationRequest));
|
|
|
}
|
|
|
|
|
|
private class ConfigureClusterTask implements Runnable {
|
|
|
private ClusterConfigurationRequest configRequest;
|
|
|
+ private ClusterTopology topology;
|
|
|
|
|
|
|
|
|
- public ConfigureClusterTask(ClusterConfigurationRequest configRequest) {
|
|
|
+ public ConfigureClusterTask(ClusterTopology topology, ClusterConfigurationRequest configRequest) {
|
|
|
this.configRequest = configRequest;
|
|
|
+ this.topology = topology;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -457,14 +454,13 @@ public class TopologyManager {
|
|
|
Collection<String> requiredHostGroups = getTopologyRequiredHostGroups();
|
|
|
while (! completed && ! interrupted) {
|
|
|
try {
|
|
|
- Thread.sleep(200);
|
|
|
+ Thread.sleep(100);
|
|
|
} catch (InterruptedException e) {
|
|
|
interrupted = true;
|
|
|
// reset interrupted flag on thread
|
|
|
Thread.interrupted();
|
|
|
}
|
|
|
-
|
|
|
- completed = areConfigsResolved(requiredHostGroups);
|
|
|
+ completed = areRequiredHostGroupsResolved(requiredHostGroups);
|
|
|
}
|
|
|
|
|
|
if (! interrupted) {
|
|
@@ -478,12 +474,15 @@ public class TopologyManager {
|
|
|
"An exception occurred while attempting to process cluster configs and set on cluster: " + e);
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
-
|
|
|
- //executePendingTasks();
|
|
|
}
|
|
|
LOG.info("TopologyManager.ConfigureClusterTask: Exiting");
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Return the set of host group names which are required for configuration topology resolution.
|
|
|
+ *
|
|
|
+ * @return set of required host group names
|
|
|
+ */
|
|
|
private Collection<String> getTopologyRequiredHostGroups() {
|
|
|
Collection<String> requiredHostGroups;
|
|
|
try {
|
|
@@ -497,16 +496,20 @@ public class TopologyManager {
|
|
|
return requiredHostGroups;
|
|
|
}
|
|
|
|
|
|
- // get set of required host groups from config processor and confirm that all requests
|
|
|
- // have fully resolved the host names for the required host groups
|
|
|
- private boolean areConfigsResolved(Collection<String> requiredHostGroups) {
|
|
|
+ /**
|
|
|
+ * Determine if all hosts for the given set of required host groups are known.
|
|
|
+ *
|
|
|
+ * @param requiredHostGroups set of required host groups
|
|
|
+ * @return true if all required host groups are resolved
|
|
|
+ */
|
|
|
+ private boolean areRequiredHostGroupsResolved(Collection<String> requiredHostGroups) {
|
|
|
boolean configTopologyResolved = true;
|
|
|
- synchronized (outstandingRequests) {
|
|
|
- for (LogicalRequest outstandingRequest : outstandingRequests) {
|
|
|
- if (! outstandingRequest.areGroupsResolved(requiredHostGroups)) {
|
|
|
- configTopologyResolved = false;
|
|
|
- break;
|
|
|
- }
|
|
|
+ Map<String, HostGroupInfo> hostGroupInfo = topology.getHostGroupInfo();
|
|
|
+ for (String hostGroup : requiredHostGroups) {
|
|
|
+ HostGroupInfo groupInfo = hostGroupInfo.get(hostGroup);
|
|
|
+ if (groupInfo == null || groupInfo.getHostNames().size() < groupInfo.getRequestedHostCount()) {
|
|
|
+ configTopologyResolved = false;
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
return configTopologyResolved;
|