|
@@ -1077,9 +1077,9 @@ public class BlueprintConfigurationProcessor {
|
|
|
} else {
|
|
|
int matchingGroupCount = topology.getHostGroupsForComponent(component).size();
|
|
|
if (matchingGroupCount == 1) {
|
|
|
- Collection<String> componentHosts = topology.getHostAssignmentsForComponent(component);
|
|
|
//todo: warn if > 1 hosts
|
|
|
- return origValue.replace("localhost", componentHosts.iterator().next());
|
|
|
+ return replacePropertyValue(origValue,
|
|
|
+ topology.getHostAssignmentsForComponent(component).iterator().next(), properties);
|
|
|
} else {
|
|
|
//todo: extract all hard coded HA logic
|
|
|
Cardinality cardinality = topology.getBlueprint().getStack().getCardinality(component);
|
|
@@ -1159,6 +1159,10 @@ public class BlueprintConfigurationProcessor {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public String replacePropertyValue(String origValue, String host, Map<String, Map<String, String>> properties) {
|
|
|
+ return origValue.replace("localhost", host);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public Collection<String> getRequiredHostGroups(String propertyName,
|
|
|
String origValue,
|
|
@@ -1855,6 +1859,21 @@ public class BlueprintConfigurationProcessor {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * A topology independent updater which provides a default implementation of getRequiredHostGroups
|
|
|
+ * since no topology related information is required by the updater.
|
|
|
+ */
|
|
|
+ private static abstract class NonTopologyUpdater implements PropertyUpdater {
|
|
|
+ @Override
|
|
|
+ public Collection<String> getRequiredHostGroups(String propertyName,
|
|
|
+ String origValue,
|
|
|
+ Map<String, Map<String, String>> properties,
|
|
|
+ ClusterTopology topology) {
|
|
|
+ return Collections.emptyList();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* Register updaters for configuration properties.
|
|
|
*/
|
|
@@ -1877,6 +1896,7 @@ public class BlueprintConfigurationProcessor {
|
|
|
Map<String, PropertyUpdater> accumuloSiteMap = new HashMap<String, PropertyUpdater>();
|
|
|
Map<String, PropertyUpdater> falconStartupPropertiesMap = new HashMap<String, PropertyUpdater>();
|
|
|
Map<String, PropertyUpdater> kafkaBrokerMap = new HashMap<String, PropertyUpdater>();
|
|
|
+ Map<String, PropertyUpdater> atlasPropsMap = new HashMap<String, PropertyUpdater>();
|
|
|
Map<String, PropertyUpdater> mapredEnvMap = new HashMap<String, PropertyUpdater>();
|
|
|
Map<String, PropertyUpdater> hadoopEnvMap = new HashMap<String, PropertyUpdater>();
|
|
|
Map<String, PropertyUpdater> hbaseEnvMap = new HashMap<String, PropertyUpdater>();
|
|
@@ -1911,6 +1931,7 @@ public class BlueprintConfigurationProcessor {
|
|
|
singleHostTopologyUpdaters.put("hive-env", hiveEnvMap);
|
|
|
singleHostTopologyUpdaters.put("oozie-env", oozieEnvMap);
|
|
|
singleHostTopologyUpdaters.put("kafka-broker", kafkaBrokerMap);
|
|
|
+ singleHostTopologyUpdaters.put("application-properties", atlasPropsMap);
|
|
|
|
|
|
mPropertyUpdaters.put("hadoop-env", hadoopEnvMap);
|
|
|
mPropertyUpdaters.put("hbase-env", hbaseEnvMap);
|
|
@@ -1980,7 +2001,6 @@ public class BlueprintConfigurationProcessor {
|
|
|
yarnSiteMap.put("yarn.timeline-service.webapp.address", new SingleHostTopologyUpdater("APP_TIMELINE_SERVER"));
|
|
|
yarnSiteMap.put("yarn.timeline-service.webapp.https.address", new SingleHostTopologyUpdater("APP_TIMELINE_SERVER"));
|
|
|
|
|
|
-
|
|
|
// HIVE_SERVER
|
|
|
multiHiveSiteMap.put("hive.metastore.uris", new MultipleHostTopologyUpdater("HIVE_METASTORE", ',', true));
|
|
|
dbHiveSiteMap.put("javax.jdo.option.ConnectionURL",
|
|
@@ -1995,6 +2015,70 @@ public class BlueprintConfigurationProcessor {
|
|
|
multiHiveSiteMap.put("hive.zookeeper.quorum", new MultipleHostTopologyUpdater("ZOOKEEPER_SERVER"));
|
|
|
multiHiveSiteMap.put("hive.cluster.delegation.token.store.zookeeper.connectString", new MultipleHostTopologyUpdater("ZOOKEEPER_SERVER"));
|
|
|
|
|
|
+ // HIVE Atlas integration
|
|
|
+ hiveSiteMap.put("hive.exec.post.hooks", new NonTopologyUpdater() {
|
|
|
+ @Override
|
|
|
+ public String updateForClusterCreate(String propertyName,
|
|
|
+ String origValue,
|
|
|
+ Map<String, Map<String, String>> properties,
|
|
|
+ ClusterTopology topology) {
|
|
|
+
|
|
|
+ if (topology.getBlueprint().getServices().contains("ATLAS")) {
|
|
|
+ String hiveHookClass = "org.apache.atlas.hive.hook.HiveHook";
|
|
|
+ if (origValue == null || origValue.isEmpty()) {
|
|
|
+ return hiveHookClass;
|
|
|
+ } else {
|
|
|
+ return String.format("%s,%s", origValue, hiveHookClass);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ return origValue;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ //todo: john - this property should be moved to atlas configuration
|
|
|
+ hiveSiteMap.put("atlas.cluster.name", new NonTopologyUpdater() {
|
|
|
+ @Override
|
|
|
+ public String updateForClusterCreate(String propertyName,
|
|
|
+ String origValue,
|
|
|
+ Map<String, Map<String, String>> properties,
|
|
|
+ ClusterTopology topology) {
|
|
|
+
|
|
|
+ if (topology.getBlueprint().getServices().contains("ATLAS")) {
|
|
|
+ // if original value is not set or is the default "primary" set the cluster id
|
|
|
+ if (origValue == null || origValue.trim().isEmpty() || origValue.equals("primary")) {
|
|
|
+ //use cluster id because cluster name may change
|
|
|
+ return String.valueOf(topology.getClusterId());
|
|
|
+ } else {
|
|
|
+ // if explicitly set by user, don't override
|
|
|
+ return origValue;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ return origValue;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ //todo: john - this property should be removed
|
|
|
+ hiveSiteMap.put("atlas.rest.address", new SingleHostTopologyUpdater("ATLAS_SERVER") {
|
|
|
+ @Override
|
|
|
+ public String replacePropertyValue(String origValue, String host, Map<String, Map<String, String>> properties) {
|
|
|
+ boolean tlsEnabled = Boolean.parseBoolean(properties.get("application-properties").get("atlas.enableTLS"));
|
|
|
+ String scheme;
|
|
|
+ String port;
|
|
|
+ if (tlsEnabled) {
|
|
|
+ scheme = "https";
|
|
|
+ port = properties.get("application-properties").get("atlas.server.https.port");
|
|
|
+ } else {
|
|
|
+ scheme = "http";
|
|
|
+ port = properties.get("application-properties").get("atlas.server.http.port");
|
|
|
+ }
|
|
|
+
|
|
|
+ return String.format("%s://%s:%s", scheme, host, port);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+
|
|
|
// OOZIE_SERVER
|
|
|
oozieSiteMap.put("oozie.base.url", new SingleHostTopologyUpdater("OOZIE_SERVER"));
|
|
|
oozieSiteMap.put("oozie.authentication.kerberos.principal", new SingleHostTopologyUpdater("OOZIE_SERVER"));
|
|
@@ -2041,6 +2125,9 @@ public class BlueprintConfigurationProcessor {
|
|
|
multiOozieSiteMap.put("hadoop.proxyuser.knox.hosts", new MultipleHostTopologyUpdater("KNOX_GATEWAY"));
|
|
|
multiOozieSiteMap.put("oozie.service.ProxyUserService.proxyuser.knox.hosts", new MultipleHostTopologyUpdater("KNOX_GATEWAY"));
|
|
|
|
|
|
+ // ATLAS
|
|
|
+ atlasPropsMap.put("atlas.server.bind.address", new SingleHostTopologyUpdater("ATLAS_SERVER"));
|
|
|
+
|
|
|
|
|
|
// Required due to AMBARI-4933. These no longer seem to be required as the default values in the stack
|
|
|
// are now correct but are left here in case an existing blueprint still contains an old value.
|