Pārlūkot izejas kodu

AMBARI-7845. Kafka ganglia metrics integration with Ambari. (Sriharsha Chintalapani via Jaimin Jetly)

Jaimin Jetly 10 gadi atpakaļ
vecāks
revīzija
d54b747aee

+ 1 - 0
ambari-server/src/main/java/org/apache/ambari/server/controller/ganglia/GangliaHostPropertyProvider.java

@@ -49,6 +49,7 @@ public class GangliaHostPropertyProvider extends GangliaPropertyProvider{
     GANGLIA_CLUSTER_NAMES.add("HDPHBaseRegionServer");
     GANGLIA_CLUSTER_NAMES.add("HDPFlumeServer");
     GANGLIA_CLUSTER_NAMES.add("HDPJournalNode");
+    GANGLIA_CLUSTER_NAMES.add("HDPKafka");
   }
 
   // ----- Constructors ------------------------------------------------------

+ 1 - 0
ambari-server/src/main/java/org/apache/ambari/server/controller/ganglia/GangliaPropertyProvider.java

@@ -73,6 +73,7 @@ public abstract class GangliaPropertyProvider extends AbstractPropertyProvider {
     GANGLIA_CLUSTER_NAME_MAP.put("JOURNALNODE",        Arrays.asList("HDPJournalNode", "HDPSlaves"));
     GANGLIA_CLUSTER_NAME_MAP.put("NIMBUS",             Collections.singletonList("HDPNimbus"));
     GANGLIA_CLUSTER_NAME_MAP.put("SUPERVISOR",         Collections.singletonList("HDPSupervisor"));
+    GANGLIA_CLUSTER_NAME_MAP.put("KAFKA_BROKER",         Collections.singletonList("HDPKafka"));
   }
 
   protected final static Logger LOG =

+ 10 - 3
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/GANGLIA/package/scripts/ganglia_monitor.py

@@ -33,7 +33,7 @@ class GangliaMonitor(Script):
     self.install_packages(env)
     env.set_params(params)
     self.configure(env)
-    
+
     functions.turn_off_autostart(params.gmond_service_name)
     functions.turn_off_autostart("gmetad") # since the package is installed as well
 
@@ -75,7 +75,7 @@ class GangliaMonitor(Script):
     )
 
     ganglia.config()
-    
+
     self.generate_slave_configs()
 
     Directory(path.join(params.ganglia_dir, "conf.d"),
@@ -133,7 +133,7 @@ class GangliaMonitor(Script):
 
   def generate_master_configs(self):
     import params
-     
+
     if params.has_namenodes:
       generate_daemon("gmond",
                       name = "HDPNameNode",
@@ -211,6 +211,13 @@ class GangliaMonitor(Script):
                       owner = "root",
                       group = params.user_group)
 
+    if params.has_kafka_broker:
+      generate_daemon("gmond",
+                      name = "HDPKafka",
+                      role = "server",
+                      owner = "root",
+                      group = params.user_group)
+
     if params.has_flume:
       generate_daemon("gmond",
                       name = "HDPFlumeServer",

+ 4 - 0
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/GANGLIA/package/scripts/params.py

@@ -73,6 +73,8 @@ flume_hosts = set(default("/clusterHostInfo/flume_hosts", []))
 jn_hosts = set(default("/clusterHostInfo/journalnode_hosts", []))
 nimbus_server_hosts = set(default("/clusterHostInfo/nimbus_hosts", []))
 supervisor_server_hosts = set(default("/clusterHostInfo/supervisor_hosts", []))
+kafka_broker_hosts =  set(default("/clusterHostInfo/kafka_broker_hosts", []))
+kafka_ganglia_port = default("/configurations/kafka-broker/kafka.ganglia.metrics.port", 8671)
 
 pure_slave = not hostname in (namenode_host | jtnode_host | rm_host | hs_host | \
                               hbase_master_hosts | slave_hosts | tt_hosts | hbase_rs_hosts | \
@@ -106,6 +108,7 @@ has_flume = not len(flume_hosts) == 0
 has_journalnode = not len(jn_hosts) == 0
 has_nimbus_server = not len(nimbus_server_hosts) == 0
 has_supervisor_server = not len(supervisor_server_hosts) == 0
+has_kafka_broker = not len(kafka_broker_hosts) == 0
 
 ganglia_cluster_names = {
   "jn_hosts": [("HDPJournalNode", 8654)],
@@ -121,6 +124,7 @@ ganglia_cluster_names = {
   "hs_host": [("HDPHistoryServer", 8666)],
   "nimbus_hosts": [("HDPNimbus", 8649)],
   "supervisor_hosts": [("HDPSupervisor", 8650)],
+  "kafka_broker_hosts": [("HDPKafka", kafka_ganglia_port)],
   "ReservedPort1": [("ReservedPort1", 8667)],
   "ReservedPort2": [("ReservedPort2", 8668)],
   "ReservedPort3": [("ReservedPort3", 8669)]

+ 1 - 4
ambari-server/src/main/resources/stacks/HDP/2.2/services/KAFKA/configuration/kafka-broker.xml

@@ -289,7 +289,6 @@
       Backoff time between shutdown retries.
     </description>
   </property>
-  <!--
   <property>
     <name>kafka.metrics.reporters</name>
     <value>kafka.ganglia.KafkaGangliaMetricsReporter</value>
@@ -297,7 +296,6 @@
       kafka ganglia metrics reporter
     </description>
   </property>
-
   <property>
     <name>kafka.ganglia.metrics.reporter.enabled</name>
     <value>true</value>
@@ -305,7 +303,6 @@
       kafka ganglia metrics reporter enable
     </description>
   </property>
-  -->
   <property>
     <name>kafka.ganglia.metrics.host</name>
     <value>localhost</value>
@@ -313,7 +310,7 @@
   </property>
   <property>
     <name>kafka.ganglia.metrics.port</name>
-    <value>8649</value>
+    <value>8671</value>
     <description> Ganglia port </description>
   </property>
   <property>

+ 262 - 0
ambari-server/src/main/resources/stacks/HDP/2.2/services/KAFKA/metrics.json

@@ -0,0 +1,262 @@
+{
+  "KAFKA_BROKER": {
+    "Component": [
+      {
+        "type": "ganglia",
+        "metrics": {
+          "metrics/jvm/uptime":{
+            "metric":"jvm.uptime",
+            "pointInTime":true,
+            "temporal":true
+          },
+          "metrics/jvm/heap_usage":{
+            "metric":"jvm.heap_usage",
+            "pointInTime":true,
+            "temporal":true
+          },
+          "metrics/jvm/non_heap_usage":{
+            "metric":"jvm.non_heap_usage",
+            "pointInTime":true,
+            "temporal":true
+          },
+          "metrics/jvm/thread-states/runnable":{
+            "metric":"jvm.thread-states.runnable",
+            "pointInTime":true,
+            "temporal":true
+          },
+          "metrics/jvm/thread-states/blocked":{
+            "metric":"jvm.thread-states.blocked",
+            "pointInTime":true,
+            "temporal":true
+          },
+          "metrics/jvm/thread-states/timed_waiting":{
+            "metric":"jvm.thread-states.timed_waiting",
+            "pointInTime":true,
+            "temporal":true
+          },
+          "metrics/jvm/thread-states/terminated":{
+            "metric":"jvm.thread-states.terminated",
+            "pointInTime":true,
+            "temporal":true
+          },
+          "metrics/jvm/thread_count":{
+            "metric":"jvm.thread_count",
+            "pointInTime":true,
+            "temporal":true
+          },
+          "metrics/jvm/daemon_thread_count":{
+            "metric":"jvm.daemon_thread_count",
+            "pointInTime":true,
+            "temporal":true
+          },
+          "metrics/kafka/server/BrokerTopicMetrics/AllTopicsMessagesInPerSec/1MinuteRate": {
+            "metric": "kafka.server.BrokerTopicMetrics.AllTopicsMessagesInPerSec.1MinuteRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/server/BrokerTopicMetrics/AllTopicsMessagesInPerSec/5MinuteRate": {
+            "metric": "kafka.server.BrokerTopicMetrics.AllTopicsMessagesInPerSec.5MinuteRate",
+            "pointInTime": false,
+            "temporal": true
+          },
+          "metrics/kafka/server/BrokerTopicMetrics/AllTopicsMessagesInPerSec/15MinuteRate": {
+            "metric": "kafka.server.BrokerTopicMetrics.AllTopicsMessagesInPerSec.15MinuteRate",
+            "pointInTime": false,
+            "temporal": true
+          },
+          "metrics/kafka/server/BrokerTopicMetrics/AllTopicsMessagesInPerSec/meanRate": {
+            "metric": "kafka.server.BrokerTopicMetrics/AllTopicsMessagesInPerSec/meanRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/server/BrokerTopicMetrics/AllTopicsMessagesInPerSec/count": {
+            "metric": "kafka.server.BrokerTopicMetrics/AllTopicsMessagesInPerSec.counte",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesInPerSec/1MinuteRate": {
+            "metric": "kafka.server.BrokerTopicMetrics.AllTopicsBytesInPerSec.1MinuteRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesInPerSec/5MinuteRate": {
+            "metric": "kafka.server.BrokerTopicMetrics.AllTopicsBytesInPerSec.5MinuteRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesInPerSec/15MinuteRate": {
+            "metric": "kafka.server.BrokerTopicMetrics.AllTopicsBytesInPerSec.15MinuteRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesInPerSec/meanRate": {
+            "metric": "kafka.server.BrokerTopicMetrics.AllTopicsBytesInPerSec.meanRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesInPerSec/count": {
+            "metric": "kafka.server.BrokerTopicMetrics.AllTopicsBytesInPerSec.count",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesOutPerSec/1MinuteRate": {
+            "metric": "kafka.server.BrokerTopicMetrics.AllTopicsBytesOutPerSec.1MinuteRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesOutPerSec/5MinuteRate": {
+            "metric": "kafka.server.BrokerTopicMetrics.AllTopicsBytesOutPerSec.5MinuteRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesOutPerSec/15MinuteRate": {
+            "metric": "kafka.server.BrokerTopicMetrics.AllTopicsBytesOutPerSec.15MinuteRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesOutPerSec/meanRate": {
+            "metric": "kafka.server.BrokerTopicMetrics.AllTopicsBytesOutPerSec.meanRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/server/BrokerTopicMetrics/AllTopicsBytesOutPerSec/count": {
+            "metric": "kafka.server.BrokerTopicMetrics.AllTopicsBytesOutPerSec.count",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/controller/KafkaController/ActiveControllerCount": {
+            "metric": "kafka.controller.KafkaController.ActiveControllerCount",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/log/LogFlushStats/LogFlushRateAndTimeMs/meanRate": {
+            "metric": "kafka.log.LogFlushStats.LogFlushRateAndTimeMs.meanRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/log/LogFlushStats/LogFlushRateAndTimeMs/1MinuteRate": {
+            "metric": "kafka.log.LogFlushStats.LogFlushRateAndTimeMs.1MinuteRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/log/LogFlushStats/LogFlushRateAndTimeMs/5MinuteRate": {
+            "metric": "kafka.log.LogFlushStats.LogFlushRateAndTimeMs.1MinuteRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/log/LogFlushStats/LogFlushRateAndTimeMs/15MinuteRate": {
+            "metric": "kafka.log.LogFlushStats.LogFlushRateAndTimeMs.15MinuteRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/log/LogFlushStats/LogFlushRateAndTimeMs/count": {
+            "metric": "kafka.log.LogFlushStats.LogFlushRateAndTimeMs.count",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/controller/ControllerStats/LeaderElectionRateAndTimeMs/meanRate": {
+            "metric": "kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.meanRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/controller/ControllerStats/LeaderElectionRateAndTimeMs/1MinuteRate": {
+            "metric": "kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.1MinuteRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/controller/ControllerStats/LeaderElectionRateAndTimeMs/5MinuteRate": {
+            "metric": "kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.5MinuteRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/controller/ControllerStats/LeaderElectionRateAndTimeMs/15MinuteRate": {
+            "metric": "kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.15MinuteRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/controller/ControllerStats/LeaderElectionRateAndTimeMs/count": {
+            "metric": "kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.count",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/controller/ControllerStats/UncleanLeaderElectionsPerSec/1MinuteRate": {
+            "metric": "kafka.controller.ControllerStats.UncleanLeaderElectionsPerSec.1MinuteRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/controller/ControllerStats/UncleanLeaderElectionsPerSec/5MinuteRate": {
+            "metric": "kafka.controller.ControllerStats.UncleanLeaderElectionsPerSec.5MinuteRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/controller/ControllerStats/UncleanLeaderElectionsPerSec/15MinuteRate": {
+            "metric": "kafka.controller.ControllerStats.UncleanLeaderElectionsPerSec.15MinuteRate",
+            "pointInTime": true,
+            "temporal": true
+          },
+          "metrics/kafka/controller/ControllerStats/OfflinePartitionsCount": {
+              "metric": "kafka.controller.ControllerStats.OfflinePartitionsCount",
+              "pointInTime" :true,
+              "temporal": true
+          },
+          "metrics/kafka/server/ReplicaManager/PartitionCount": {
+              "metric": "kafka.server.ReplicaManager.PartitionCount",
+              "pointInTime" : true,
+              "temporal": true
+          },
+          "metrics/kafka/server/ReplicaManager/LeaderCount": {
+              "metric": "kafka.server.ReplicaManager.LeaderCount",
+              "pointInTime" : true,
+              "temporal": true
+          },
+          "metrics/kafka/server/ReplicaManager/UnderReplicatedPartitions": {
+              "metric": "kafka.server.ReplicaManager.UnderReplicatedPartitions",
+              "pointInTime" :true,
+              "temporal": true
+          },
+          "metrics/kafka/server/ReplicaManager/ISRShrinksPerSec": {
+              "metric": "kafka.server.ReplicaManager.ISRShrinksPerSec",
+              "pointInTime" : true,
+              "temporal": true
+          },
+          "metrics/kafka/server/ReplicaManager/ISRExpandsPerSec": {
+              "metric": "kafka.server.ReplicaManager.ISRExpandsPerSec",
+              "pointInTime" : true,
+              "temporal": true
+          },
+
+          "metrics/kafka/server/ReplicaFetcherManager/Replica-MaxLag": {
+              "metric": "kafka.server.ReplicaFetcherManager.Replica-MaxLag",
+              "pointInTime" : true,
+              "temporal": true
+          },
+          "metrics/kafka/server/ProducerRequestPurgatory/PurgatorySize": {
+              "metric": "kafka.server.ProducerRequestPurgatory.PurgatorySize",
+              "pointInTime" : true,
+              "temporal": true
+          },
+          "metrics/kafka/server/FetchRequestPurgatory/PurgatorySize": {
+              "metric": "kafka.server.FetchRequestPurgatory.PurgatorySize",
+              "pointInTime" : true,
+              "temporal": true
+          },
+          "metrics/kafka/cluster/Partition/$1-UnderReplicated":{
+            "metric":"kafka.cluster.Partition.(\\w+)-UnderReplicated",
+            "pointInTime":true,
+            "temporal":true
+          },
+          "metrics/kafka/consumer/ConsumerFetcherManager/$1-MaxLag":{
+            "metric":"kafka.consumer.ConsumerFetcherManager.(\\w+)-MaxLag",
+            "pointInTime":true,
+            "temporal":true
+          },
+          "metrics/kafka/consumer/ConsumerFetcherManager/$1-MinFetch":{
+            "metric":"kafka.consumer.ConsumerFetcherManager.(\\w+)-MinFetch",
+            "pointInTime":true,
+            "temporal":true
+          }
+        }
+      }
+    ]
+  }
+}

+ 1 - 1
ambari-server/src/main/resources/stacks/HDP/2.2/services/KAFKA/package/scripts/kafka.py

@@ -34,7 +34,7 @@ def kafka():
     brokerid = str(sorted(params.kafka_hosts).index(params.hostname))
     kafka_server_config = mutable_config_dict(params.config['configurations']['kafka-broker'])
     kafka_server_config['broker.id'] = brokerid
-    kafka_server_config['hostname'] = params.hostname
+    kafka_server_config['host.name'] = params.hostname
     conf_dir = params.conf_dir
     properties_config("server.properties",
                       conf_dir=params.conf_dir,

+ 1 - 1
ambari-server/src/main/resources/stacks/HDP/2.2/services/KAFKA/package/scripts/params.py

@@ -45,7 +45,7 @@ hostname = config['hostname']
 user_group = config['configurations']['cluster-env']['user_group']
 java64_home = config['hostLevelParams']['java_home']
 kafka_env_sh_template = config['configurations']['kafka-env']['content']
-kafka_hosts = config['clusterHostInfo']['zookeeper_hosts']
+kafka_hosts = config['clusterHostInfo']['kafka_broker_hosts']
 kafka_hosts.sort()
 
 zookeeper_hosts = config['clusterHostInfo']['zookeeper_hosts']

+ 15 - 15
ambari-server/src/test/java/org/apache/ambari/server/controller/ganglia/GangliaPropertyProviderTest.java

@@ -17,19 +17,6 @@
  */
 package org.apache.ambari.server.controller.ganglia;
 
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.expect;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
 import org.apache.ambari.server.configuration.ComponentSSLConfigurationTest;
 import org.apache.ambari.server.controller.internal.PropertyInfo;
@@ -51,6 +38,19 @@ import org.junit.runners.Parameterized;
 import org.powermock.api.easymock.PowerMock;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+
 /**
  * Test the Ganglia property provider.
  */
@@ -323,7 +323,7 @@ public class GangliaPropertyProviderTest {
     uriBuilder.setScheme((configuration.isGangliaSSL() ? "https" : "http"));
     uriBuilder.setHost("domU-12-31-39-0E-34-E1.compute-1.internal");
     uriBuilder.setPath("/cgi-bin/rrd.py");
-    uriBuilder.setParameter("c", "HDPJobTracker,HDPHBaseMaster,HDPResourceManager,HDPFlumeServer,HDPSlaves,HDPHistoryServer,HDPJournalNode,HDPTaskTracker,HDPHBaseRegionServer,HDPNameNode");
+    uriBuilder.setParameter("c", "HDPJobTracker,HDPHBaseMaster,HDPKafka,HDPResourceManager,HDPFlumeServer,HDPSlaves,HDPHistoryServer,HDPJournalNode,HDPTaskTracker,HDPHBaseRegionServer,HDPNameNode");
     uriBuilder.setParameter("h", "domU-12-31-39-0E-34-E3.compute-1.internal,domU-12-31-39-0E-34-E1.compute-1.internal,domU-12-31-39-0E-34-E2.compute-1.internal");
     uriBuilder.setParameter("m", "jvm.metrics.gcCount");
     uriBuilder.setParameter("s", "10");
@@ -382,7 +382,7 @@ public class GangliaPropertyProviderTest {
     expectedUri.setScheme((configuration.isGangliaSSL() ? "https" : "http"));
     expectedUri.setHost("domU-12-31-39-0E-34-E1.compute-1.internal");
     expectedUri.setPath("/cgi-bin/rrd.py");
-    expectedUri.setParameter("c", "HDPJobTracker,HDPHBaseMaster,HDPResourceManager,HDPFlumeServer,HDPSlaves,HDPHistoryServer,HDPJournalNode,HDPTaskTracker,HDPHBaseRegionServer,HDPNameNode");
+    expectedUri.setParameter("c", "HDPJobTracker,HDPHBaseMaster,HDPKafka,HDPResourceManager,HDPFlumeServer,HDPSlaves,HDPHistoryServer,HDPJournalNode,HDPTaskTracker,HDPHBaseRegionServer,HDPNameNode");
    
     expectedUri.setParameter("h", hostsList.toString());
     expectedUri.setParameter("m", "jvm.metrics.gcCount");

+ 1 - 1
ambari-server/src/test/python/stacks/2.2/configs/default.json

@@ -234,7 +234,7 @@
         "jaimin-knox-1.c.pramod-thangali.internal"
       ],
       "kafka_broker_hosts": [
-        "c6402.ambari.apache.org"
+        "c6401.ambari.apache.org"
       ],
        "zookeeper_hosts": [
          "c6401.ambari.apache.org"

+ 18 - 4
ambari-web/app/data/HDP2/site_properties.js

@@ -1690,18 +1690,18 @@ module.exports =
       "isOverridable": false,
       "serviceName": "KNOX",
       "filename": "knox-env.xml",
-      "category": "Advanced knox-env.xml"
+      "category": "Advanced knox-env"
     },
 
   /********************************************* KAFKA *****************************/
     {
       "id": "puppet var",
-      "name": "kafka_broker_host",
+      "name": "kafka_broker_hosts",
       "displayName": "Kafka Broker host",
       "value": "",
       "defaultValue": "",
       "description": "The host that has been assigned to run Kafka Broker",
-      "displayType": "masterHost",
+      "displayType": "masterHosts",
       "isOverridable": false,
       "isVisible": true,
       "isRequiredByAgent": false,
@@ -1781,7 +1781,7 @@ module.exports =
       "displayType": "directory",
       "serviceName": "KAFKA",
       "filename": "kafka-broker.xml",
-      "category": "Advanced kafka-env.xml",
+      "category": "Advanced kafka-env",
       "index": 0
     },
 
@@ -3960,6 +3960,20 @@ module.exports =
       "belongsToService": ["KNOX"],
       "index": 19
     },
+    {
+      "id": "puppet var",
+      "name": "kafka_user",
+      "displayName": "Kafka User",
+      "isReconfigurable": false,
+      "displayType": "user",
+      "isOverridable": false,
+      "isVisible": true,
+      "serviceName": "MISC",
+      "filename": "kafka-env.xml",
+      "category": "Users and Groups",
+      "belongsToService": ["KAFKA"],
+      "index": 20
+    },
     {
       "id": "puppet var",
       "name": "rrdcached_base_dir",

+ 8 - 2
ambari-web/app/models/service_config.js

@@ -481,8 +481,14 @@ App.ServiceConfigProperty = Ember.Object.extend({
       case 'knox_gateway_host':
         this.set('value', masterComponentHostsInDB.findProperty('component', 'KNOX_GATEWAY').hostName);
         break;
-      case 'kafka_broker_host':
-        this.set('value', masterComponentHostsInDB.findProperty('component', 'KAFKA_BROKER').hostName);
+      case 'kafka_broker_hosts':
+        this.set('value', masterComponentHostsInDB.filterProperty('component', 'KAFKA_BROKER').mapProperty('hostName'));
+        break;
+      case 'kafka.ganglia.metrics.host':
+        var gangliaHost =  masterComponentHostsInDB.findProperty('component', 'GANGLIA_SERVER');
+        if (gangliaHost) {
+          this.set('value', gangliaHost.hostName);
+        }
         break;
       case 'hbase.zookeeper.quorum':
         var zkHosts = masterComponentHostsInDB.filterProperty('component', 'ZOOKEEPER_SERVER').mapProperty('hostName');

+ 30 - 0
ambari-web/test/models/service_config_test.js

@@ -616,3 +616,33 @@ describe('App.ServiceConfigProperty', function () {
   });
 
 });
+
+describe('initialValue', function () {
+
+  var tests = [
+    {
+      message: 'kafka.ganglia.metrics.host property should have the value of ganglia hostname when ganglia is selected',
+      localDB: {masterComponentHosts: [
+        {component: 'GANGLIA_SERVER', hostName: 'c6401'}
+      ]},
+      expected: 'c6401'
+    },
+    {
+      message: 'kafka.ganglia.metrics.host property should have the value "localhost" when ganglia is not selected',
+      localDB: {masterComponentHosts: [
+        {component: 'NAMENODE', hostName: 'c6401'}
+      ]},
+      expected: 'localhost'
+    }
+  ];
+  var serviceConfigProperty;
+  beforeEach(function () {
+    serviceConfigProperty = App.ServiceConfigProperty.create({name: 'kafka.ganglia.metrics.host', value: 'localhost'});
+  });
+  tests.forEach(function(test){
+    it(test.message, function () {
+      serviceConfigProperty.initialValue(test.localDB);
+      expect(serviceConfigProperty.get('value')).to.equal(test.expected);
+    });
+  });
+});