Ver Fonte

Revert "AMBARI-9185. Add Kafka metric sink implementation to enable sink to AMS. Build failure."

This reverts commit 8d27ac2b77abdeeb61fd8d9bc6b51ce113fcd900.
Siddharth Wagle há 10 anos atrás
pai
commit
f1d354ccae
17 ficheiros alterados com 12 adições e 1259 exclusões
  1. 1 46
      ambari-metrics/ambari-metrics-assembly/pom.xml
  2. 0 8
      ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml
  3. 0 8
      ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml
  4. 3 8
      ambari-metrics/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh
  5. 0 163
      ambari-metrics/ambari-metrics-kafka-sink/pom.xml
  6. 0 21
      ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/empty.xml
  7. 0 34
      ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/jar-with-common.xml
  8. 0 448
      ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
  9. 0 25
      ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterMBean.java
  10. 0 218
      ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporter.java
  11. 0 109
      ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
  12. 0 105
      ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java
  13. 5 2
      ambari-metrics/pom.xml
  14. 3 28
      ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml
  15. 0 6
      ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml
  16. 0 3
      ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py
  17. 0 27
      ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py

+ 1 - 46
ambari-metrics/ambari-metrics-assembly/pom.xml

@@ -36,7 +36,6 @@
     <hadoop-sink.dir>${project.basedir}/../ambari-metrics-hadoop-sink</hadoop-sink.dir>
     <storm-sink.dir>${project.basedir}/../ambari-metrics-storm-sink</storm-sink.dir>
     <flume-sink.dir>${project.basedir}/../ambari-metrics-flume-sink</flume-sink.dir>
-    <kafka-sink.dir>${project.basedir}/../ambari-metrics-kafka-sink</kafka-sink.dir>
     <python.ver>python &gt;= 2.6</python.ver>
     <deb.python.ver>python (&gt;= 2.6)</deb.python.ver>
     <deb.architecture>amd64</deb.architecture>
@@ -44,7 +43,6 @@
     <hadoop.sink.jar>ambari-metrics-hadoop-sink-with-common-${project.version}.jar</hadoop.sink.jar>
     <storm.sink.jar>ambari-metrics-storm-sink-with-common-${project.version}.jar</storm.sink.jar>
     <flume.sink.jar>ambari-metrics-flume-sink-with-common-${project.version}.jar</flume.sink.jar>
-    <kafka.sink.jar>ambari-metrics-kafka-sink-with-common-${project.version}.jar</kafka.sink.jar>
   </properties>
 
   <build>
@@ -449,6 +447,7 @@
                           <location>${hadoop-sink.dir}/target/ambari-metrics-hadoop-sink-with-common-${project.version}.jar</location>
                         </source>
                       </sources>
+
                     </mapping>
                     <mapping>
                       <directory>/usr/lib/flume/lib</directory>
@@ -466,22 +465,6 @@
                         </source>
                       </sources>
                     </mapping>
-                    <mapping>
-                      <directory>/usr/lib/ambari-metrics-kafka-sink</directory>
-                      <sources>
-                        <source>
-                          <location>${kafka-sink.dir}/target/${kafka.sink.jar}</location>
-                        </source>
-                      </sources>
-                    </mapping>
-                     <mapping>
-                      <directory>/usr/lib/ambari-metrics-kafka-sink/lib</directory>
-                      <sources>
-                        <source>
-                          <location>${kafka-sink.dir}/target/lib</location>
-                        </source>
-                      </sources>
-                    </mapping>
                   </mappings>
                 </configuration>
 
@@ -612,7 +595,6 @@
                     <path>/var/log/ambari-metrics-collector</path>
                     <path>/var/lib/ambari-metrics-collector</path>
                     <path>/usr/lib/ambari-metrics-hadoop-sink</path>
-                    <path>/usr/lib/ambari-metrics-kafka-sink</path>
                     <path>/usr/lib/flume/lib</path>
                     <path>/usr/lib/storm/lib</path>
                   </paths>
@@ -789,28 +771,6 @@
                   </mapper>
                 </data>
 
-                <!-- kafka sink -->
-
-                <data>
-                  <src>${kafka-sink.dir}/target/${kafka.sink.jar}</src>
-                  <type>file</type>
-                  <mapper>
-                    <type>perm</type>
-                    <filemode>644</filemode>
-                    <dirmode>755</dirmode>
-                    <prefix>/usr/lib/ambari-metrics-kafka-sink</prefix>
-                  </mapper>
-                </data>
-                <data>
-                  <src>${kafka-sink.dir}/target/lib</src>
-                  <type>file</type>
-                  <mapper>
-                    <type>perm</type>
-                    <filemode>644</filemode>
-                    <dirmode>755</dirmode>
-                    <prefix>/usr/lib/ambari-metrics-kafka-sink/lib</prefix>
-                  </mapper>
-                </data>
               </dataSet>
             </configuration>
           </plugin>
@@ -1101,11 +1061,6 @@
       <artifactId>ambari-metrics-storm-sink</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.ambari</groupId>
-      <artifactId>ambari-metrics-kafka-sink</artifactId>
-      <version>${project.version}</version>
-    </dependency>
     <dependency>
       <groupId>org.apache.ambari</groupId>
       <artifactId>ambari-metrics-host-monitoring</artifactId>

+ 0 - 8
ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml

@@ -37,10 +37,6 @@
       <directory>${storm-sink.dir}/src/main/conf</directory>
       <outputDirectory>hadoop-sink/conf</outputDirectory>
     </fileSet>
-    <fileSet>
-      <directory>${kafka-sink.dir}/target/lib</directory>
-      <outputDirectory>hadoop-sink/lib</outputDirectory>
-    </fileSet>
   </fileSets>
 
   <files>
@@ -56,10 +52,6 @@
       <source>${storm-sink.dir}/target/ambari-metrics-storm-sink-with-common-${project.version}.jar</source>
       <outputDirectory>hadoop-sink</outputDirectory>
     </file>
-    <file>
-      <source>${kafka-sink.dir}/target/ambari-metrics-kafka-sink-with-common-${project.version}.jar</source>
-      <outputDirectory>hadoop-sink</outputDirectory>
-    </file>
     <file>
       <source>${basedir}/src/main/package/msi/sink.wxs</source>
       <outputDirectory>../../</outputDirectory>

+ 0 - 8
ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml

@@ -38,10 +38,6 @@
       <directory>${storm-sink.dir}/src/main/conf</directory>
       <outputDirectory>hadoop-sink/conf</outputDirectory>
     </fileSet>
-    <fileSet>
-      <directory>${kafka-sink.dir}/target/lib</directory>
-      <outputDirectory>hadoop-sink/lib</outputDirectory>
-    </fileSet>
   </fileSets>
 
   <files>
@@ -57,10 +53,6 @@
       <source>${storm-sink.dir}/target/ambari-metrics-storm-sink-with-common-${project.version}.jar</source>
       <outputDirectory>hadoop-sink</outputDirectory>
     </file>
-    <file>
-      <source>${kafka-sink.dir}/target/ambari-metrics-kafka-sink-with-common-${project.version}.jar</source>
-      <outputDirectory>hadoop-sink</outputDirectory>
-    </file>
   </files>
 
 

+ 3 - 8
ambari-metrics/ambari-metrics-assembly/src/main/package/rpm/sink/postinstall.sh

@@ -15,20 +15,15 @@
 # limitations under the License
 
 HADOOP_LINK_NAME="/usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar"
-HADOOP_SINK_JAR="/usr/lib/ambari-metrics-hadoop-sink/${hadoop.sink.jar}"
-
 FLUME_LINK_NAME="/usr/lib/flume/lib/ambari-metrics-flume-sink.jar"
+HADOOP_SINK_JAR="/usr/lib/ambari-metrics-hadoop-sink/${hadoop.sink.jar}"
 FLUME_SINK_JAR="/usr/lib/flume/lib/${flume.sink.jar}"
-
-KAFKA_LINK_NAME="/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar"
-KAFKA_SINK_JAR="/usr/lib/ambari-metrics-kafka-sink/${kafka.sink.jar}"
-
 #link for storm jar not required with current loading
 #STORM_SINK_JAR="/usr/lib/storm/lib/${storm.sink.jar}"
 #STORM_LINK_NAME="/usr/lib/storm/lib/ambari-metrics-storm-sink.jar"
 
-JARS=(${HADOOP_SINK_JAR} ${FLUME_SINK_JAR} ${KAFKA_SINK_JAR})
-LINKS=(${HADOOP_LINK_NAME} ${FLUME_LINK_NAME} ${KAFKA_LINK_NAME})
+JARS=(${HADOOP_SINK_JAR} ${FLUME_SINK_JAR})
+LINKS=(${HADOOP_LINK_NAME} ${FLUME_LINK_NAME})
 
 for index in ${!LINKS[*]}
 do

+ 0 - 163
ambari-metrics/ambari-metrics-kafka-sink/pom.xml

@@ -1,163 +0,0 @@
-<?<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
-                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<parent>
-		<artifactId>ambari-metrics</artifactId>
-		<groupId>org.apache.ambari</groupId>
-		<version>0.1.0-SNAPSHOT</version>
-	</parent>
-	<modelVersion>4.0.0</modelVersion>
-	<artifactId>ambari-metrics-kafka-sink</artifactId>
-	<version>0.1.0-SNAPSHOT</version>
-	<packaging>jar</packaging>
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-dependency-plugin</artifactId>
-				<version>2.9</version>
-				<executions>
-					<execution>
-						<id>copy-dependencies</id>
-						<phase>package</phase>
-						<goals>
-							<goal>copy-dependencies</goal>
-						</goals>
-						<configuration>
-							<includeArtifactIds>commons-codec,commons-collections,commons-httpclient,commons-lang,commons-logging,guava,jackson-core-asl,jackson-mapper-asl,jackson-xc,hadoop-common</includeArtifactIds>
-							<outputDirectory>${project.build.directory}/lib</outputDirectory>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<artifactId>maven-assembly-plugin</artifactId>
-				<executions>
-					<execution>
-						<configuration>
-							<descriptors>
-								<descriptor>src/main/assemblies/jar-with-common.xml</descriptor>
-							</descriptors>
-							<attach>false</attach>
-							<tarLongFileMode>gnu</tarLongFileMode>
-							<appendAssemblyId>false</appendAssemblyId>
-							<finalName>${project.artifactId}-with-common-${project.version}</finalName>
-						</configuration>
-						<id>build-jar</id>
-						<phase>package</phase>
-						<goals>
-							<goal>single</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<version>3.0</version>
-			</plugin>
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>build-helper-maven-plugin</artifactId>
-				<version>1.8</version>
-				<executions>
-					<execution>
-						<id>parse-version</id>
-						<phase>validate</phase>
-						<goals>
-							<goal>parse-version</goal>
-						</goals>
-					</execution>
-					<execution>
-						<id>regex-property</id>
-						<goals>
-							<goal>regex-property</goal>
-						</goals>
-						<configuration>
-							<name>ambariVersion</name>
-							<value>${project.version}</value>
-							<regex>^([0-9]+)\.([0-9]+)\.([0-9]+)(\.|-).*</regex>
-							<replacement>$1.$2.$3</replacement>
-							<failIfNoMatch>false</failIfNoMatch>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<groupId>com.github.goldin</groupId>
-				<artifactId>copy-maven-plugin</artifactId>
-				<version>0.2.5</version>
-				<executions>
-					<execution>
-						<id>create-archive</id>
-						<phase>none</phase>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.ambari</groupId>
-			<artifactId>ambari-metrics-common</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.kafka</groupId>
-			<artifactId>kafka_2.10</artifactId>
-			<version>0.8.1.1</version>
-		</dependency>
-		<dependency>
-			<groupId>com.yammer.metrics</groupId>
-			<artifactId>metrics-core</artifactId>
-			<version>2.2.0</version>
-		</dependency>
-		<dependency>
-			<groupId>junit</groupId>
-			<artifactId>junit</artifactId>
-			<scope>test</scope>
-			<version>4.10</version>
-		</dependency>
-		<dependency>
-			<groupId>org.easymock</groupId>
-			<artifactId>easymock</artifactId>
-			<version>3.2</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.powermock</groupId>
-			<artifactId>powermock-api-easymock</artifactId>
-			<version>1.4.9</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.powermock</groupId>
-			<artifactId>powermock-module-junit4</artifactId>
-			<version>1.4.9</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.mockito</groupId>
-			<artifactId>mockito-all</artifactId>
-			<version>1.9.5</version>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-</project>

+ 0 - 21
ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/empty.xml

@@ -1,21 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<assembly>
-    <id>empty</id>
-    <formats/>
-</assembly>

+ 0 - 34
ambari-metrics/ambari-metrics-kafka-sink/src/main/assemblies/jar-with-common.xml

@@ -1,34 +0,0 @@
-<?xml version="1.0"?>
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-<assembly>
-  <id>jar-with-common</id>
-  <formats>
-    <format>jar</format>
-  </formats>
-  <includeBaseDirectory>false</includeBaseDirectory>
-  <dependencySets>
-    <dependencySet>
-      <outputDirectory>/</outputDirectory>
-      <unpack>true</unpack>
-      <includes>
-        <include>org.apache.ambari:ambari-metrics-common</include>
-        <include>org.apache.ambari:ambari-metrics-kafka-sink</include>
-      </includes>
-    </dependencySet>
-  </dependencySets>
-</assembly>

+ 0 - 448
ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java

@@ -1,448 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.kafka;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import kafka.metrics.KafkaMetricsConfig;
-import kafka.metrics.KafkaMetricsReporter;
-import kafka.utils.VerifiableProperties;
-
-import org.apache.commons.lang.ClassUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.apache.hadoop.metrics2.util.Servers;
-
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Metered;
-import com.yammer.metrics.core.Metric;
-import com.yammer.metrics.core.MetricName;
-import com.yammer.metrics.core.MetricProcessor;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.stats.Snapshot;
-
-public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink implements KafkaMetricsReporter,
-    KafkaTimelineMetricsReporterMBean {
-
-  private final static Log LOG = LogFactory.getLog(KafkaTimelineMetricsReporter.class);
-
-  private static final String TIMELINE_METRICS_SEND_INTERVAL_PROPERTY = "kafka.timeline.metrics.sendInterval";
-  private static final String TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY = "kafka.timeline.metrics.maxRowCacheSize";
-  private static final String TIMELINE_HOST_PROPERTY = "kafka.timeline.metrics.host";
-  private static final String TIMELINE_PORT_PROPERTY = "kafka.timeline.metrics.port";
-  private static final String TIMELINE_REPORTER_ENABLED_PROPERTY = "kafka.timeline.metrics.reporter.enabled";
-  private static final String TIMELINE_DEFAULT_HOST = "localhost";
-  private static final String TIMELINE_DEFAULT_PORT = "8188";
-
-  private boolean initialized = false;
-  private boolean running = false;
-  private Object lock = new Object();
-  private String collectorUri;
-  private String hostname;
-  private SocketAddress socketAddress;
-  private TimelineScheduledReporter reporter;
-  private TimelineMetricsCache metricsCache;
-
-  @Override
-  protected SocketAddress getServerSocketAddress() {
-    return socketAddress;
-  }
-
-  @Override
-  protected String getCollectorUri() {
-    return collectorUri;
-  }
-
-  public void setMetricsCache(TimelineMetricsCache metricsCache) {
-    this.metricsCache = metricsCache;
-  }
-
-  public void init(VerifiableProperties props) {
-    synchronized (lock) {
-      if (!initialized) {
-        LOG.info("Initializing Kafka Timeline Metrics Sink");
-        try {
-          hostname = InetAddress.getLocalHost().getHostName();
-        } catch (UnknownHostException e) {
-          LOG.error("Could not identify hostname.");
-          throw new RuntimeException("Could not identify hostname.", e);
-        }
-        KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(props);
-        int metricsSendInterval = Integer.parseInt(props.getString(TIMELINE_METRICS_SEND_INTERVAL_PROPERTY,
-            String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
-        int maxRowCacheSize = Integer.parseInt(props.getString(TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY,
-            String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
-        String metricCollectorHost = props.getString(TIMELINE_HOST_PROPERTY, TIMELINE_DEFAULT_HOST);
-        String metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT);
-        setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
-        collectorUri = "http://" + metricCollectorHost + ":" + metricCollectorPort + "/ws/v1/timeline/metrics";
-        List<InetSocketAddress> socketAddresses = Servers.parse(metricCollectorHost,
-            Integer.parseInt(metricCollectorPort));
-        if (socketAddresses != null && !socketAddresses.isEmpty()) {
-          socketAddress = socketAddresses.get(0);
-        }
-        initializeReporter();
-        if (props.getBoolean(TIMELINE_REPORTER_ENABLED_PROPERTY, false)) {
-          startReporter(metricsConfig.pollingIntervalSecs());
-        }
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("CollectorUri = " + collectorUri);
-          LOG.trace("SocketAddress = " + socketAddress);
-          LOG.trace("MetricsSendInterval = " + metricsSendInterval);
-          LOG.trace("MaxRowCacheSize = " + maxRowCacheSize);
-        }
-      }
-    }
-  }
-
-  public String getMBeanName() {
-    return "kafka:type=org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter";
-  }
-
-  public synchronized void startReporter(long period) {
-    synchronized (lock) {
-      if (initialized && !running) {
-        reporter.start(period, TimeUnit.SECONDS);
-        running = true;
-        LOG.info(String.format("Started Kafka Timeline metrics reporter with polling period %d seconds", period));
-      }
-    }
-  }
-
-  public synchronized void stopReporter() {
-    synchronized (lock) {
-      if (initialized && running) {
-        reporter.stop();
-        running = false;
-        LOG.info("Stopped Kafka Timeline metrics reporter");
-        initializeReporter();
-      }
-    }
-  }
-
-  private void initializeReporter() {
-    reporter = new TimelineScheduledReporter(Metrics.defaultRegistry(), "timeline-scheduled-reporter",
-        TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
-    initialized = true;
-  }
-
-  interface Context {
-    public List<TimelineMetric> getTimelineMetricList();
-  }
-
-  class TimelineScheduledReporter extends ScheduledReporter implements MetricProcessor<Context> {
-
-    private static final String APP_ID = "kafka_broker";
-    private static final String COUNT_SUFIX = ".count";
-    private static final String ONE_MINUTE_RATE_SUFIX = ".1MinuteRate";
-    private static final String MEAN_SUFIX = ".mean";
-    private static final String MEAN_RATE_SUFIX = ".meanRate";
-    private static final String FIVE_MINUTE_RATE_SUFIX = ".5MinuteRate";
-    private static final String FIFTEEN_MINUTE_RATE_SUFIX = ".15MinuteRate";
-    private static final String MIN_SUFIX = ".min";
-    private static final String MAX_SUFIX = ".max";
-    private static final String MEDIAN_SUFIX = ".median";
-    private static final String STD_DEV_SUFIX = "stddev";
-    private static final String SEVENTY_FIFTH_PERCENTILE_SUFIX = ".75percentile";
-    private static final String NINETY_FIFTH_PERCENTILE_SUFIX = ".95percentile";
-    private static final String NINETY_EIGHTH_PERCENTILE_SUFIX = ".98percentile";
-    private static final String NINETY_NINTH_PERCENTILE_SUFIX = ".99percentile";
-    private static final String NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX = ".999percentile";
-
-    protected TimelineScheduledReporter(MetricsRegistry registry, String name, TimeUnit rateUnit, TimeUnit durationUnit) {
-      super(registry, name, rateUnit, durationUnit);
-    }
-
-    @Override
-    public void report(Set<Entry<MetricName, Metric>> metrics) {
-      final List<TimelineMetric> metricsList = new ArrayList<TimelineMetric>();
-      try {
-        for (Entry<MetricName, Metric> entry : metrics) {
-          final MetricName metricName = entry.getKey();
-          final Metric metric = entry.getValue();
-          Context context = new Context() {
-
-            public List<TimelineMetric> getTimelineMetricList() {
-              return metricsList;
-            }
-
-          };
-          metric.processWith(this, metricName, context);
-        }
-      } catch (Throwable t) {
-        LOG.error("Exception processing Kafka metric", t);
-      }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Metrics List size: " + metricsList.size());
-        LOG.trace("Metics Set size: " + metrics.size());
-      }
-      if (!metricsList.isEmpty()) {
-        TimelineMetrics timelineMetrics = new TimelineMetrics();
-        timelineMetrics.setMetrics(metricsList);
-        try {
-          emitMetrics(timelineMetrics);
-        } catch (IOException e) {
-          LOG.error("Unexpected error", e);
-        } catch (Throwable t) {
-          LOG.error("Exception emitting metrics", t);
-        }
-      }
-    }
-
-    private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName,
-        Number attributeValue) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Creating timeline metric: " + attributeName + " = " + attributeValue + " time = "
-            + currentTimeMillis + " app_id = " + component);
-      }
-      TimelineMetric timelineMetric = new TimelineMetric();
-      timelineMetric.setMetricName(attributeName);
-      timelineMetric.setHostName(hostname);
-      timelineMetric.setAppId(component);
-      timelineMetric.setStartTime(currentTimeMillis);
-      timelineMetric.setType(ClassUtils.getShortCanonicalName(attributeValue, "Number"));
-      timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue.doubleValue());
-      return timelineMetric;
-    }
-
-    @Override
-    public void processMeter(MetricName name, Metered meter, Context context) throws Exception {
-      final long currentTimeMillis = System.currentTimeMillis();
-      final String sanitizedName = sanitizeName(name);
-      final String meterCountName = sanitizedName + COUNT_SUFIX;
-      final TimelineMetric countMetric = createTimelineMetric(currentTimeMillis, APP_ID, meterCountName, meter.count());
-
-      final String meterOneMinuteRateName = sanitizedName + ONE_MINUTE_RATE_SUFIX;
-      final TimelineMetric oneMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          meterOneMinuteRateName, meter.oneMinuteRate());
-
-      final String meterMeanRateName = sanitizedName + MEAN_RATE_SUFIX;
-      final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, meterMeanRateName,
-          meter.meanRate());
-
-      final String meterFiveMinuteRateName = sanitizedName + FIVE_MINUTE_RATE_SUFIX;
-      final TimelineMetric fiveMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          meterFiveMinuteRateName, meter.fiveMinuteRate());
-
-      final String meterFifteenMinuteRateName = sanitizedName + FIFTEEN_MINUTE_RATE_SUFIX;
-      final TimelineMetric fifteenMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          meterFifteenMinuteRateName, meter.fifteenMinuteRate());
-
-      metricsCache.putTimelineMetric(countMetric);
-      metricsCache.putTimelineMetric(oneMinuteRateMetric);
-      metricsCache.putTimelineMetric(meanMetric);
-      metricsCache.putTimelineMetric(fiveMinuteRateMetric);
-      metricsCache.putTimelineMetric(fifteenMinuteRateMetric);
-
-      String[] metricNames = new String[] { meterCountName, meterOneMinuteRateName, meterMeanRateName,
-          meterFiveMinuteRateName, meterFifteenMinuteRateName };
-      populateMetricsList(context, metricNames);
-    }
-
-    @Override
-    public void processCounter(MetricName name, Counter counter, Context context) throws Exception {
-      final long currentTimeMillis = System.currentTimeMillis();
-      final String sanitizedName = sanitizeName(name);
-      final String metricCountName = sanitizedName + COUNT_SUFIX;
-      final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, metricCountName, counter.count());
-      metricsCache.putTimelineMetric(metric);
-      populateMetricsList(context, metricCountName);
-    }
-
-    @Override
-    public void processHistogram(MetricName name, Histogram histogram, Context context) throws Exception {
-      final long currentTimeMillis = System.currentTimeMillis();
-      final Snapshot snapshot = histogram.getSnapshot();
-      final String sanitizedName = sanitizeName(name);
-
-      final String histogramMinName = sanitizedName + MIN_SUFIX;
-      final TimelineMetric minMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMinName,
-          histogram.min());
-
-      final String histogramMaxName = sanitizedName + MAX_SUFIX;
-      final TimelineMetric maxMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMaxName,
-          histogram.max());
-
-      final String histogramMeanName = sanitizedName + MEAN_SUFIX;
-      final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMeanName,
-          histogram.mean());
-
-      final String histogramMedianName = sanitizedName + MEDIAN_SUFIX;
-      final TimelineMetric medianMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMedianName,
-          snapshot.getMedian());
-
-      final String histogramStdDevName = sanitizedName + STD_DEV_SUFIX;
-      final TimelineMetric stdDevMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramStdDevName,
-          histogram.stdDev());
-
-      final String histogramSeventyFifthPercentileName = sanitizedName + SEVENTY_FIFTH_PERCENTILE_SUFIX;
-      final TimelineMetric seventyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramSeventyFifthPercentileName, snapshot.get75thPercentile());
-
-      final String histogramNinetyFifthPercentileName = sanitizedName + NINETY_FIFTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramNinetyFifthPercentileName, snapshot.get95thPercentile());
-
-      final String histogramNinetyEighthPercentileName = sanitizedName + NINETY_EIGHTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyEighthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramNinetyEighthPercentileName, snapshot.get98thPercentile());
-
-      final String histogramNinetyNinethPercentileName = sanitizedName + NINETY_NINTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyNinthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramNinetyNinethPercentileName, snapshot.get99thPercentile());
-
-      final String histogramNinetyNinePointNinePercentileName = sanitizedName + NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX;
-      final TimelineMetric nintyNinthPointNinePercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramNinetyNinePointNinePercentileName, snapshot.get999thPercentile());
-
-      metricsCache.putTimelineMetric(minMetric);
-      metricsCache.putTimelineMetric(maxMetric);
-      metricsCache.putTimelineMetric(meanMetric);
-      metricsCache.putTimelineMetric(medianMetric);
-      metricsCache.putTimelineMetric(stdDevMetric);
-      metricsCache.putTimelineMetric(seventyFifthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyFifthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyEighthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyNinthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyNinthPointNinePercentileMetric);
-
-      String[] metricNames = new String[] { histogramMaxName, histogramMeanName, histogramMedianName, histogramMinName,
-          histogramNinetyEighthPercentileName, histogramNinetyFifthPercentileName,
-          histogramNinetyNinePointNinePercentileName, histogramNinetyNinethPercentileName,
-          histogramSeventyFifthPercentileName, histogramStdDevName };
-      populateMetricsList(context, metricNames);
-    }
-
-    @Override
-    public void processTimer(MetricName name, Timer timer, Context context) throws Exception {
-      final long currentTimeMillis = System.currentTimeMillis();
-      final Snapshot snapshot = timer.getSnapshot();
-      final String sanitizedName = sanitizeName(name);
-
-      final String timerMinName = sanitizedName + MIN_SUFIX;
-      final TimelineMetric minMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMinName, timer.min());
-
-      final String timerMaxName = sanitizedName + MAX_SUFIX;
-      final TimelineMetric maxMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMaxName, timer.max());
-
-      final String timerMeanName = sanitizedName + MEAN_SUFIX;
-      final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMeanName, timer.mean());
-
-      final String timerMedianName = sanitizedName + MEDIAN_SUFIX;
-      final TimelineMetric medianMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMedianName,
-          snapshot.getMedian());
-
-      final String timerStdDevName = sanitizedName + STD_DEV_SUFIX;
-      final TimelineMetric stdDevMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerStdDevName,
-          timer.stdDev());
-
-      final String timerSeventyFifthPercentileName = sanitizedName + SEVENTY_FIFTH_PERCENTILE_SUFIX;
-      final TimelineMetric seventyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          timerSeventyFifthPercentileName, snapshot.get75thPercentile());
-
-      final String timerNinetyFifthPercentileName = sanitizedName + NINETY_FIFTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          timerNinetyFifthPercentileName, snapshot.get95thPercentile());
-
-      final String timerNinetyEighthPercentileName = sanitizedName + NINETY_EIGHTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyEighthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          timerNinetyEighthPercentileName, snapshot.get98thPercentile());
-
-      final String timerNinetyNinthPercentileName = sanitizedName + NINETY_NINTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyNinthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          timerNinetyNinthPercentileName, snapshot.get99thPercentile());
-
-      final String timerNinetyNinePointNinePercentileName = sanitizedName + NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX;
-      final TimelineMetric nintyNinthPointNinePercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          timerNinetyNinePointNinePercentileName, snapshot.get999thPercentile());
-
-      metricsCache.putTimelineMetric(minMetric);
-      metricsCache.putTimelineMetric(maxMetric);
-      metricsCache.putTimelineMetric(meanMetric);
-      metricsCache.putTimelineMetric(medianMetric);
-      metricsCache.putTimelineMetric(stdDevMetric);
-      metricsCache.putTimelineMetric(seventyFifthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyFifthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyEighthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyNinthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyNinthPointNinePercentileMetric);
-
-      String[] metricNames = new String[] { timerMaxName, timerMeanName, timerMedianName, timerMinName,
-          timerNinetyEighthPercentileName, timerNinetyFifthPercentileName, timerNinetyNinePointNinePercentileName,
-          timerNinetyNinthPercentileName, timerSeventyFifthPercentileName, timerStdDevName };
-      populateMetricsList(context, metricNames);
-    }
-
-    @Override
-    public void processGauge(MetricName name, Gauge<?> gauge, Context context) throws Exception {
-      final long currentTimeMillis = System.currentTimeMillis();
-      final String sanitizedName = sanitizeName(name);
-      final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, sanitizedName,
-          Double.parseDouble(String.valueOf(gauge.value())));
-      metricsCache.putTimelineMetric(metric);
-      populateMetricsList(context, sanitizedName);
-    }
-
-    private void populateMetricsList(Context context, String... metricNames) {
-      for (String metricName : metricNames) {
-        TimelineMetric cachedMetric = metricsCache.getTimelineMetric(metricName);
-        if (cachedMetric != null) {
-          context.getTimelineMetricList().add(cachedMetric);
-        }
-      }
-    }
-
-    protected String sanitizeName(MetricName name) {
-      if (name == null) {
-        return "";
-      }
-      final String qualifiedTypeName = name.getGroup() + "." + name.getType() + "." + name.getName();
-      final String metricName = name.hasScope() ? qualifiedTypeName + '.' + name.getScope() : qualifiedTypeName;
-      final StringBuilder sb = new StringBuilder();
-      for (int i = 0; i < metricName.length(); i++) {
-        final char p = metricName.charAt(i);
-        if (!(p >= 'A' && p <= 'Z') && !(p >= 'a' && p <= 'z') && !(p >= '0' && p <= '9') && (p != '_') && (p != '-')
-            && (p != '.') && (p != '\0')) {
-          sb.append('_');
-        } else {
-          sb.append(p);
-        }
-      }
-      return sb.toString();
-    }
-
-  }
-}

+ 0 - 25
ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterMBean.java

@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.kafka;
-
-import kafka.metrics.KafkaMetricsReporterMBean;
-
-public interface KafkaTimelineMetricsReporterMBean extends KafkaMetricsReporterMBean {
-
-}

+ 0 - 218
ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporter.java

@@ -1,218 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.kafka;
-
-import java.io.Closeable;
-import java.util.Locale;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.yammer.metrics.core.Metric;
-import com.yammer.metrics.core.MetricName;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.reporting.ConsoleReporter;
-import com.yammer.metrics.reporting.CsvReporter;
-
-/**
- * The abstract base class for all scheduled reporters (i.e., reporters which
- * process a registry's metrics periodically).
- *
- * @see ConsoleReporter
- * @see CsvReporter
- * @see Slf4jReporter
- */
-public abstract class ScheduledReporter implements Closeable {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ScheduledReporter.class);
-
-  /**
-   * A simple named thread factory.
-   */
-  @SuppressWarnings("NullableProblems")
-  private static class NamedThreadFactory implements ThreadFactory {
-    private final ThreadGroup group;
-    private final AtomicInteger threadNumber = new AtomicInteger(1);
-    private final String namePrefix;
-
-    private NamedThreadFactory(String name) {
-      final SecurityManager s = System.getSecurityManager();
-      this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
-      this.namePrefix = "metrics-" + name + "-thread-";
-    }
-
-    @Override
-    public Thread newThread(Runnable r) {
-      final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
-      t.setDaemon(true);
-      if (t.getPriority() != Thread.NORM_PRIORITY) {
-        t.setPriority(Thread.NORM_PRIORITY);
-      }
-      return t;
-    }
-  }
-
-  private static final AtomicInteger FACTORY_ID = new AtomicInteger();
-
-  private final MetricsRegistry registry;
-  private final ScheduledExecutorService executor;
-  private final double durationFactor;
-  private final String durationUnit;
-  private final double rateFactor;
-  private final String rateUnit;
-
-  /**
-   * Creates a new {@link ScheduledReporter} instance.
-   *
-   * @param registry
-   *          the {@link com.codahale.metrics.MetricRegistry} containing the
-   *          metrics this reporter will report
-   * @param name
-   *          the reporter's name
-   * @param rateUnit
-   *          a unit of time
-   * @param durationUnit
-   *          a unit of time
-   */
-  protected ScheduledReporter(MetricsRegistry registry, String name, TimeUnit rateUnit, TimeUnit durationUnit) {
-    this(registry, rateUnit, durationUnit, Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(name + '-'
-        + FACTORY_ID.incrementAndGet())));
-  }
-
-  /**
-   * Creates a new {@link ScheduledReporter} instance.
-   *
-   * @param registry
-   *          the {@link com.codahale.metrics.MetricRegistry} containing the
-   *          metrics this reporter will report
-   * @param executor
-   *          the executor to use while scheduling reporting of metrics.
-   */
-  protected ScheduledReporter(MetricsRegistry registry, TimeUnit rateUnit, TimeUnit durationUnit,
-      ScheduledExecutorService executor) {
-    this.registry = registry;
-    this.executor = executor;
-    this.rateFactor = rateUnit.toSeconds(1);
-    this.rateUnit = calculateRateUnit(rateUnit);
-    this.durationFactor = 1.0 / durationUnit.toNanos(1);
-    this.durationUnit = durationUnit.toString().toLowerCase(Locale.US);
-  }
-
-  /**
-   * Starts the reporter polling at the given period.
-   *
-   * @param period
-   *          the amount of time between polls
-   * @param unit
-   *          the unit for {@code period}
-   */
-  public void start(long period, TimeUnit unit) {
-    executor.scheduleAtFixedRate(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          report();
-        } catch (RuntimeException ex) {
-          LOG.error("RuntimeException thrown from {}#report. Exception was suppressed.", ScheduledReporter.this
-              .getClass().getSimpleName(), ex);
-        }
-      }
-    }, period, period, unit);
-  }
-
-  /**
-   * Stops the reporter and shuts down its thread of execution.
-   *
-   * Uses the shutdown pattern from
-   * http://docs.oracle.com/javase/7/docs/api/java
-   * /util/concurrent/ExecutorService.html
-   */
-  public void stop() {
-    executor.shutdown(); // Disable new tasks from being submitted
-    try {
-      // Wait a while for existing tasks to terminate
-      if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
-        executor.shutdownNow(); // Cancel currently executing tasks
-        // Wait a while for tasks to respond to being cancelled
-        if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
-          System.err.println(getClass().getSimpleName() + ": ScheduledExecutorService did not terminate");
-        }
-      }
-    } catch (InterruptedException ie) {
-      // (Re-)Cancel if current thread also interrupted
-      executor.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  /**
-   * Stops the reporter and shuts down its thread of execution.
-   */
-  @Override
-  public void close() {
-    stop();
-  }
-
-  /**
-   * Report the current values of all metrics in the registry.
-   */
-  public void report() {
-    synchronized (this) {
-      report(registry.allMetrics().entrySet());
-    }
-  }
-
-  /**
-   * Called periodically by the polling thread. Subclasses should report all the
-   * given metrics.
-   *
-   * @param metrics
-   *          all the metrics in the registry
-   */
-  public abstract void report(Set<Entry<MetricName, Metric>> metrics);
-
-  protected String getRateUnit() {
-    return rateUnit;
-  }
-
-  protected String getDurationUnit() {
-    return durationUnit;
-  }
-
-  protected double convertDuration(double duration) {
-    return duration * durationFactor;
-  }
-
-  protected double convertRate(double rate) {
-    return rate * rateFactor;
-  }
-
-  private String calculateRateUnit(TimeUnit unit) {
-    final String s = unit.toString().toLowerCase(Locale.US);
-    return s.substring(0, s.length() - 1);
-  }
-}

+ 0 - 109
ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java

@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.kafka;
-
-import static org.mockito.Mockito.mock;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replay;
-import static org.powermock.api.easymock.PowerMock.verifyAll;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import kafka.utils.VerifiableProperties;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.easymock.EasyMock;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Meter;
-import com.yammer.metrics.core.Metric;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.core.Timer;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ Metrics.class, HttpClient.class })
-@PowerMockIgnore("javax.management.*")
-public class KafkaTimelineMetricsReporterTest {
-
-  private final List<Metric> list = new ArrayList<Metric>();
-  private final MetricsRegistry registry = new MetricsRegistry();
-  @SuppressWarnings("rawtypes")
-  private final Gauge gauge = mock(Gauge.class);
-  private final KafkaTimelineMetricsReporter kafkaTimelineMetricsReporter = new KafkaTimelineMetricsReporter();
-  private VerifiableProperties props;
-
-  @Before
-  public void setUp() throws Exception {
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    Gauge g = registry.newGauge(System.class, "gauge", gauge);
-    Counter counter = registry.newCounter(System.class, "counter");
-    Histogram histogram = registry.newHistogram(System.class, "histogram");
-    Meter meter = registry.newMeter(System.class, "meter", "empty", TimeUnit.MILLISECONDS);
-    Timer timer = registry.newTimer(System.class, "timer");
-    list.add(g);
-    list.add(counter);
-    list.add(histogram);
-    list.add(meter);
-    list.add(timer);
-    Properties properties = new Properties();
-    properties.setProperty("kafka.timeline.metrics.sendInterval", "5900");
-    properties.setProperty("kafka.timeline.metrics.maxRowCacheSize", "10000");
-    properties.setProperty("kafka.timeline.metrics.host", "localhost");
-    properties.setProperty("kafka.timeline.metrics.port", "8188");
-    properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true");
-    props = new VerifiableProperties(properties);
-  }
-
-  @Test
-  public void testReporterStartStop() {
-    mockStatic(Metrics.class);
-    EasyMock.expect(Metrics.defaultRegistry()).andReturn(registry).times(2);
-    TimelineMetricsCache timelineMetricsCache = getTimelineMetricsCache(kafkaTimelineMetricsReporter);
-    kafkaTimelineMetricsReporter.setMetricsCache(timelineMetricsCache);
-    HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
-    kafkaTimelineMetricsReporter.setHttpClient(httpClient);
-    replay(Metrics.class, httpClient, timelineMetricsCache);
-    kafkaTimelineMetricsReporter.init(props);
-    kafkaTimelineMetricsReporter.stopReporter();
-    verifyAll();
-  }
-
-  private TimelineMetricsCache getTimelineMetricsCache(KafkaTimelineMetricsReporter kafkaTimelineMetricsReporter) {
-    TimelineMetricsCache timelineMetricsCache = EasyMock.createNiceMock(TimelineMetricsCache.class);
-    kafkaTimelineMetricsReporter.setMetricsCache(timelineMetricsCache);
-    EasyMock.expect(timelineMetricsCache.getTimelineMetric("key1")).andReturn(new TimelineMetric()).once();
-    timelineMetricsCache.putTimelineMetric(EasyMock.anyObject(TimelineMetric.class));
-    EasyMock.expectLastCall().once();
-    return timelineMetricsCache;
-  }
-}

+ 0 - 105
ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/ScheduledReporterTest.java

@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.kafka;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Meter;
-import com.yammer.metrics.core.Metric;
-import com.yammer.metrics.core.MetricName;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.core.Timer;
-
-public class ScheduledReporterTest {
-  private final Gauge gauge = mock(Gauge.class);
-  private final List<Metric> list = new ArrayList<Metric>();
-  private final MetricsRegistry registry = new MetricsRegistry();
-  private final ScheduledReporter reporter = spy(new ScheduledReporter(registry, "example", TimeUnit.SECONDS,
-      TimeUnit.MILLISECONDS) {
-    @Override
-    public void report(Set<Entry<MetricName, Metric>> metrics) {
-      // nothing doing!
-    }
-  });
-
-  @SuppressWarnings("unchecked")
-  @Before
-  public void setUp() throws Exception {
-    Gauge g = registry.newGauge(System.class, "gauge", gauge);
-    Counter counter = registry.newCounter(System.class, "counter");
-    Histogram histogram = registry.newHistogram(System.class, "histogram");
-    Meter meter = registry.newMeter(System.class, "meter", "empty", TimeUnit.MILLISECONDS);
-    Timer timer = registry.newTimer(System.class, "timer");
-    list.add(g);
-    list.add(counter);
-    list.add(histogram);
-    list.add(meter);
-    list.add(timer);
-    reporter.start(200, TimeUnit.MILLISECONDS);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    reporter.stop();
-  }
-
-  @Test
-  public void pollsPeriodically() throws Exception {
-    Thread.sleep(500);
-    verify(reporter, times(2)).report(set(list));
-  }
-
-  private Set<Entry<MetricName, Metric>> set(List<Metric> metrics) {
-    final Map<MetricName, Metric> map = new HashMap<MetricName, Metric>();
-    for (Metric metric : metrics) {
-      String name = null;
-      if (metric instanceof Gauge) {
-        name = "gauge";
-      } else if (metric instanceof Counter) {
-        name = "counter";
-      } else if (metric instanceof Histogram) {
-        name = "histogram";
-      } else if (metric instanceof Meter) {
-        name = "meter";
-      } else if (metric instanceof Timer) {
-        name = "timer";
-      }
-      map.put(new MetricName(System.class, name), metric);
-    }
-    return map.entrySet();
-  }
-}

+ 5 - 2
ambari-metrics/pom.xml

@@ -14,7 +14,11 @@
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
---><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0                              http://maven.apache.org/xsd/maven-4.0.0.xsd">
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
   <groupId>org.apache.ambari</groupId>
   <modelVersion>4.0.0</modelVersion>
@@ -25,7 +29,6 @@
     <module>ambari-metrics-common</module>
     <module>ambari-metrics-hadoop-sink</module>
     <module>ambari-metrics-flume-sink</module>
-    <module>ambari-metrics-kafka-sink</module>
     <module>ambari-metrics-storm-sink</module>
     <module>ambari-metrics-timelineservice</module>
     <module>ambari-metrics-host-monitoring</module>

+ 3 - 28
ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-broker.xml

@@ -101,7 +101,7 @@
     <name>num.partitions</name>
     <value>1</value>
     <description>
-        The default number of partitions per topic.
+       	The default number of partitions per topic.
     </description>
   </property>
   <property>
@@ -291,9 +291,9 @@
   </property>
   <property>
     <name>kafka.metrics.reporters</name>
-    <value>{{kafka_metrics_reporters}}</value>
+    <value>kafka.ganglia.KafkaGangliaMetricsReporter</value>
     <description>
-      kafka ganglia metrics reporter and kafka timeline metrics reporter
+      kafka ganglia metrics reporter
     </description>
   </property>
   <property>
@@ -318,29 +318,4 @@
     <value>kafka</value>
     <description>Ganglia group name </description>
   </property>
-  <property>
-    <name>kafka.timeline.metrics.reporter.enabled</name>
-    <value>true</value>
-    <description>Kafka timeline metrics reporter enable</description>
-  </property>
-  <property>
-    <name>kafka.timeline.metrics.host</name>
-    <value>{{metric_collector_host}}</value>
-    <description>Timeline host</description>
-  </property>
-  <property>
-    <name>kafka.timeline.metrics.port</name>
-    <value>{{metric_collector_port}}</value>
-    <description>Timeline port</description>
-  </property>
-  <property>
-    <name>kafka.timeline.metrics.reporter.sendInterval</name>
-    <value>5900</value>
-    <description>Timeline metrics reporter send interval</description>
-  </property>
-    <property>
-    <name>kafka.timeline.metrics.maxRowCacheSize</name>
-    <value>10000</value>
-    <description>Timeline metrics reporter send interval</description>
-  </property>
 </configuration>

+ 0 - 6
ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/configuration/kafka-env.xml

@@ -50,12 +50,6 @@
 # The java implementation to use.
 export JAVA_HOME={{java64_home}}
 export PATH=$PATH:$JAVA_HOME/bin
-
-# Add kafka sink to classpath and related depenencies
-if [ -e "/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar" ]; then
-  export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar
-  export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/lib/*
-fi
     </value>
   </property>
 </configuration>

+ 0 - 3
ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py

@@ -35,9 +35,6 @@ def kafka():
     kafka_server_config = mutable_config_dict(params.config['configurations']['kafka-broker'])
     kafka_server_config['broker.id'] = brokerid
     kafka_server_config['host.name'] = params.hostname
-    kafka_server_config['kafka.timeline.metrics.host'] = params.metric_collector_host
-    kafka_server_config['kafka.timeline.metrics.port'] = params.metric_collector_port
-    kafka_server_config['kafka.metrics.reporters'] = params.kafka_metrics_reporters
     kafka_data_dir = kafka_server_config['log.dirs']
     Directory(filter(None,kafka_data_dir.split(",")),
               owner=params.kafka_user,

+ 0 - 27
ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py

@@ -60,30 +60,3 @@ if (('kafka-log4j' in config['configurations']) and ('content' in config['config
     log4j_props = config['configurations']['kafka-log4j']['content']
 else:
     log4j_props = None
-
-if 'ganglia_server_host' in config['clusterHostInfo'] and \
-    len(config['clusterHostInfo']['ganglia_server_host'])>0:
-  ganglia_installed = True
-  ganglia_server = config['clusterHostInfo']['ganglia_server_host'][0]
-  ganglia_report_interval = 60
-else:
-  ganglia_installed = False
-
-kafka_metrics_reporters=""
-
-if ganglia_installed:
-  kafka_metrics_reporters = "kafka.ganglia.KafkaGangliaMetricsReporter"
-
-ams_collector_hosts = default("/clusterHostInfo/metric_collector_hosts", [])
-has_metric_collector = not len(ams_collector_hosts) == 0
-
-if has_metric_collector:
-  metric_collector_host = ams_collector_hosts[0]
-  metric_collector_port = default("/configurations/ams-site/timeline.metrics.service.webapp.address", "0.0.0.0:8188")
-  if metric_collector_port and metric_collector_port.find(':') != -1:
-    metric_collector_port = metric_collector_port.split(':')[1]
-
-  if not len(kafka_metrics_reporters) == 0:
-      kafka_metrics_reporters = kafka_metrics_reporters + ','
-
-  kafka_metrics_reporters = kafka_metrics_reporters + "org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter"