Prechádzať zdrojové kódy

AMBARI-17694. Kafka listeners property does not show SASL_PLAINTEXT protocol when Kerberos is enabled (Anita Jebaraj via rlevas)

Anita Jebaraj 9 rokov pred
rodič
commit
79a8296a58

+ 32 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java

@@ -20,7 +20,6 @@ package org.apache.ambari.server.state.kerberos;
 
 import com.google.inject.Singleton;
 import org.apache.ambari.server.AmbariException;
-
 import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Matcher;
@@ -49,6 +48,7 @@ public class VariableReplacementHelper {
     {
       put("each", new EachFunction());
       put("toLower", new ToLowerFunction());
+      put("replace", new ReplaceValue());
     }
   };
 
@@ -226,6 +226,37 @@ public class VariableReplacementHelper {
       return "";
     }
   }
+  /**
+   * ReplaceValue is a Function implementation that replaces the value in the string
+   * <p/>
+   * This function expects the following arguments (in order) within the args array:
+   * <ol>
+   * <li>regular expression that should be replaced</li>
+   * <li>replacement value for the string</li>
+   * </ol>
+   */ 
+  private static class ReplaceValue implements Function {
+    
+    @Override
+    public String perform(String[] args, String data) {
+      if ((args == null) || (args.length != 2)) {
+        throw new IllegalArgumentException("Invalid number of arguments encountered");
+      }
+      if (data != null) {
+        StringBuffer builder = new StringBuffer();
+        String regex = args[0];
+        String replacement = args[1];
+        Pattern pattern = Pattern.compile(regex);
+        Matcher matcher = pattern.matcher(data);
+        while(matcher.find()) {
+          matcher.appendReplacement(builder, replacement);
+        }
+        matcher.appendTail(builder);
+        return builder.toString();
+      }
+      return "";
+    }
+  }
 
   /**
    * ToLowerFunction is a Function implementation that converts a String to lowercase

+ 5 - 10
ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py

@@ -80,21 +80,16 @@ def kafka(upgrade_type=None):
 
        listeners = kafka_server_config['listeners'].replace("localhost", params.hostname)
        Logger.info(format("Kafka listeners: {listeners}"))
+       kafka_server_config['listeners'] = listeners       
 
        if params.security_enabled and params.kafka_kerberos_enabled:
          Logger.info("Kafka kerberos security is enabled.")
-         if "SASL" not in listeners:
-           listeners = listeners.replace("PLAINTEXT", "PLAINTEXTSASL")
-
-         kafka_server_config['listeners'] = listeners
          kafka_server_config['advertised.listeners'] = listeners
          Logger.info(format("Kafka advertised listeners: {listeners}"))
-       else:
-         kafka_server_config['listeners'] = listeners
-         if 'advertised.listeners' in kafka_server_config:
-           advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname)
-           kafka_server_config['advertised.listeners'] = advertised_listeners
-           Logger.info(format("Kafka advertised listeners: {advertised_listeners}"))
+       elif 'advertised.listeners' in kafka_server_config:
+         advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname)
+         kafka_server_config['advertised.listeners'] = advertised_listeners
+         Logger.info(format("Kafka advertised listeners: {advertised_listeners}"))
     else:
       kafka_server_config['host.name'] = params.hostname
 

+ 2 - 1
ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json

@@ -14,7 +14,8 @@
               "principal.to.local.class":"kafka.security.auth.KerberosPrincipalToLocal",
               "super.users": "user:${kafka-env/kafka_user}",
               "security.inter.broker.protocol": "PLAINTEXTSASL",
-              "zookeeper.set.acl": "true"
+              "zookeeper.set.acl": "true",
+              "listeners": "${kafka-broker/listeners|replace(\\bPLAINTEXT\\b, PLAINTEXTSASL)}"
           }
         }
       ],

+ 2 - 1
ambari-server/src/main/resources/stacks/HDP/2.5/services/KAFKA/kerberos.json

@@ -14,7 +14,8 @@
               "principal.to.local.class":"kafka.security.auth.KerberosPrincipalToLocal",
               "super.users": "user:${kafka-env/kafka_user}",
               "security.inter.broker.protocol": "PLAINTEXTSASL",
-              "zookeeper.set.acl": "true"
+              "zookeeper.set.acl": "true",
+              "listeners": "${kafka-broker/listeners|replace(\\bPLAINTEXT\\b, PLAINTEXTSASL)}"
           }
         },
         {

+ 7 - 1
ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java

@@ -158,6 +158,10 @@ public class VariableReplacementHelperTest {
           put("realm", "UNIT.TEST");
         }});
 
+        put("kafka-broker", new HashMap<String, String>() {{
+          put("listeners", "PLAINTEXT://localhost:6667");
+        }});
+        
         put("clusterHostInfo", new HashMap<String, String>() {{
           put("hive_metastore_host", "host1.unit.test, host2.unit.test , host3.unit.test"); // spaces are there on purpose.
         }});
@@ -171,6 +175,8 @@ public class VariableReplacementHelperTest {
         helper.replaceVariables("hive.metastore.local=false,hive.metastore.uris=${clusterHostInfo/hive_metastore_host | each(thrift://%s:9083, \\\\,, \\s*\\,\\s*)},hive.metastore.sasl.enabled=true,hive.metastore.execute.setugi=true,hive.metastore.warehouse.dir=/apps/hive/warehouse,hive.exec.mode.local.auto=false,hive.metastore.kerberos.principal=hive/_HOST@${realm}", configurations));
 
     Assert.assertEquals("test=unit.test", helper.replaceVariables("test=${realm|toLower()}", configurations));
+  
+    Assert.assertEquals("PLAINTEXTSASL://localhost:6667", helper.replaceVariables("${kafka-broker/listeners|replace(\\bPLAINTEXT\\b,PLAINTEXTSASL)}", configurations)); 
   }
 
-}
+}