Browse Source

AMBARI-11087. Add Blueprint Support for Storm Nimbus High Availability. (rnettleton)

Bob Nettleton 10 years ago
parent
commit
c8e7c98fb3

+ 45 - 2
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessor.java

@@ -1312,6 +1312,11 @@ public class BlueprintConfigurationProcessor {
                                          Map<String, Map<String, String>> properties,
                                          Map<String, Map<String, String>> properties,
                                          ClusterTopology topology) {
                                          ClusterTopology topology) {
 
 
+      // return customer-supplied properties without updating them
+      if (isFQDNValue(origValue)) {
+        return origValue;
+      }
+
       return doFormat(propertyUpdater.updateForClusterCreate(propertyName, origValue, properties, topology));
       return doFormat(propertyUpdater.updateForClusterCreate(propertyName, origValue, properties, topology));
     }
     }
 
 
@@ -1330,6 +1335,19 @@ public class BlueprintConfigurationProcessor {
 
 
       return propertyUpdater.getRequiredHostGroups(origValue, properties, topology);
       return propertyUpdater.getRequiredHostGroups(origValue, properties, topology);
     }
     }
+
+    /**
+     * Convenience method to determine if a property value is a
+     * customer-specified FQDN.
+     *
+     * @param value property value to examine
+     * @return true if the property represents an FQDN value
+     *         false if the property does not represent an FQDN value
+     */
+    public boolean isFQDNValue(String value) {
+      return !value.contains("%HOSTGROUP") &&
+            !value.contains("localhost");
+    }
   }
   }
 
 
   /**
   /**
@@ -1337,8 +1355,22 @@ public class BlueprintConfigurationProcessor {
    */
    */
   private static class YamlMultiValuePropertyDecorator extends AbstractPropertyValueDecorator {
   private static class YamlMultiValuePropertyDecorator extends AbstractPropertyValueDecorator {
 
 
+    // currently, only plain and single-quoted Yaml flows are supported by this updater
+    enum FlowStyle {
+      SINGLE_QUOTED,
+      PLAIN
+    }
+
+    private final FlowStyle flowStyle;
+
     public YamlMultiValuePropertyDecorator(PropertyUpdater propertyUpdater) {
     public YamlMultiValuePropertyDecorator(PropertyUpdater propertyUpdater) {
+      // single-quote style is considered default by this updater
+      this(propertyUpdater, FlowStyle.SINGLE_QUOTED);
+    }
+
+    protected YamlMultiValuePropertyDecorator(PropertyUpdater propertyUpdater, FlowStyle flowStyle) {
       super(propertyUpdater);
       super(propertyUpdater);
+      this.flowStyle = flowStyle;
     }
     }
 
 
     /**
     /**
@@ -1360,9 +1392,17 @@ public class BlueprintConfigurationProcessor {
           } else {
           } else {
             isFirst = false;
             isFirst = false;
           }
           }
-          sb.append("'");
+
+          if (flowStyle == FlowStyle.SINGLE_QUOTED) {
+            sb.append("'");
+          }
+
           sb.append(value);
           sb.append(value);
-          sb.append("'");
+
+          if (flowStyle == FlowStyle.SINGLE_QUOTED) {
+            sb.append("'");
+          }
+
         }
         }
         sb.append("]");
         sb.append("]");
       }
       }
@@ -1649,6 +1689,9 @@ public class BlueprintConfigurationProcessor {
     stormSiteMap.put("nimbus.childopts", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER"));
     stormSiteMap.put("nimbus.childopts", new OptionalSingleHostTopologyUpdater("GANGLIA_SERVER"));
     multiStormSiteMap.put("storm.zookeeper.servers",
     multiStormSiteMap.put("storm.zookeeper.servers",
         new YamlMultiValuePropertyDecorator(new MultipleHostTopologyUpdater("ZOOKEEPER_SERVER")));
         new YamlMultiValuePropertyDecorator(new MultipleHostTopologyUpdater("ZOOKEEPER_SERVER")));
+    multiStormSiteMap.put("nimbus.seeds",
+        new YamlMultiValuePropertyDecorator(new MultipleHostTopologyUpdater("NIMBUS"), YamlMultiValuePropertyDecorator.FlowStyle.PLAIN));
+
 
 
     // FALCON
     // FALCON
     falconStartupPropertiesMap.put("*.broker.url", new SingleHostTopologyUpdater("FALCON_SERVER"));
     falconStartupPropertiesMap.put("*.broker.url", new SingleHostTopologyUpdater("FALCON_SERVER"));

+ 120 - 0
ambari-server/src/test/java/org/apache/ambari/server/controller/internal/BlueprintConfigurationProcessorTest.java

@@ -2314,6 +2314,93 @@ public class BlueprintConfigurationProcessorTest {
     }
     }
   }
   }
 
 
+  @Test
+  public void testDoUpdateForClusterCreate_Storm_Nimbus_HA_Enabled__defaultValues_YAML() throws Exception {
+    Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
+    Map<String, String> typeProps = new HashMap<String, String>();
+    typeProps.put("nimbus.seeds", "localhost");
+    properties.put("storm-site", typeProps);
+
+    Configuration clusterConfig = new Configuration(properties, Collections.<String, Map<String, Map<String, String>>>emptyMap());
+
+    Collection<String> hgComponents = new HashSet<String>();
+    hgComponents.add("NIMBUS");
+
+    TestHostGroup group1 = new TestHostGroup("group1", hgComponents, Collections.singleton("testhost"));
+
+    Collection<String> hgComponents2 = new HashSet<String>();
+    hgComponents2.add("NIMBUS");
+
+    Set<String> hosts2 = new HashSet<String>();
+    hosts2.add("testhost2");
+
+    TestHostGroup group2 = new TestHostGroup("group2", hgComponents2, hosts2);
+
+    Collection<TestHostGroup> hostGroups = new HashSet<TestHostGroup>();
+    hostGroups.add(group1);
+    hostGroups.add(group2);
+
+    ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups);
+    BlueprintConfigurationProcessor updater = new BlueprintConfigurationProcessor(topology);
+
+    updater.doUpdateForClusterCreate();
+    String updatedVal = topology.getConfiguration().getFullProperties().get("storm-site").get("nimbus.seeds");
+    assertTrue("Updated YAML value should start with bracket", updatedVal.startsWith("["));
+    assertTrue("Updated YAML value should end with bracket", updatedVal.endsWith("]"));
+    // remove the surrounding brackets
+    updatedVal = updatedVal.replaceAll("[\\[\\]]", "");
+
+    String[] hosts = updatedVal.split(",");
+
+    Collection<String> expectedHosts = new HashSet<String>();
+    expectedHosts.add("testhost");
+    expectedHosts.add("testhost2");
+
+    assertEquals("Incorrect number of hosts found in updated Nimbus config property", 2, hosts.length);
+    for (String host : hosts) {
+      assertTrue("Expected host name = " + host + " not found in updated Nimbus config property", expectedHosts.contains(host));
+      expectedHosts.remove(host);
+    }
+  }
+
+  @Test
+  public void testDoUpdateForClusterCreate_Storm_Nimbus_HA_Enabled__FQDN_ValuesSpecified_YAML() throws Exception {
+    final String expectedValue = "[c6401.ambari.apache.org, c6402.ambari.apache.org]";
+    Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
+    Map<String, String> typeProps = new HashMap<String, String>();
+    typeProps.put("nimbus.seeds", expectedValue);
+    properties.put("storm-site", typeProps);
+
+    Configuration clusterConfig = new Configuration(properties, Collections.<String, Map<String, Map<String, String>>>emptyMap());
+
+    Collection<String> hgComponents = new HashSet<String>();
+    hgComponents.add("NIMBUS");
+
+    TestHostGroup group1 = new TestHostGroup("group1", hgComponents, Collections.singleton("testhost"));
+
+    Collection<String> hgComponents2 = new HashSet<String>();
+    hgComponents2.add("NIMBUS");
+
+    Set<String> hosts2 = new HashSet<String>();
+    hosts2.add("testhost2");
+
+    TestHostGroup group2 = new TestHostGroup("group2", hgComponents2, hosts2);
+
+    Collection<TestHostGroup> hostGroups = new HashSet<TestHostGroup>();
+    hostGroups.add(group1);
+    hostGroups.add(group2);
+
+    ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups);
+    BlueprintConfigurationProcessor updater = new BlueprintConfigurationProcessor(topology);
+
+    updater.doUpdateForClusterCreate();
+    String updatedVal = topology.getConfiguration().getFullProperties().get("storm-site").get("nimbus.seeds");
+
+    assertEquals("nimbus.seeds property should not be updated when FQDNs are specified in configuration",
+                 expectedValue, updatedVal);
+  }
+
+
   @Test
   @Test
   public void testDoUpdateForClusterCreate_MProperty__defaultValues() throws Exception {
   public void testDoUpdateForClusterCreate_MProperty__defaultValues() throws Exception {
     Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
     Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
@@ -2714,6 +2801,7 @@ public class BlueprintConfigurationProcessorTest {
     Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
     Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
     Map<String, String> typeProps = new HashMap<String, String>();
     Map<String, String> typeProps = new HashMap<String, String>();
     typeProps.put("storm.zookeeper.servers", "['%HOSTGROUP::group1%:9090','%HOSTGROUP::group2%:9091']");
     typeProps.put("storm.zookeeper.servers", "['%HOSTGROUP::group1%:9090','%HOSTGROUP::group2%:9091']");
+    typeProps.put("nimbus.seeds", "[%HOSTGROUP::group1%, %HOSTGROUP::group4%]");
     properties.put("storm-site", typeProps);
     properties.put("storm-site", typeProps);
 
 
     Configuration clusterConfig = new Configuration(properties, Collections.<String, Map<String, Map<String, String>>>emptyMap());
     Configuration clusterConfig = new Configuration(properties, Collections.<String, Map<String, Map<String, String>>>emptyMap());
@@ -2722,12 +2810,14 @@ public class BlueprintConfigurationProcessorTest {
     hgComponents.add("NAMENODE");
     hgComponents.add("NAMENODE");
     hgComponents.add("SECONDARY_NAMENODE");
     hgComponents.add("SECONDARY_NAMENODE");
     hgComponents.add("ZOOKEEPER_SERVER");
     hgComponents.add("ZOOKEEPER_SERVER");
+    hgComponents.add("NIMBUS");
     TestHostGroup group1 = new TestHostGroup("group1", hgComponents, Collections.singleton("testhost"));
     TestHostGroup group1 = new TestHostGroup("group1", hgComponents, Collections.singleton("testhost"));
 
 
     Collection<String> hgComponents2 = new HashSet<String>();
     Collection<String> hgComponents2 = new HashSet<String>();
     hgComponents2.add("DATANODE");
     hgComponents2.add("DATANODE");
     hgComponents2.add("HDFS_CLIENT");
     hgComponents2.add("HDFS_CLIENT");
     hgComponents2.add("ZOOKEEPER_SERVER");
     hgComponents2.add("ZOOKEEPER_SERVER");
+    hgComponents2.add("NIMBUS");
     Set<String> hosts2 = new HashSet<String>();
     Set<String> hosts2 = new HashSet<String>();
     hosts2.add("testhost2");
     hosts2.add("testhost2");
     hosts2.add("testhost2a");
     hosts2.add("testhost2a");
@@ -2742,10 +2832,17 @@ public class BlueprintConfigurationProcessorTest {
     hosts3.add("testhost3a");
     hosts3.add("testhost3a");
     TestHostGroup group3 = new TestHostGroup("group3", hgComponents3, hosts3);
     TestHostGroup group3 = new TestHostGroup("group3", hgComponents3, hosts3);
 
 
+    Collection<String> hgComponents4 = new HashSet<String>();
+    hgComponents4.add("NIMBUS");
+    Set<String> hosts4 = new HashSet<String>();
+    hosts4.add("testhost4");
+    TestHostGroup group4 = new TestHostGroup("group4", hgComponents4, hosts4);
+
     Collection<TestHostGroup> hostGroups = new HashSet<TestHostGroup>();
     Collection<TestHostGroup> hostGroups = new HashSet<TestHostGroup>();
     hostGroups.add(group1);
     hostGroups.add(group1);
     hostGroups.add(group2);
     hostGroups.add(group2);
     hostGroups.add(group3);
     hostGroups.add(group3);
+    hostGroups.add(group4);
 
 
     ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups);
     ClusterTopology topology = createClusterTopology(bp, clusterConfig, hostGroups);
     BlueprintConfigurationProcessor updater = new BlueprintConfigurationProcessor(topology);
     BlueprintConfigurationProcessor updater = new BlueprintConfigurationProcessor(topology);
@@ -2770,6 +2867,29 @@ public class BlueprintConfigurationProcessorTest {
       assertTrue(expectedHosts.contains(host));
       assertTrue(expectedHosts.contains(host));
       expectedHosts.remove(host);
       expectedHosts.remove(host);
     }
     }
+
+
+    String updatedNimbusSeedsVal = topology.getConfiguration().getFullProperties().get("storm-site").get("nimbus.seeds");
+    assertTrue("Updated YAML value should start with bracket", updatedNimbusSeedsVal.startsWith("["));
+    assertTrue("Updated YAML value should end with bracket", updatedNimbusSeedsVal.endsWith("]"));
+    // remove the surrounding brackets
+    updatedNimbusSeedsVal = updatedNimbusSeedsVal.replaceAll("[\\[\\]]", "");
+
+    String[] nimbusHosts = updatedNimbusSeedsVal.split(",");
+
+    Collection<String> expectedNimbusHosts = new HashSet<String>();
+    expectedNimbusHosts.add("testhost");
+    expectedNimbusHosts.add("testhost4");
+
+    assertEquals("Incorrect number of hosts found in updated Nimbus config property", 2, nimbusHosts.length);
+    for (String host : nimbusHosts) {
+      assertTrue("Expected Nimbus host = " + host + " not found in nimbus.seeds property value", expectedNimbusHosts.contains(host));
+      expectedHosts.remove(host);
+    }
+
+
+
+
   }
   }
 
 
   @Test
   @Test