
AMBARI-9185. Add Kafka metric sink implementation to enable sinking metrics to AMS.

Siddharth Wagle 10 years ago
parent commit 38429224c7
17 changed files with 1263 additions and 12 deletions
  1. +46 -1    ambari-metrics/ambari-metrics-assembly/pom.xml
  2. +8 -0     ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml
  3. +8 -0     ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml
  4. +8 -3     ambari-metrics/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh
  5. +163 -0   ambari-metrics/ambari-metrics-kafka-sink/pom.xml
  6. +21 -0    ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/empty.xml
  7. +34 -0    ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/jar-with-common.xml
  8. +448 -0   ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
  9. +25 -0    ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterMBean.java
  10. +218 -0  ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporter.java
  11. +109 -0  ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
  12. +105 -0  ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java
  13. +2 -5    ambari-metrics/pom.xml
  14. +28 -3   ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml
  15. +6 -0    ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml
  16. +5 -0    ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
  17. +29 -0   ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py

+ 46 - 1
ambari-metrics/ambari-metrics-assembly/pom.xml

@@ -36,6 +36,7 @@
     <hadoop-sink.dir>${project.basedir}/../ambari-metrics-hadoop-sink</hadoop-sink.dir>
     <storm-sink.dir>${project.basedir}/../ambari-metrics-storm-sink</storm-sink.dir>
     <flume-sink.dir>${project.basedir}/../ambari-metrics-flume-sink</flume-sink.dir>
+    <kafka-sink.dir>${project.basedir}/../ambari-metrics-kafka-sink</kafka-sink.dir>
     <python.ver>python &gt;= 2.6</python.ver>
     <python.devel>python-devel</python.devel>
     <deb.python.ver>python (&gt;= 2.6)</deb.python.ver>
@@ -44,6 +45,7 @@
     <hadoop.sink.jar>ambari-metrics-hadoop-sink-with-common-${project.version}.jar</hadoop.sink.jar>
     <storm.sink.jar>ambari-metrics-storm-sink-with-common-${project.version}.jar</storm.sink.jar>
     <flume.sink.jar>ambari-metrics-flume-sink-with-common-${project.version}.jar</flume.sink.jar>
+    <kafka.sink.jar>ambari-metrics-kafka-sink-with-common-${project.version}.jar</kafka.sink.jar>
   </properties>
 
   <build>
@@ -448,7 +450,6 @@
                           <location>${hadoop-sink.dir}/target/ambari-metrics-hadoop-sink-with-common-${project.version}.jar</location>
                         </source>
                       </sources>
-
                     </mapping>
                     <mapping>
                       <directory>/usr/lib/flume/lib</directory>
@@ -466,6 +467,22 @@
                         </source>
                       </sources>
                     </mapping>
+                    <mapping>
+                      <directory>/usr/lib/ambari-metrics-kafka-sink</directory>
+                      <sources>
+                        <source>
+                          <location>${kafka-sink.dir}/target/${kafka.sink.jar}</location>
+                        </source>
+                      </sources>
+                    </mapping>
+                    <mapping>
+                      <directory>/usr/lib/ambari-metrics-kafka-sink/lib</directory>
+                      <sources>
+                        <source>
+                          <location>${kafka-sink.dir}/target/lib</location>
+                        </source>
+                      </sources>
+                    </mapping>
                   </mappings>
                 </configuration>
 
@@ -596,6 +613,7 @@
                     <path>/var/log/ambari-metrics-collector</path>
                     <path>/var/lib/ambari-metrics-collector</path>
                     <path>/usr/lib/ambari-metrics-hadoop-sink</path>
+                    <path>/usr/lib/ambari-metrics-kafka-sink</path>
                     <path>/usr/lib/flume/lib</path>
                     <path>/usr/lib/storm/lib</path>
                   </paths>
@@ -772,6 +790,28 @@
                   </mapper>
                 </data>
 
+                <!-- kafka sink -->
+
+                <data>
+                  <src>${kafka-sink.dir}/target/${kafka.sink.jar}</src>
+                  <type>file</type>
+                  <mapper>
+                    <type>perm</type>
+                    <filemode>644</filemode>
+                    <dirmode>755</dirmode>
+                    <prefix>/usr/lib/ambari-metrics-kafka-sink</prefix>
+                  </mapper>
+                </data>
+                <data>
+                  <src>${kafka-sink.dir}/target/lib</src>
+                  <type>file</type>
+                  <mapper>
+                    <type>perm</type>
+                    <filemode>644</filemode>
+                    <dirmode>755</dirmode>
+                    <prefix>/usr/lib/ambari-metrics-kafka-sink/lib</prefix>
+                  </mapper>
+                </data>
               </dataSet>
             </configuration>
           </plugin>
@@ -1062,6 +1102,11 @@
       <artifactId>ambari-metrics-storm-sink</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-metrics-kafka-sink</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.ambari</groupId>
       <artifactId>ambari-metrics-host-monitoring</artifactId>

+ 8 - 0
ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml

@@ -37,6 +37,10 @@
       <directory>${storm-sink.dir}/src/main/conf</directory>
       <outputDirectory>hadoop-sink/conf</outputDirectory>
     </fileSet>
+    <fileSet>
+      <directory>${kafka-sink.dir}/target/lib</directory>
+      <outputDirectory>hadoop-sink/lib</outputDirectory>
+    </fileSet>
   </fileSets>
 
   <files>
@@ -52,6 +56,10 @@
       <source>${storm-sink.dir}/target/ambari-metrics-storm-sink-with-common-${project.version}.jar</source>
       <outputDirectory>hadoop-sink</outputDirectory>
     </file>
+    <file>
+      <source>${kafka-sink.dir}/target/ambari-metrics-kafka-sink-with-common-${project.version}.jar</source>
+      <outputDirectory>hadoop-sink</outputDirectory>
+    </file>
     <file>
       <source>${basedir}/src/main/package/msi/sink.wxs</source>
       <outputDirectory>../../</outputDirectory>

+ 8 - 0
ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml

@@ -38,6 +38,10 @@
       <directory>${storm-sink.dir}/src/main/conf</directory>
       <outputDirectory>hadoop-sink/conf</outputDirectory>
     </fileSet>
+    <fileSet>
+      <directory>${kafka-sink.dir}/target/lib</directory>
+      <outputDirectory>hadoop-sink/lib</outputDirectory>
+    </fileSet>
   </fileSets>
 
   <files>
@@ -53,6 +57,10 @@
       <source>${storm-sink.dir}/target/ambari-metrics-storm-sink-with-common-${project.version}.jar</source>
       <outputDirectory>hadoop-sink</outputDirectory>
     </file>
+    <file>
+      <source>${kafka-sink.dir}/target/ambari-metrics-kafka-sink-with-common-${project.version}.jar</source>
+      <outputDirectory>hadoop-sink</outputDirectory>
+    </file>
   </files>
 
 

+ 8 - 3
ambari-metrics/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh

@@ -15,15 +15,20 @@
 # limitations under the License
 
 HADOOP_LINK_NAME="/usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar"
-FLUME_LINK_NAME="/usr/lib/flume/lib/ambari-metrics-flume-sink.jar"
 HADOOP_SINK_JAR="/usr/lib/ambari-metrics-hadoop-sink/${hadoop.sink.jar}"
+
+FLUME_LINK_NAME="/usr/lib/flume/lib/ambari-metrics-flume-sink.jar"
 FLUME_SINK_JAR="/usr/lib/flume/lib/${flume.sink.jar}"
+
+KAFKA_LINK_NAME="/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar"
+KAFKA_SINK_JAR="/usr/lib/ambari-metrics-kafka-sink/${kafka.sink.jar}"
+
 #link for storm jar not required with current loading
 #STORM_SINK_JAR="/usr/lib/storm/lib/${storm.sink.jar}"
 #STORM_LINK_NAME="/usr/lib/storm/lib/ambari-metrics-storm-sink.jar"
 
-JARS=(${HADOOP_SINK_JAR} ${FLUME_SINK_JAR})
-LINKS=(${HADOOP_LINK_NAME} ${FLUME_LINK_NAME})
+JARS=(${HADOOP_SINK_JAR} ${FLUME_SINK_JAR} ${KAFKA_SINK_JAR})
+LINKS=(${HADOOP_LINK_NAME} ${FLUME_LINK_NAME} ${KAFKA_LINK_NAME})
 
 for index in ${!LINKS[*]}
 do

+ 163 - 0
ambari-metrics/ambari-metrics-kafka-sink/pom.xml

@@ -0,0 +1,163 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>ambari-metrics</artifactId>
+    <groupId>org.apache.ambari</groupId>
+    <version>0.1.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>ambari-metrics-kafka-sink</artifactId>
+  <version>0.1.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>2.9</version>
+        <executions>
+         <execution>
+            <id>copy-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <includeArtifactIds>commons-codec,commons-collections,commons-httpclient,commons-lang,commons-logging,guava,jackson-core-asl,jackson-mapper-asl,jackson-xc,hadoop-common</includeArtifactIds>
+              <outputDirectory>${project.build.directory}/lib</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <executions>
+          <execution>
+            <configuration>
+              <descriptors>
+                <descriptor>src/main/assemblies/jar-with-common.xml</descriptor>
+              </descriptors>
+              <attach>false</attach>
+              <tarLongFileMode>gnu</tarLongFileMode>
+              <appendAssemblyId>false</appendAssemblyId>
+              <finalName>${project.artifactId}-with-common-${project.version}</finalName>
+            </configuration>
+            <id>build-jar</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.0</version>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.8</version>
+        <executions>
+          <execution>
+            <id>parse-version</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>parse-version</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>regex-property</id>
+            <goals>
+              <goal>regex-property</goal>
+            </goals>
+            <configuration>
+              <name>ambariVersion</name>
+              <value>${project.version}</value>
+              <regex>^([0-9]+)\.([0-9]+)\.([0-9]+)(\.|-).*</regex>
+              <replacement>$1.$2.$3</replacement>
+              <failIfNoMatch>false</failIfNoMatch>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>com.github.goldin</groupId>
+        <artifactId>copy-maven-plugin</artifactId>
+        <version>0.2.5</version>
+        <executions>
+          <execution>
+            <id>create-archive</id>
+            <phase>none</phase>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-metrics-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+      <version>0.8.1.1</version>
+    </dependency>
+    <dependency>
+      <groupId>com.yammer.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <version>2.2.0</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+      <version>4.10</version>
+    </dependency>
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <version>3.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-easymock</artifactId>
+      <version>1.4.9</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+      <version>1.4.9</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.9.5</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

+ 21 - 0
ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/empty.xml

@@ -0,0 +1,21 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<assembly>
+    <id>empty</id>
+    <formats/>
+</assembly>

+ 34 - 0
ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/jar-with-common.xml

@@ -0,0 +1,34 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<assembly>
+  <id>jar-with-common</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <dependencySets>
+    <dependencySet>
+      <outputDirectory>/</outputDirectory>
+      <unpack>true</unpack>
+      <includes>
+        <include>org.apache.ambari:ambari-metrics-common</include>
+        <include>org.apache.ambari:ambari-metrics-kafka-sink</include>
+      </includes>
+    </dependencySet>
+  </dependencySets>
+</assembly>

+ 448 - 0
ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java

@@ -0,0 +1,448 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.kafka;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import kafka.metrics.KafkaMetricsConfig;
+import kafka.metrics.KafkaMetricsReporter;
+import kafka.utils.VerifiableProperties;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.apache.hadoop.metrics2.util.Servers;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Metered;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricProcessor;
+import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Timer;
+import com.yammer.metrics.stats.Snapshot;
+
+public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink implements KafkaMetricsReporter,
+    KafkaTimelineMetricsReporterMBean {
+
+  private final static Log LOG = LogFactory.getLog(KafkaTimelineMetricsReporter.class);
+
+  private static final String TIMELINE_METRICS_SEND_INTERVAL_PROPERTY = "kafka.timeline.metrics.sendInterval";
+  private static final String TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY = "kafka.timeline.metrics.maxRowCacheSize";
+  private static final String TIMELINE_HOST_PROPERTY = "kafka.timeline.metrics.host";
+  private static final String TIMELINE_PORT_PROPERTY = "kafka.timeline.metrics.port";
+  private static final String TIMELINE_REPORTER_ENABLED_PROPERTY = "kafka.timeline.metrics.reporter.enabled";
+  private static final String TIMELINE_DEFAULT_HOST = "localhost";
+  private static final String TIMELINE_DEFAULT_PORT = "8188";
+
+  private boolean initialized = false;
+  private boolean running = false;
+  private Object lock = new Object();
+  private String collectorUri;
+  private String hostname;
+  private SocketAddress socketAddress;
+  private TimelineScheduledReporter reporter;
+  private TimelineMetricsCache metricsCache;
+
+  @Override
+  protected SocketAddress getServerSocketAddress() {
+    return socketAddress;
+  }
+
+  @Override
+  protected String getCollectorUri() {
+    return collectorUri;
+  }
+
+  public void setMetricsCache(TimelineMetricsCache metricsCache) {
+    this.metricsCache = metricsCache;
+  }
+
+  public void init(VerifiableProperties props) {
+    synchronized (lock) {
+      if (!initialized) {
+        LOG.info("Initializing Kafka Timeline Metrics Sink");
+        try {
+          hostname = InetAddress.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+          LOG.error("Could not identify hostname.");
+          throw new RuntimeException("Could not identify hostname.", e);
+        }
+        KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(props);
+        int metricsSendInterval = Integer.parseInt(props.getString(TIMELINE_METRICS_SEND_INTERVAL_PROPERTY,
+            String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
+        int maxRowCacheSize = Integer.parseInt(props.getString(TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY,
+            String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
+        String metricCollectorHost = props.getString(TIMELINE_HOST_PROPERTY, TIMELINE_DEFAULT_HOST);
+        String metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT);
+        setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
+        collectorUri = "http://" + metricCollectorHost + ":" + metricCollectorPort + "/ws/v1/timeline/metrics";
+        List<InetSocketAddress> socketAddresses = Servers.parse(metricCollectorHost,
+            Integer.parseInt(metricCollectorPort));
+        if (socketAddresses != null && !socketAddresses.isEmpty()) {
+          socketAddress = socketAddresses.get(0);
+        }
+        initializeReporter();
+        if (props.getBoolean(TIMELINE_REPORTER_ENABLED_PROPERTY, false)) {
+          startReporter(metricsConfig.pollingIntervalSecs());
+        }
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("CollectorUri = " + collectorUri);
+          LOG.trace("SocketAddress = " + socketAddress);
+          LOG.trace("MetricsSendInterval = " + metricsSendInterval);
+          LOG.trace("MaxRowCacheSize = " + maxRowCacheSize);
+        }
+      }
+    }
+  }
+
+  public String getMBeanName() {
+    return "kafka:type=org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter";
+  }
+
+  public synchronized void startReporter(long period) {
+    synchronized (lock) {
+      if (initialized && !running) {
+        reporter.start(period, TimeUnit.SECONDS);
+        running = true;
+        LOG.info(String.format("Started Kafka Timeline metrics reporter with polling period %d seconds", period));
+      }
+    }
+  }
+
+  public synchronized void stopReporter() {
+    synchronized (lock) {
+      if (initialized && running) {
+        reporter.stop();
+        running = false;
+        LOG.info("Stopped Kafka Timeline metrics reporter");
+        initializeReporter();
+      }
+    }
+  }
+
+  private void initializeReporter() {
+    reporter = new TimelineScheduledReporter(Metrics.defaultRegistry(), "timeline-scheduled-reporter",
+        TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
+    initialized = true;
+  }
+
+  interface Context {
+    public List<TimelineMetric> getTimelineMetricList();
+  }
+
+  class TimelineScheduledReporter extends ScheduledReporter implements MetricProcessor<Context> {
+
+    private static final String APP_ID = "kafka_broker";
+    private static final String COUNT_SUFIX = ".count";
+    private static final String ONE_MINUTE_RATE_SUFIX = ".1MinuteRate";
+    private static final String MEAN_SUFIX = ".mean";
+    private static final String MEAN_RATE_SUFIX = ".meanRate";
+    private static final String FIVE_MINUTE_RATE_SUFIX = ".5MinuteRate";
+    private static final String FIFTEEN_MINUTE_RATE_SUFIX = ".15MinuteRate";
+    private static final String MIN_SUFIX = ".min";
+    private static final String MAX_SUFIX = ".max";
+    private static final String MEDIAN_SUFIX = ".median";
+    private static final String STD_DEV_SUFIX = ".stddev";
+    private static final String SEVENTY_FIFTH_PERCENTILE_SUFIX = ".75percentile";
+    private static final String NINETY_FIFTH_PERCENTILE_SUFIX = ".95percentile";
+    private static final String NINETY_EIGHTH_PERCENTILE_SUFIX = ".98percentile";
+    private static final String NINETY_NINTH_PERCENTILE_SUFIX = ".99percentile";
+    private static final String NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX = ".999percentile";
+
+    protected TimelineScheduledReporter(MetricsRegistry registry, String name, TimeUnit rateUnit, TimeUnit durationUnit) {
+      super(registry, name, rateUnit, durationUnit);
+    }
+
+    @Override
+    public void report(Set<Entry<MetricName, Metric>> metrics) {
+      final List<TimelineMetric> metricsList = new ArrayList<TimelineMetric>();
+      try {
+        for (Entry<MetricName, Metric> entry : metrics) {
+          final MetricName metricName = entry.getKey();
+          final Metric metric = entry.getValue();
+          Context context = new Context() {
+
+            public List<TimelineMetric> getTimelineMetricList() {
+              return metricsList;
+            }
+
+          };
+          metric.processWith(this, metricName, context);
+        }
+      } catch (Throwable t) {
+        LOG.error("Exception processing Kafka metric", t);
+      }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Metrics List size: " + metricsList.size());
+        LOG.trace("Metics Set size: " + metrics.size());
+      }
+      if (!metricsList.isEmpty()) {
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.setMetrics(metricsList);
+        try {
+          emitMetrics(timelineMetrics);
+        } catch (IOException e) {
+          LOG.error("Unexpected error", e);
+        } catch (Throwable t) {
+          LOG.error("Exception emitting metrics", t);
+        }
+      }
+    }
+
+    private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName,
+        Number attributeValue) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Creating timeline metric: " + attributeName + " = " + attributeValue + " time = "
+            + currentTimeMillis + " app_id = " + component);
+      }
+      TimelineMetric timelineMetric = new TimelineMetric();
+      timelineMetric.setMetricName(attributeName);
+      timelineMetric.setHostName(hostname);
+      timelineMetric.setAppId(component);
+      timelineMetric.setStartTime(currentTimeMillis);
+      timelineMetric.setType(ClassUtils.getShortCanonicalName(attributeValue, "Number"));
+      timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue.doubleValue());
+      return timelineMetric;
+    }
+
+    @Override
+    public void processMeter(MetricName name, Metered meter, Context context) throws Exception {
+      final long currentTimeMillis = System.currentTimeMillis();
+      final String sanitizedName = sanitizeName(name);
+      final String meterCountName = sanitizedName + COUNT_SUFIX;
+      final TimelineMetric countMetric = createTimelineMetric(currentTimeMillis, APP_ID, meterCountName, meter.count());
+
+      final String meterOneMinuteRateName = sanitizedName + ONE_MINUTE_RATE_SUFIX;
+      final TimelineMetric oneMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID,
+          meterOneMinuteRateName, meter.oneMinuteRate());
+
+      final String meterMeanRateName = sanitizedName + MEAN_RATE_SUFIX;
+      final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, meterMeanRateName,
+          meter.meanRate());
+
+      final String meterFiveMinuteRateName = sanitizedName + FIVE_MINUTE_RATE_SUFIX;
+      final TimelineMetric fiveMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID,
+          meterFiveMinuteRateName, meter.fiveMinuteRate());
+
+      final String meterFifteenMinuteRateName = sanitizedName + FIFTEEN_MINUTE_RATE_SUFIX;
+      final TimelineMetric fifteenMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID,
+          meterFifteenMinuteRateName, meter.fifteenMinuteRate());
+
+      metricsCache.putTimelineMetric(countMetric);
+      metricsCache.putTimelineMetric(oneMinuteRateMetric);
+      metricsCache.putTimelineMetric(meanMetric);
+      metricsCache.putTimelineMetric(fiveMinuteRateMetric);
+      metricsCache.putTimelineMetric(fifteenMinuteRateMetric);
+
+      String[] metricNames = new String[] { meterCountName, meterOneMinuteRateName, meterMeanRateName,
+          meterFiveMinuteRateName, meterFifteenMinuteRateName };
+      populateMetricsList(context, metricNames);
+    }
+
+    @Override
+    public void processCounter(MetricName name, Counter counter, Context context) throws Exception {
+      final long currentTimeMillis = System.currentTimeMillis();
+      final String sanitizedName = sanitizeName(name);
+      final String metricCountName = sanitizedName + COUNT_SUFIX;
+      final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, metricCountName, counter.count());
+      metricsCache.putTimelineMetric(metric);
+      populateMetricsList(context, metricCountName);
+    }
+
+    @Override
+    public void processHistogram(MetricName name, Histogram histogram, Context context) throws Exception {
+      final long currentTimeMillis = System.currentTimeMillis();
+      final Snapshot snapshot = histogram.getSnapshot();
+      final String sanitizedName = sanitizeName(name);
+
+      final String histogramMinName = sanitizedName + MIN_SUFIX;
+      final TimelineMetric minMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMinName,
+          histogram.min());
+
+      final String histogramMaxName = sanitizedName + MAX_SUFIX;
+      final TimelineMetric maxMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMaxName,
+          histogram.max());
+
+      final String histogramMeanName = sanitizedName + MEAN_SUFIX;
+      final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMeanName,
+          histogram.mean());
+
+      final String histogramMedianName = sanitizedName + MEDIAN_SUFIX;
+      final TimelineMetric medianMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMedianName,
+          snapshot.getMedian());
+
+      final String histogramStdDevName = sanitizedName + STD_DEV_SUFIX;
+      final TimelineMetric stdDevMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramStdDevName,
+          histogram.stdDev());
+
+      final String histogramSeventyFifthPercentileName = sanitizedName + SEVENTY_FIFTH_PERCENTILE_SUFIX;
+      final TimelineMetric seventyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
+          histogramSeventyFifthPercentileName, snapshot.get75thPercentile());
+
+      final String histogramNinetyFifthPercentileName = sanitizedName + NINETY_FIFTH_PERCENTILE_SUFIX;
+      final TimelineMetric nintyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
+          histogramNinetyFifthPercentileName, snapshot.get95thPercentile());
+
+      final String histogramNinetyEighthPercentileName = sanitizedName + NINETY_EIGHTH_PERCENTILE_SUFIX;
+      final TimelineMetric nintyEighthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
+          histogramNinetyEighthPercentileName, snapshot.get98thPercentile());
+
+      final String histogramNinetyNinethPercentileName = sanitizedName + NINETY_NINTH_PERCENTILE_SUFIX;
+      final TimelineMetric nintyNinthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
+          histogramNinetyNinethPercentileName, snapshot.get99thPercentile());
+
+      final String histogramNinetyNinePointNinePercentileName = sanitizedName + NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX;
+      final TimelineMetric nintyNinthPointNinePercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
+          histogramNinetyNinePointNinePercentileName, snapshot.get999thPercentile());
+
+      metricsCache.putTimelineMetric(minMetric);
+      metricsCache.putTimelineMetric(maxMetric);
+      metricsCache.putTimelineMetric(meanMetric);
+      metricsCache.putTimelineMetric(medianMetric);
+      metricsCache.putTimelineMetric(stdDevMetric);
+      metricsCache.putTimelineMetric(seventyFifthPercentileMetric);
+      metricsCache.putTimelineMetric(nintyFifthPercentileMetric);
+      metricsCache.putTimelineMetric(nintyEighthPercentileMetric);
+      metricsCache.putTimelineMetric(nintyNinthPercentileMetric);
+      metricsCache.putTimelineMetric(nintyNinthPointNinePercentileMetric);
+
+      String[] metricNames = new String[] { histogramMaxName, histogramMeanName, histogramMedianName, histogramMinName,
+          histogramNinetyEighthPercentileName, histogramNinetyFifthPercentileName,
+          histogramNinetyNinePointNinePercentileName, histogramNinetyNinethPercentileName,
+          histogramSeventyFifthPercentileName, histogramStdDevName };
+      populateMetricsList(context, metricNames);
+    }
+
+    @Override
+    public void processTimer(MetricName name, Timer timer, Context context) throws Exception {
+      final long currentTimeMillis = System.currentTimeMillis();
+      final Snapshot snapshot = timer.getSnapshot();
+      final String sanitizedName = sanitizeName(name);
+
+      final String timerMinName = sanitizedName + MIN_SUFIX;
+      final TimelineMetric minMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMinName, timer.min());
+
+      final String timerMaxName = sanitizedName + MAX_SUFIX;
+      final TimelineMetric maxMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMaxName, timer.max());
+
+      final String timerMeanName = sanitizedName + MEAN_SUFIX;
+      final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMeanName, timer.mean());
+
+      final String timerMedianName = sanitizedName + MEDIAN_SUFIX;
+      final TimelineMetric medianMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMedianName,
+          snapshot.getMedian());
+
+      final String timerStdDevName = sanitizedName + STD_DEV_SUFIX;
+      final TimelineMetric stdDevMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerStdDevName,
+          timer.stdDev());
+
+      final String timerSeventyFifthPercentileName = sanitizedName + SEVENTY_FIFTH_PERCENTILE_SUFIX;
+      final TimelineMetric seventyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
+          timerSeventyFifthPercentileName, snapshot.get75thPercentile());
+
+      final String timerNinetyFifthPercentileName = sanitizedName + NINETY_FIFTH_PERCENTILE_SUFIX;
+      final TimelineMetric nintyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
+          timerNinetyFifthPercentileName, snapshot.get95thPercentile());
+
+      final String timerNinetyEighthPercentileName = sanitizedName + NINETY_EIGHTH_PERCENTILE_SUFIX;
+      final TimelineMetric nintyEighthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
+          timerNinetyEighthPercentileName, snapshot.get98thPercentile());
+
+      final String timerNinetyNinthPercentileName = sanitizedName + NINETY_NINTH_PERCENTILE_SUFIX;
+      final TimelineMetric nintyNinthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
+          timerNinetyNinthPercentileName, snapshot.get99thPercentile());
+
+      final String timerNinetyNinePointNinePercentileName = sanitizedName + NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX;
+      final TimelineMetric nintyNinthPointNinePercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
+          timerNinetyNinePointNinePercentileName, snapshot.get999thPercentile());
+
+      metricsCache.putTimelineMetric(minMetric);
+      metricsCache.putTimelineMetric(maxMetric);
+      metricsCache.putTimelineMetric(meanMetric);
+      metricsCache.putTimelineMetric(medianMetric);
+      metricsCache.putTimelineMetric(stdDevMetric);
+      metricsCache.putTimelineMetric(seventyFifthPercentileMetric);
+      metricsCache.putTimelineMetric(nintyFifthPercentileMetric);
+      metricsCache.putTimelineMetric(nintyEighthPercentileMetric);
+      metricsCache.putTimelineMetric(nintyNinthPercentileMetric);
+      metricsCache.putTimelineMetric(nintyNinthPointNinePercentileMetric);
+
+      String[] metricNames = new String[] { timerMaxName, timerMeanName, timerMedianName, timerMinName,
+          timerNinetyEighthPercentileName, timerNinetyFifthPercentileName, timerNinetyNinePointNinePercentileName,
+          timerNinetyNinthPercentileName, timerSeventyFifthPercentileName, timerStdDevName };
+      populateMetricsList(context, metricNames);
+    }
+
+    @Override
+    public void processGauge(MetricName name, Gauge<?> gauge, Context context) throws Exception {
+      final long currentTimeMillis = System.currentTimeMillis();
+      final String sanitizedName = sanitizeName(name);
+      final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, sanitizedName,
+          Double.parseDouble(String.valueOf(gauge.value())));
+      metricsCache.putTimelineMetric(metric);
+      populateMetricsList(context, sanitizedName);
+    }
+
+    private void populateMetricsList(Context context, String... metricNames) {
+      for (String metricName : metricNames) {
+        TimelineMetric cachedMetric = metricsCache.getTimelineMetric(metricName);
+        if (cachedMetric != null) {
+          context.getTimelineMetricList().add(cachedMetric);
+        }
+      }
+    }
+
+    protected String sanitizeName(MetricName name) {
+      if (name == null) {
+        return "";
+      }
+      final String qualifiedTypeName = name.getGroup() + "." + name.getType() + "." + name.getName();
+      final String metricName = name.hasScope() ? qualifiedTypeName + '.' + name.getScope() : qualifiedTypeName;
+      final StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < metricName.length(); i++) {
+        final char p = metricName.charAt(i);
+        if (!(p >= 'A' && p <= 'Z') && !(p >= 'a' && p <= 'z') && !(p >= '0' && p <= '9') && (p != '_') && (p != '-')
+            && (p != '.') && (p != '\0')) {
+          sb.append('_');
+        } else {
+          sb.append(p);
+        }
+      }
+      return sb.toString();
+    }
+
+  }
+}
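
A minimal wiring sketch for the reporter above (the property keys mirror the TIMELINE_* constants; the collector host is a placeholder, and in a real deployment the Kafka broker instantiates this class itself from its kafka.metrics.reporters setting):

    import java.util.Properties;
    import kafka.utils.VerifiableProperties;
    import org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter;

    public class ReporterWiringSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("kafka.timeline.metrics.reporter.enabled", "true");
        props.setProperty("kafka.timeline.metrics.host", "ams-collector.example.com"); // placeholder host
        props.setProperty("kafka.timeline.metrics.port", "8188");
        KafkaTimelineMetricsReporter reporter = new KafkaTimelineMetricsReporter();
        // init() resolves the collector URI and, since the enabled flag is set,
        // starts the scheduled reporter at the broker's metrics polling interval.
        reporter.init(new VerifiableProperties(props));
      }
    }

Emitted metric names follow the sanitizeName() convention of group.type.name plus a statistic suffix, for example kafka.server.BrokerTopicMetrics.MessagesInPerSec.1MinuteRate.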

+ 25 - 0
ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterMBean.java

@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.kafka;
+
+import kafka.metrics.KafkaMetricsReporterMBean;
+
+public interface KafkaTimelineMetricsReporterMBean extends KafkaMetricsReporterMBean {
+
+}
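
This marker interface is what lets Kafka register the reporter as a JMX MBean with startReporter/stopReporter operations. A hedged sketch of driving it over JMX from within the broker JVM (assumes the MBean was registered under the name returned by getMBeanName()):

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class ReporterJmxSketch {
      public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Name returned by KafkaTimelineMetricsReporter.getMBeanName().
        ObjectName name = new ObjectName(
            "kafka:type=org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter");
        // Operations inherited from KafkaMetricsReporterMBean.
        server.invoke(name, "stopReporter", new Object[0], new String[0]);
        server.invoke(name, "startReporter", new Object[] { 10L }, new String[] { "long" });
      }
    }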

+ 218 - 0
ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporter.java

@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.kafka;
+
+import java.io.Closeable;
+import java.util.Locale;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.reporting.ConsoleReporter;
+import com.yammer.metrics.reporting.CsvReporter;
+
+/**
+ * The abstract base class for all scheduled reporters (i.e., reporters which
+ * process a registry's metrics periodically).
+ *
+ * @see ConsoleReporter
+ * @see CsvReporter
+ */
+public abstract class ScheduledReporter implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScheduledReporter.class);
+
+  /**
+   * A simple named thread factory.
+   */
+  @SuppressWarnings("NullableProblems")
+  private static class NamedThreadFactory implements ThreadFactory {
+    private final ThreadGroup group;
+    private final AtomicInteger threadNumber = new AtomicInteger(1);
+    private final String namePrefix;
+
+    private NamedThreadFactory(String name) {
+      final SecurityManager s = System.getSecurityManager();
+      this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+      this.namePrefix = "metrics-" + name + "-thread-";
+    }
+
+    @Override
+    public Thread newThread(Runnable r) {
+      final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
+      t.setDaemon(true);
+      if (t.getPriority() != Thread.NORM_PRIORITY) {
+        t.setPriority(Thread.NORM_PRIORITY);
+      }
+      return t;
+    }
+  }
+
+  private static final AtomicInteger FACTORY_ID = new AtomicInteger();
+
+  private final MetricsRegistry registry;
+  private final ScheduledExecutorService executor;
+  private final double durationFactor;
+  private final String durationUnit;
+  private final double rateFactor;
+  private final String rateUnit;
+
+  /**
+   * Creates a new {@link ScheduledReporter} instance.
+   *
+   * @param registry
+   *          the {@link com.yammer.metrics.core.MetricsRegistry} containing the
+   *          metrics this reporter will report
+   * @param name
+   *          the reporter's name
+   * @param rateUnit
+   *          a unit of time
+   * @param durationUnit
+   *          a unit of time
+   */
+  protected ScheduledReporter(MetricsRegistry registry, String name, TimeUnit rateUnit, TimeUnit durationUnit) {
+    this(registry, rateUnit, durationUnit, Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(name + '-'
+        + FACTORY_ID.incrementAndGet())));
+  }
+
+  /**
+   * Creates a new {@link ScheduledReporter} instance.
+   *
+   * @param registry
+   *          the {@link com.yammer.metrics.core.MetricsRegistry} containing the
+   *          metrics this reporter will report
+   * @param executor
+   *          the executor to use while scheduling reporting of metrics.
+   */
+  protected ScheduledReporter(MetricsRegistry registry, TimeUnit rateUnit, TimeUnit durationUnit,
+      ScheduledExecutorService executor) {
+    this.registry = registry;
+    this.executor = executor;
+    this.rateFactor = rateUnit.toSeconds(1);
+    this.rateUnit = calculateRateUnit(rateUnit);
+    this.durationFactor = 1.0 / durationUnit.toNanos(1);
+    this.durationUnit = durationUnit.toString().toLowerCase(Locale.US);
+  }
+
+  /**
+   * Starts the reporter polling at the given period.
+   *
+   * @param period
+   *          the amount of time between polls
+   * @param unit
+   *          the unit for {@code period}
+   */
+  public void start(long period, TimeUnit unit) {
+    executor.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          report();
+        } catch (RuntimeException ex) {
+          LOG.error("RuntimeException thrown from {}#report. Exception was suppressed.", ScheduledReporter.this
+              .getClass().getSimpleName(), ex);
+        }
+      }
+    }, period, period, unit);
+  }
+
+  /**
+   * Stops the reporter and shuts down its thread of execution.
+   *
+   * Uses the shutdown pattern from
+   * http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html
+   */
+  public void stop() {
+    executor.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+        executor.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+          System.err.println(getClass().getSimpleName() + ": ScheduledExecutorService did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      executor.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Stops the reporter and shuts down its thread of execution.
+   */
+  @Override
+  public void close() {
+    stop();
+  }
+
+  /**
+   * Report the current values of all metrics in the registry.
+   */
+  public void report() {
+    synchronized (this) {
+      report(registry.allMetrics().entrySet());
+    }
+  }
+
+  /**
+   * Called periodically by the polling thread. Subclasses should report all the
+   * given metrics.
+   *
+   * @param metrics
+   *          all the metrics in the registry
+   */
+  public abstract void report(Set<Entry<MetricName, Metric>> metrics);
+
+  protected String getRateUnit() {
+    return rateUnit;
+  }
+
+  protected String getDurationUnit() {
+    return durationUnit;
+  }
+
+  protected double convertDuration(double duration) {
+    return duration * durationFactor;
+  }
+
+  protected double convertRate(double rate) {
+    return rate * rateFactor;
+  }
+
+  private String calculateRateUnit(TimeUnit unit) {
+    final String s = unit.toString().toLowerCase(Locale.US);
+    return s.substring(0, s.length() - 1);
+  }
+}
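
A small usage sketch of the abstract class above (hypothetical subclass and timings; the test further down exercises the same start/stop lifecycle with a spy):

    import java.util.Map.Entry;
    import java.util.Set;
    import java.util.concurrent.TimeUnit;
    import com.yammer.metrics.Metrics;
    import com.yammer.metrics.core.Metric;
    import com.yammer.metrics.core.MetricName;
    import org.apache.hadoop.metrics2.sink.kafka.ScheduledReporter;

    public class ConsoleCountReporter extends ScheduledReporter {
      public ConsoleCountReporter() {
        super(Metrics.defaultRegistry(), "console-count", TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
      }

      @Override
      public void report(Set<Entry<MetricName, Metric>> metrics) {
        // Called on the scheduler thread once per period.
        System.out.println("Snapshot of " + metrics.size() + " metrics");
      }

      public static void main(String[] args) throws InterruptedException {
        ConsoleCountReporter reporter = new ConsoleCountReporter();
        reporter.start(1, TimeUnit.SECONDS);
        Thread.sleep(3000);
        reporter.stop(); // shuts the scheduler down, waiting briefly for in-flight reports
      }
    }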

+ 109 - 0
ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java

@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.kafka;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replay;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import kafka.utils.VerifiableProperties;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Timer;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ Metrics.class, HttpClient.class })
+@PowerMockIgnore("javax.management.*")
+public class KafkaTimelineMetricsReporterTest {
+
+  private final List<Metric> list = new ArrayList<Metric>();
+  private final MetricsRegistry registry = new MetricsRegistry();
+  @SuppressWarnings("rawtypes")
+  private final Gauge gauge = mock(Gauge.class);
+  private final KafkaTimelineMetricsReporter kafkaTimelineMetricsReporter = new KafkaTimelineMetricsReporter();
+  private VerifiableProperties props;
+
+  @Before
+  public void setUp() throws Exception {
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    Gauge g = registry.newGauge(System.class, "gauge", gauge);
+    Counter counter = registry.newCounter(System.class, "counter");
+    Histogram histogram = registry.newHistogram(System.class, "histogram");
+    Meter meter = registry.newMeter(System.class, "meter", "empty", TimeUnit.MILLISECONDS);
+    Timer timer = registry.newTimer(System.class, "timer");
+    list.add(g);
+    list.add(counter);
+    list.add(histogram);
+    list.add(meter);
+    list.add(timer);
+    Properties properties = new Properties();
+    properties.setProperty("kafka.timeline.metrics.sendInterval", "5900");
+    properties.setProperty("kafka.timeline.metrics.maxRowCacheSize", "10000");
+    properties.setProperty("kafka.timeline.metrics.host", "localhost");
+    properties.setProperty("kafka.timeline.metrics.port", "8188");
+    properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true");
+    props = new VerifiableProperties(properties);
+  }
+
+  @Test
+  public void testReporterStartStop() {
+    mockStatic(Metrics.class);
+    EasyMock.expect(Metrics.defaultRegistry()).andReturn(registry).times(2);
+    TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(kafkaTimelineMetricsReporter);
+    kafkaTimelineMetricsReporter.setMetricsCache(timelineMetricsCache);
+    HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
+    kafkaTimelineMetricsReporter.setHttpClient(httpClient);
+    replay(Metrics.class, httpClient, timelineMetricsCache);
+    kafkaTimelineMetricsReporter.init(props);
+    kafkaTimelineMetricsReporter.stopReporter();
+    verifyAll();
+  }
+
+  private TimelineMetricsCache getTimelineMetricsCache(KafkaTimelineMetricsReporter kafkaTimelineMetricsReporter) {
+    TimelineMetricsCache timelineMetricsCache = EasyMock.createNiceMock(TimelineMetricsCache.class);
+    kafkaTimelineMetricsReporter.setMetricsCache(timelineMetricsCache);
+    EasyMock.expect(timelineMetricsCache.getTimelineMetric("key1")).andReturn(new TimelineMetric()).once();
+    timelineMetricsCache.putTimelineMetric(EasyMock.anyObject(TimelineMetric.class));
+    EasyMock.expectLastCall().once();
+    return timelineMetricsCache;
+  }
+}

+ 105 - 0
ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java

@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.kafka;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Timer;
+
+public class ScheduledReporterTest {
+  private final Gauge gauge = mock(Gauge.class);
+  private final List<Metric> list = new ArrayList<Metric>();
+  private final MetricsRegistry registry = new MetricsRegistry();
+  private final ScheduledReporter reporter = spy(new ScheduledReporter(registry, "example", TimeUnit.SECONDS,
+      TimeUnit.MILLISECONDS) {
+    @Override
+    public void report(Set<Entry<MetricName, Metric>> metrics) {
+      // nothing doing!
+    }
+  });
+
+  @SuppressWarnings("unchecked")
+  @Before
+  public void setUp() throws Exception {
+    Gauge g = registry.newGauge(System.class, "gauge", gauge);
+    Counter counter = registry.newCounter(System.class, "counter");
+    Histogram histogram = registry.newHistogram(System.class, "histogram");
+    Meter meter = registry.newMeter(System.class, "meter", "empty", TimeUnit.MILLISECONDS);
+    Timer timer = registry.newTimer(System.class, "timer");
+    list.add(g);
+    list.add(counter);
+    list.add(histogram);
+    list.add(meter);
+    list.add(timer);
+    reporter.start(200, TimeUnit.MILLISECONDS);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    reporter.stop();
+  }
+
+  @Test
+  public void pollsPeriodically() throws Exception {
+    Thread.sleep(500);
+    verify(reporter, times(2)).report(set(list));
+  }
+
+  private Set<Entry<MetricName, Metric>> set(List<Metric> metrics) {
+    final Map<MetricName, Metric> map = new HashMap<MetricName, Metric>();
+    for (Metric metric : metrics) {
+      String name = null;
+      if (metric instanceof Gauge) {
+        name = "gauge";
+      } else if (metric instanceof Counter) {
+        name = "counter";
+      } else if (metric instanceof Histogram) {
+        name = "histogram";
+      } else if (metric instanceof Meter) {
+        name = "meter";
+      } else if (metric instanceof Timer) {
+        name = "timer";
+      }
+      map.put(new MetricName(System.class, name), metric);
+    }
+    return map.entrySet();
+  }
+}

+ 2 - 5
ambari-metrics/pom.xml

@@ -14,11 +14,7 @@
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
-                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
+--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0                              http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
   <groupId>org.apache.ambari</groupId>
   <modelVersion>4.0.0</modelVersion>
@@ -29,6 +25,7 @@
     <module>ambari-metrics-common</module>
     <module>ambari-metrics-hadoop-sink</module>
     <module>ambari-metrics-flume-sink</module>
+    <module>ambari-metrics-kafka-sink</module>
     <module>ambari-metrics-storm-sink</module>
     <module>ambari-metrics-timelineservice</module>
     <module>ambari-metrics-host-monitoring</module>

+ 28 - 3
ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml

@@ -101,7 +101,7 @@
     <name>num.partitions</name>
     <value>1</value>
     <description>
-       	The default number of partitions per topic.
+        The default number of partitions per topic.
     </description>
   </property>
   <property>
@@ -291,9 +291,9 @@
   </property>
   <property>
     <name>kafka.metrics.reporters</name>
-    <value>kafka.ganglia.KafkaGangliaMetricsReporter</value>
+    <value>{{kafka_metrics_reporters}}</value>
     <description>
-      kafka ganglia metrics reporter
+      kafka ganglia metrics reporter and kafka timeline metrics reporter
     </description>
   </property>
   <property>
@@ -318,4 +318,29 @@
     <value>kafka</value>
     <description>Ganglia group name </description>
   </property>
+  <property>
+    <name>kafka.timeline.metrics.reporter.enabled</name>
+    <value>true</value>
+    <description>Enable the Kafka timeline metrics reporter</description>
+  </property>
+  <property>
+    <name>kafka.timeline.metrics.host</name>
+    <value>{{metric_collector_host}}</value>
+    <description>Timeline host</description>
+  </property>
+  <property>
+    <name>kafka.timeline.metrics.port</name>
+    <value>{{metric_collector_port}}</value>
+    <description>Timeline port</description>
+  </property>
+  <property>
+    <name>kafka.timeline.metrics.sendInterval</name>
+    <value>5900</value>
+    <description>Timeline metrics reporter send interval</description>
+  </property>
+  <property>
+    <name>kafka.timeline.metrics.maxRowCacheSize</name>
+    <value>10000</value>
+    <description>Timeline metrics reporter max row cache size</description>
+  </property>
 </configuration>

+ 6 - 0
ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml

@@ -50,6 +50,12 @@
 # The java implementation to use.
 export JAVA_HOME={{java64_home}}
 export PATH=$PATH:$JAVA_HOME/bin
+
+# Add kafka sink and related dependencies to the classpath
+if [ -e "/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar" ]; then
+  export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar
+  export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/lib/*
+fi
     </value>
   </property>
 </configuration>

+ 5 - 0
ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py

@@ -35,6 +35,11 @@ def kafka():
     kafka_server_config = mutable_config_dict(params.config['configurations']['kafka-broker'])
     kafka_server_config['broker.id'] = brokerid
     kafka_server_config['host.name'] = params.hostname
+    kafka_server_config['kafka.metrics.reporters'] = params.kafka_metrics_reporters
+    if params.has_metric_collector:
+        kafka_server_config['kafka.timeline.metrics.host'] = params.metric_collector_host
+        kafka_server_config['kafka.timeline.metrics.port'] = params.metric_collector_port
+
     kafka_data_dir = kafka_server_config['log.dirs']
     Directory(filter(None,kafka_data_dir.split(",")),
               owner=params.kafka_user,

+ 29 - 0
ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py

@@ -60,3 +60,32 @@ if (('kafka-log4j' in config['configurations']) and ('content' in config['config
     log4j_props = config['configurations']['kafka-log4j']['content']
 else:
     log4j_props = None
+
+if 'ganglia_server_host' in config['clusterHostInfo'] and \
+    len(config['clusterHostInfo']['ganglia_server_host'])>0:
+  ganglia_installed = True
+  ganglia_server = config['clusterHostInfo']['ganglia_server_host'][0]
+  ganglia_report_interval = 60
+else:
+  ganglia_installed = False
+
+kafka_metrics_reporters=""
+metric_collector_host = ""
+metric_collector_port = ""
+
+if ganglia_installed:
+  kafka_metrics_reporters = "kafka.ganglia.KafkaGangliaMetricsReporter"
+
+ams_collector_hosts = default("/clusterHostInfo/metric_collector_hosts", [])
+has_metric_collector = not len(ams_collector_hosts) == 0
+
+if has_metric_collector:
+  metric_collector_host = ams_collector_hosts[0]
+  metric_collector_port = default("/configurations/ams-site/timeline.metrics.service.webapp.address", "0.0.0.0:8188")
+  if metric_collector_port and metric_collector_port.find(':') != -1:
+    metric_collector_port = metric_collector_port.split(':')[1]
+
+  if kafka_metrics_reporters:
+    kafka_metrics_reporters = kafka_metrics_reporters + ','
+
+  kafka_metrics_reporters = kafka_metrics_reporters + "org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter"