AMBARI-17080 Support Storm 1.0 in Ambari Metrics for Storm (dsen)

Aravindan Vijayan, 9 years ago
commit 879713ac7b
23 changed files with 941 additions and 102 deletions
  1. ambari-metrics/ambari-metrics-assembly/pom.xml (+26 -1)
  2. ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml (+8 -0)
  3. ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml (+9 -1)
  4. ambari-metrics/ambari-metrics-storm-sink-legacy/pom.xml (+207 -0)
  5. ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/assemblies/empty.xml (+21 -0)
  6. ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java (+162 -0)
  7. ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java (+202 -0)
  8. ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java (+91 -0)
  9. ambari-metrics/ambari-metrics-storm-sink/pom.xml (+1 -2)
  10. ambari-metrics/ambari-metrics-storm-sink/src/main/conf/storm-metrics2.properties.j2 (+0 -22)
  11. ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/NumberUtil.java (+38 -0)
  12. ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java (+119 -60)
  13. ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java (+4 -4)
  14. ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java (+1 -1)
  15. ambari-metrics/pom.xml (+6 -0)
  16. ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/storm-site.xml (+18 -0)
  17. ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py (+2 -1)
  18. ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/storm.py (+7 -2)
  19. ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/ui_server.py (+8 -2)
  20. ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2 (+1 -0)
  21. ambari-server/src/main/resources/stacks/HDP/2.5/services/STORM/configuration/storm-site.xml (+4 -0)
  22. ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py (+2 -2)
  23. ambari-server/src/test/python/stacks/2.1/STORM/test_storm_ui_server.py (+4 -4)

+ 26 - 1
ambari-metrics/ambari-metrics-assembly/pom.xml

@@ -38,6 +38,7 @@
     <grafana.dir>${project.basedir}/../ambari-metrics-grafana</grafana.dir>
     <hadoop-sink.dir>${project.basedir}/../ambari-metrics-hadoop-sink</hadoop-sink.dir>
     <storm-sink.dir>${project.basedir}/../ambari-metrics-storm-sink</storm-sink.dir>
+    <storm-sink-legacy.dir>${project.basedir}/../ambari-metrics-storm-sink-legacy</storm-sink-legacy.dir>
     <flume-sink.dir>${project.basedir}/../ambari-metrics-flume-sink</flume-sink.dir>
     <kafka-sink.dir>${project.basedir}/../ambari-metrics-kafka-sink</kafka-sink.dir>
     <python.ver>python &gt;= 2.6</python.ver>
@@ -51,6 +52,7 @@
     <deb.dependency.list>${deb.python.ver},python-dev,gcc</deb.dependency.list>
     <hadoop.sink.jar>ambari-metrics-hadoop-sink-with-common-${project.version}.jar</hadoop.sink.jar>
     <storm.sink.jar>ambari-metrics-storm-sink-with-common-${project.version}.jar</storm.sink.jar>
+    <storm.sink.legacy.jar>ambari-metrics-storm-sink-legacy-with-common-${project.version}.jar</storm.sink.legacy.jar>
     <flume.sink.jar>ambari-metrics-flume-sink-with-common-${project.version}.jar</flume.sink.jar>
     <kafka.sink.jar>ambari-metrics-kafka-sink-with-common-${project.version}.jar</kafka.sink.jar>
   </properties>
@@ -390,6 +392,14 @@
                         </source>
                       </sources>
                     </mapping>
+                    <mapping>
+                      <directory>/usr/lib/storm/lib</directory>
+                      <sources>
+                        <source>
+                          <location>${storm-sink-legacy.dir}/target/ambari-metrics-storm-sink-legacy-with-common-${project.version}.jar</location>
+                        </source>
+                      </sources>
+                    </mapping>
                     <mapping>
                       <directory>/usr/lib/ambari-metrics-kafka-sink</directory>
                       <sources>
@@ -934,7 +944,7 @@
                   </mapper>
                 </data>
 
-                <!-- storm sink -->
+                <!-- storm sinks -->
 
                 <data>
                   <src>${storm-sink.dir}/target/${storm.sink.jar}</src>
@@ -946,6 +956,16 @@
                     <prefix>/usr/lib/storm/lib</prefix>
                   </mapper>
                 </data>
+                <data>
+                  <src>${storm-sink-legacy.dir}/target/${storm.sink.legacy.jar}</src>
+                  <type>file</type>
+                  <mapper>
+                    <type>perm</type>
+                    <filemode>644</filemode>
+                    <dirmode>755</dirmode>
+                    <prefix>/usr/lib/storm/lib</prefix>
+                  </mapper>
+                </data>
 
                 <!-- kafka sink -->
 
@@ -1251,6 +1271,11 @@
       <artifactId>ambari-metrics-storm-sink</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-metrics-storm-sink-legacy</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.ambari</groupId>
       <artifactId>ambari-metrics-kafka-sink</artifactId>

+ 8 - 0
ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink-windows.xml

@@ -38,6 +38,10 @@
       <directory>${storm-sink.dir}/src/main/conf</directory>
       <outputDirectory>hadoop-sink/conf</outputDirectory>
     </fileSet>
+    <fileSet>
+      <directory>${storm-sink-legacy.dir}/src/main/conf</directory>
+      <outputDirectory>hadoop-sink/conf</outputDirectory>
+    </fileSet>
     <fileSet>
       <directory>${kafka-sink.dir}/target/lib</directory>
       <outputDirectory>hadoop-sink/lib</outputDirectory>
@@ -57,6 +61,10 @@
       <source>${storm-sink.dir}/target/ambari-metrics-storm-sink-with-common-${project.version}.jar</source>
       <outputDirectory>hadoop-sink</outputDirectory>
     </file>
+    <file>
+      <source>${storm-sink-legacy.dir}/target/ambari-metrics-storm-sink-legacy-with-common-${project.version}.jar</source>
+      <outputDirectory>hadoop-sink</outputDirectory>
+    </file>
     <file>
       <source>${kafka-sink.dir}/target/ambari-metrics-kafka-sink-with-common-${project.version}.jar</source>
       <outputDirectory>hadoop-sink</outputDirectory>

+ 9 - 1
ambari-metrics/ambari-metrics-assembly/src/main/assembly/sink.xml

@@ -38,6 +38,10 @@
       <directory>${storm-sink.dir}/src/main/conf</directory>
       <outputDirectory>hadoop-sink/conf</outputDirectory>
     </fileSet>
+    <fileSet>
+      <directory>${storm-sink-legacy.dir}/src/main/conf</directory>
+      <outputDirectory>hadoop-sink/conf</outputDirectory>
+    </fileSet>
     <fileSet>
       <directory>${kafka-sink.dir}/target/lib</directory>
       <outputDirectory>hadoop-sink/lib</outputDirectory>
@@ -57,6 +61,10 @@
       <source>${storm-sink.dir}/target/ambari-metrics-storm-sink-with-common-${project.version}.jar</source>
       <outputDirectory>hadoop-sink</outputDirectory>
     </file>
+    <file>
+      <source>${storm-sink-legacy.dir}/target/ambari-metrics-storm-sink-legacy-with-common-${project.version}.jar</source>
+      <outputDirectory>hadoop-sink</outputDirectory>
+    </file>
     <file>
       <source>${kafka-sink.dir}/target/ambari-metrics-kafka-sink-with-common-${project.version}.jar</source>
       <outputDirectory>hadoop-sink</outputDirectory>
@@ -66,4 +74,4 @@
 
 
 
-</assembly>
+</assembly>

+ 207 - 0
ambari-metrics/ambari-metrics-storm-sink-legacy/pom.xml

@@ -0,0 +1,207 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>ambari-metrics</artifactId>
+    <groupId>org.apache.ambari</groupId>
+    <version>2.4.0.0.0</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>ambari-metrics-storm-sink-legacy</artifactId>
+  <version>2.4.0.0.0</version>
+  <name>Ambari Metrics Storm Sink (Legacy)</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <!--<storm.version>0.9.3.2.2.1.0-2340</storm.version>-->
+    <storm.version>0.10.0.2.3.0.0-2557</storm.version>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.0</version>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.8</version>
+        <executions>
+          <execution>
+            <id>parse-version</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>parse-version</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>regex-property</id>
+            <goals>
+              <goal>regex-property</goal>
+            </goals>
+            <configuration>
+              <name>ambariVersion</name>
+              <value>${project.version}</value>
+              <regex>^([0-9]+)\.([0-9]+)\.([0-9]+)\.([0-9]+)(\.|-).*</regex>
+              <replacement>$1.$2.$3.$4</replacement>
+              <failIfNoMatch>false</failIfNoMatch>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>com.github.goldin</groupId>
+        <artifactId>copy-maven-plugin</artifactId>
+        <version>0.2.5</version>
+        <executions>
+          <execution>
+            <id>create-archive</id>
+            <phase>none</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.2</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <outputFile>${project.build.directory}/${project.artifactId}-with-common-${project.version}.jar</outputFile>
+          <minimizeJar>false</minimizeJar>
+          <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
+          <artifactSet>
+            <includes>
+              <include>org.apache.ambari:ambari-metrics-common</include>
+              <include>org.codehaus.jackson:jackson-mapper-asl</include>
+              <include>org.codehaus.jackson:jackson-core-asl</include>
+              <include>org.codehaus.jackson:jackson-xc</include>
+              <include>org.apache.hadoop:hadoop-annotations</include>
+              <include>commons-logging:commons-logging</include>
+              <include>org.apache.commons:commons-lang3</include>
+              <include>commons-codec:commons-codec</include>
+            </includes>
+          </artifactSet>
+          <relocations>
+            <relocation>
+              <pattern>org.apache.commons.logging</pattern>
+              <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.commons.logging</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>org.apache.hadoop.classification</pattern>
+              <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.hadoop.classification</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>org.codehaus.jackson</pattern>
+              <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jackson</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>org.apache.commons.lang3</pattern>
+              <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.commons.lang3</shadedPattern>
+            </relocation>
+            <relocation>
+              <pattern>org.apache.commons.codec</pattern>
+              <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.commons.codec</shadedPattern>
+            </relocation>
+          </relocations>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.vafer</groupId>
+        <artifactId>jdeb</artifactId>
+        <version>1.0.1</version>
+        <executions>
+          <execution>
+            <!--Stub execution on direct plugin call - workaround for ambari deb build process-->
+            <id>stub-execution</id>
+            <phase>none</phase>
+            <goals>
+              <goal>jdeb</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <skip>true</skip>
+          <attach>false</attach>
+          <submodules>false</submodules>
+          <controlDir>${project.basedir}/../src/main/package/deb/control</controlDir>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>3.3.2</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <version>1.8</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <version>${storm.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-metrics-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>1.9.13</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+      <version>4.10</version>
+    </dependency>
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <version>3.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-easymock</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
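
The shade relocations above are what let the legacy sink coexist with Storm's own classpath: the bundled Jackson 1.x, commons-lang3, commons-codec, and commons-logging classes are rewritten under org.apache.hadoop.metrics2.sink.relocated.*, so they can neither shadow nor be shadowed by different versions that Storm ships in its lib directory. A minimal sketch of the runtime effect, assuming only the shaded with-common jar is on the classpath (the class names follow the relocation patterns in the pom):

public class RelocationCheck {
  public static void main(String[] args) {
    // The shaded jar carries Jackson under the relocated package.
    tryLoad("org.apache.hadoop.metrics2.sink.relocated.jackson.map.ObjectMapper");
    // The original coordinates resolve only if some other jar provides them.
    tryLoad("org.codehaus.jackson.map.ObjectMapper");
  }

  private static void tryLoad(String className) {
    try {
      Class.forName(className);
      System.out.println(className + " -> present");
    } catch (ClassNotFoundException e) {
      System.out.println(className + " -> absent");
    }
  }
}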

+ 21 - 0
ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/assemblies/empty.xml

@@ -0,0 +1,21 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<assembly>
+    <id>empty</id>
+    <formats/>
+</assembly>

+ 162 - 0
ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java

@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.storm;
+
+import backtype.storm.generated.ClusterSummary;
+import backtype.storm.generated.SupervisorSummary;
+import backtype.storm.generated.TopologySummary;
+import backtype.storm.metric.IClusterReporter;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.apache.commons.lang3.Validate;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
+  implements IClusterReporter {
+
+  public static final String METRICS_COLLECTOR_CATEGORY = "metrics_collector";
+  public static final String APP_ID = "appId";
+
+  private String hostname;
+  private String collectorUri;
+  private NimbusClient nimbusClient;
+  private String applicationId;
+  private int timeoutSeconds;
+
+  public StormTimelineMetricsReporter() {
+
+  }
+
+  @Override
+  protected String getCollectorUri() {
+    return this.collectorUri;
+  }
+
+  @Override
+  protected int getTimeoutSeconds() {
+    return timeoutSeconds;
+  }
+
+  @Override
+  public void prepare(Map conf) {
+    LOG.info("Preparing Storm Metrics Reporter");
+    try {
+      try {
+        hostname = InetAddress.getLocalHost().getHostName();
+        // If the host name is not an FQDN, fall back to the canonical (DNS) name
+        if ((hostname == null) || (!hostname.contains("."))) {
+          hostname = InetAddress.getLocalHost().getCanonicalHostName();
+        }
+      } catch (UnknownHostException e) {
+        LOG.error("Could not identify hostname.");
+        throw new RuntimeException("Could not identify hostname.", e);
+      }
+      Validate.notNull(conf.get(METRICS_COLLECTOR_CATEGORY), METRICS_COLLECTOR_CATEGORY + " can not be null");
+      Map cf = (Map) conf.get(METRICS_COLLECTOR_CATEGORY);
+      Map stormConf = Utils.readStormConfig();
+      this.nimbusClient = NimbusClient.getConfiguredClient(stormConf);
+      String collector = cf.get(COLLECTOR_PROPERTY).toString();
+      timeoutSeconds = cf.get(METRICS_POST_TIMEOUT_SECONDS) != null ?
+        Integer.parseInt(cf.get(METRICS_POST_TIMEOUT_SECONDS).toString()) :
+        DEFAULT_POST_TIMEOUT_SECONDS;
+      applicationId = cf.get(APP_ID).toString();
+      collectorUri = collector + WS_V1_TIMELINE_METRICS;
+      if (collectorUri.toLowerCase().startsWith("https://")) {
+        String trustStorePath = cf.get(SSL_KEYSTORE_PATH_PROPERTY).toString().trim();
+        String trustStoreType = cf.get(SSL_KEYSTORE_TYPE_PROPERTY).toString().trim();
+        String trustStorePwd = cf.get(SSL_KEYSTORE_PASSWORD_PROPERTY).toString().trim();
+        loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
+      }
+    } catch (Exception e) {
+      LOG.warn("Could not initialize metrics collector, please specify " +
+        "protocol, host, port under $STORM_HOME/conf/config.yaml ", e);
+    }
+
+  }
+
+  @Override
+  public void reportMetrics() throws Exception {
+    List<TimelineMetric> totalMetrics = new ArrayList<TimelineMetric>(7);
+    ClusterSummary cs = this.nimbusClient.getClient().getClusterInfo();
+    long currentTimeMillis = System.currentTimeMillis();
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+      applicationId, "Supervisors", String.valueOf(cs.get_supervisors_size())));
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+      applicationId, "Topologies", String.valueOf(cs.get_topologies_size())));
+
+    List<SupervisorSummary> sups = cs.get_supervisors();
+    int totalSlots = 0;
+    int usedSlots = 0;
+    for (SupervisorSummary ssum : sups) {
+      totalSlots += ssum.get_num_workers();
+      usedSlots += ssum.get_num_used_workers();
+    }
+    int freeSlots = totalSlots - usedSlots;
+
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+      applicationId, "Total Slots", String.valueOf(totalSlots)));
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+      applicationId, "Used Slots", String.valueOf(usedSlots)));
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+      applicationId, "Free Slots", String.valueOf(freeSlots)));
+
+    List<TopologySummary> topos = cs.get_topologies();
+    int totalExecutors = 0;
+    int totalTasks = 0;
+    for (TopologySummary topo : topos) {
+      totalExecutors += topo.get_num_executors();
+      totalTasks += topo.get_num_tasks();
+    }
+
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+      applicationId, "Total Executors", String.valueOf(totalExecutors)));
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+      applicationId, "Total Tasks", String.valueOf(totalTasks)));
+
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+    timelineMetrics.setMetrics(totalMetrics);
+
+    try {
+      emitMetrics(timelineMetrics);
+    } catch (UnableToConnectException e) {
+      LOG.warn("Unable to connect to Metrics Collector " + e.getConnectUrl() + ". " + e.getMessage());
+    }
+
+  }
+
+  private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName, String attributeValue) {
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName(attributeName);
+    timelineMetric.setHostName(hostname);
+    timelineMetric.setAppId(component);
+    timelineMetric.setStartTime(currentTimeMillis);
+    timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
+    return timelineMetric;
+  }
+
+}
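
The reportMetrics() implementation above polls Nimbus for a ClusterSummary and derives slot counts: total worker slots summed across supervisors, used slots likewise, and free slots as the difference. A toy walk-through of that accounting with hypothetical supervisor numbers in place of a live ClusterSummary:

public class SlotAccountingDemo {
  public static void main(String[] args) {
    // {num_workers, num_used_workers} per supervisor (hypothetical values)
    int[][] supervisors = {{4, 2}, {4, 1}};
    int totalSlots = 0;
    int usedSlots = 0;
    for (int[] s : supervisors) {
      totalSlots += s[0];
      usedSlots += s[1];
    }
    int freeSlots = totalSlots - usedSlots;
    // Emitted as "Total Slots"=8, "Used Slots"=3, "Free Slots"=5
    System.out.println(totalSlots + " / " + usedSlots + " / " + freeSlots);
  }
}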

+ 202 - 0
ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java

@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.storm;
+
+import backtype.storm.metric.api.IMetricsConsumer;
+import backtype.storm.task.IErrorReporter;
+import backtype.storm.task.TopologyContext;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS;
+import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
+
+public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer {
+  private String collectorUri;
+  private TimelineMetricsCache metricsCache;
+  private String hostname;
+  private int timeoutSeconds;
+  private String topologyName;
+
+  @Override
+  protected String getCollectorUri() {
+    return collectorUri;
+  }
+
+  @Override
+  protected int getTimeoutSeconds() {
+    return timeoutSeconds;
+  }
+
+  @Override
+  public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
+    LOG.info("Preparing Storm Metrics Sink");
+    try {
+      hostname = InetAddress.getLocalHost().getHostName();
+      // If the host name is not an FQDN, fall back to the canonical (DNS) name
+      if ((hostname == null) || (!hostname.contains("."))) {
+        hostname = InetAddress.getLocalHost().getCanonicalHostName();
+      }
+    } catch (UnknownHostException e) {
+      LOG.error("Could not identify hostname.");
+      throw new RuntimeException("Could not identify hostname.", e);
+    }
+    Configuration configuration = new Configuration("/storm-metrics2.properties");
+    timeoutSeconds = Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS,
+        String.valueOf(DEFAULT_POST_TIMEOUT_SECONDS)));
+    int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
+        String.valueOf(MAX_RECS_PER_NAME_DEFAULT)));
+    int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
+        String.valueOf(MAX_EVICTION_TIME_MILLIS)));
+    metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
+    collectorUri = configuration.getProperty(COLLECTOR_PROPERTY) + WS_V1_TIMELINE_METRICS;
+    if (collectorUri.toLowerCase().startsWith("https://")) {
+      String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
+      String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
+      String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
+      loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
+    }
+    this.topologyName = removeNonce(topologyContext.getStormId());
+  }
+
+  @Override
+  public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
+    List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+
+    for (DataPoint dataPoint : dataPoints) {
+      LOG.debug(dataPoint.name + " = " + dataPoint.value);
+      List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint);
+
+      for (DataPoint populatedDataPoint : populatedDataPoints) {
+        TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000,
+            taskInfo.srcComponentId, taskInfo.srcTaskId, taskInfo.srcWorkerHost, populatedDataPoint.name,
+            Double.valueOf(populatedDataPoint.value.toString()));
+
+        // Put intermediate values into the cache until it is time to send
+        metricsCache.putTimelineMetric(timelineMetric);
+
+        TimelineMetric cachedMetric = metricsCache.getTimelineMetric(timelineMetric.getMetricName());
+
+        if (cachedMetric != null) {
+          metricList.add(cachedMetric);
+        }
+      }
+    }
+
+    if (!metricList.isEmpty()) {
+      TimelineMetrics timelineMetrics = new TimelineMetrics();
+      timelineMetrics.setMetrics(metricList);
+
+      try {
+        emitMetrics(timelineMetrics);
+      } catch (UnableToConnectException uce) {
+        LOG.warn("Unable to send metrics to collector by address:" + uce.getConnectUrl());
+      }
+    }
+  }
+
+  @Override
+  public void cleanup() {
+    LOG.info("Stopping Storm Metrics Sink");
+  }
+
+  private String removeNonce(String topologyId) {
+    return topologyId.substring(0, topologyId.substring(0, topologyId.lastIndexOf("-")).lastIndexOf("-"));
+  }
+
+  private List<DataPoint> populateDataPoints(DataPoint dataPoint) {
+    List<DataPoint> dataPoints = new ArrayList<>();
+
+    if (dataPoint.value == null) {
+      LOG.warn("Data point with name " + dataPoint.name + " is null. Discarding." + dataPoint.name);
+    } else if (dataPoint.value instanceof Map) {
+      Map<String, Object> dataMap = (Map<String, Object>) dataPoint.value;
+
+      for (Map.Entry<String, Object> entry : dataMap.entrySet()) {
+        Double value = convertValueToDouble(entry.getKey(), entry.getValue());
+        if (value != null) {
+          dataPoints.add(new DataPoint(dataPoint.name + "." + entry.getKey(), value));
+        }
+      }
+    } else {
+      Double value = convertValueToDouble(dataPoint.name, dataPoint.value);
+      if (value != null) {
+        dataPoints.add(new DataPoint(dataPoint.name, value));
+      }
+    }
+
+    return dataPoints;
+  }
+
+  private Double convertValueToDouble(String metricName, Object value) {
+    if (value instanceof Number) {
+      return ((Number) value).doubleValue();
+    } else if (value instanceof String) {
+      try {
+        return Double.parseDouble((String) value);
+      } catch (NumberFormatException e) {
+        LOG.warn("Data point with name " + metricName + " doesn't have number format value " +
+            value + ". Discarding.");
+      }
+
+      return null;
+    } else {
+      LOG.warn("Data point with name " + metricName + " has value " + value +
+          " which is not supported. Discarding.");
+
+      return null;
+    }
+  }
+
+  private TimelineMetric createTimelineMetric(long currentTimeMillis, String componentId, int taskId, String hostName,
+      String attributeName, Double attributeValue) {
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName(createMetricName(componentId, taskId, attributeName));
+    timelineMetric.setHostName(hostName);
+    timelineMetric.setAppId(topologyName);
+    timelineMetric.setStartTime(currentTimeMillis);
+    timelineMetric.setType(ClassUtils.getShortCanonicalName(
+        attributeValue, "Number"));
+    timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue);
+    return timelineMetric;
+  }
+
+  private String createMetricName(String componentId, int taskId, String attributeName) {
+    String metricName = componentId + "." + taskId + "." + attributeName;
+    // '_' is treated as a special character (metric name separator), so replace it
+    return metricName.replace('_', '-');
+  }
+
+  public void setMetricsCache(TimelineMetricsCache metricsCache) {
+    this.metricsCache = metricsCache;
+  }
+
+}
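
Storm topology IDs take the form <name>-<launch-counter>-<timestamp>, so removeNonce() above strips the last two dash-separated segments to recover a stable topology name for the metric appId across topology restarts. A standalone sketch with a hypothetical topology id (dashes inside the topology name itself are preserved):

public class RemoveNonceDemo {
  // Same expression as StormTimelineMetricsSink.removeNonce()
  static String removeNonce(String topologyId) {
    return topologyId.substring(0,
        topologyId.substring(0, topologyId.lastIndexOf("-")).lastIndexOf("-"));
  }

  public static void main(String[] args) {
    System.out.println(removeNonce("word-count-3-1467083213")); // word-count
  }
}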

+ 91 - 0
ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java

@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.storm;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import backtype.storm.metric.api.IMetricsConsumer;
+
+public class StormTimelineMetricsSinkTest {
+  @Test
+  public void testNonNumericMetricMetricExclusion() throws InterruptedException, IOException {
+    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    replay(timelineMetricsCache);
+    stormTimelineMetricsSink.handleDataPoints(
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
+        Collections.singleton(new IMetricsConsumer.DataPoint("key1", "value1")));
+    verify(timelineMetricsCache);
+  }
+
+  @Test
+  public void testNumericMetricMetricSubmission() throws InterruptedException, IOException {
+    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+    expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1")).andReturn(new TimelineMetric()).once();
+    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
+    expectLastCall().once();
+    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    replay(timelineMetricsCache);
+    stormTimelineMetricsSink.handleDataPoints(
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
+        Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42)));
+    verify(timelineMetricsCache);
+  }
+
+  @Test
+  @Ignore // TODO: Fix for failover
+  public void testMapMetricMetricSubmission() throws InterruptedException, IOException {
+    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+    expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field1"))
+        .andReturn(new TimelineMetric()).once();
+    expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field2"))
+        .andReturn(new TimelineMetric()).once();
+    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
+    expectLastCall().once();
+    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    replay(timelineMetricsCache);
+
+    Map<String, Object> valueMap = new HashMap<>();
+    valueMap.put("field1", 53);
+    valueMap.put("field2", 64.12);
+    stormTimelineMetricsSink.handleDataPoints(
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
+        Collections.singleton(new IMetricsConsumer.DataPoint("key1", valueMap)));
+    verify(timelineMetricsCache);
+  }
+}

+ 1 - 2
ambari-metrics/ambari-metrics-storm-sink/pom.xml

@@ -31,8 +31,7 @@ limitations under the License.
   <packaging>jar</packaging>
 
   <properties>
-    <!--<storm.version>0.9.3.2.2.1.0-2340</storm.version>-->
-    <storm.version>0.10.0.2.3.0.0-2557</storm.version>
+    <storm.version>1.1.0-SNAPSHOT</storm.version>
   </properties>
 
   <build>

+ 0 - 22
ambari-metrics/ambari-metrics-storm-sink/src/main/conf/storm-metrics2.properties.j2

@@ -1,22 +0,0 @@
-{#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#}
-
-collector={{metric_collector_host}}
-port=6188
-maxRowCacheSize=10000
-sendInterval=59000

+ 38 - 0
ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/NumberUtil.java

@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.storm;
+
+public class NumberUtil {
+  private NumberUtil() {
+  }
+
+  public static Double convertValueToDouble(Object value) {
+    if (value instanceof Number) {
+      return ((Number) value).doubleValue();
+    } else if (value instanceof String) {
+      try {
+        return Double.parseDouble((String) value);
+      } catch (NumberFormatException e) {
+        throw e;
+      }
+    } else {
+      return null;
+    }
+  }
+}
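
NumberUtil above distinguishes three cases: Number values convert directly, String values are parsed (with NumberFormatException deliberately propagating so callers can log and discard the offending metric), and any other type maps to null. A short usage sketch, importing the public class from its package:

import org.apache.hadoop.metrics2.sink.storm.NumberUtil;

public class NumberUtilDemo {
  public static void main(String[] args) {
    System.out.println(NumberUtil.convertValueToDouble(42));           // 42.0
    System.out.println(NumberUtil.convertValueToDouble("3.14"));       // 3.14
    System.out.println(NumberUtil.convertValueToDouble(new Object())); // null
    try {
      NumberUtil.convertValueToDouble("not-a-number");
    } catch (NumberFormatException e) {
      System.out.println("parse failure propagates to the caller");
    }
  }
}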

+ 119 - 60
ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java

@@ -18,33 +18,32 @@
 
 package org.apache.hadoop.metrics2.sink.storm;
 
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.SupervisorSummary;
-import backtype.storm.generated.TopologySummary;
-import backtype.storm.metric.IClusterReporter;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
 import org.apache.commons.lang3.Validate;
+import org.apache.commons.lang3.ClassUtils;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
+import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
+import org.apache.storm.metric.api.DataPoint;
+import org.apache.storm.metric.api.IClusterMetricsConsumer;
+import org.apache.storm.utils.Utils;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
 public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
-  implements IClusterReporter {
+  implements IClusterMetricsConsumer {
 
-  public static final String METRICS_COLLECTOR_CATEGORY = "metrics_collector";
-  public static final String APP_ID = "appId";
+  public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId";
+  public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "nimbus";
 
   private String hostname;
   private String collectorUri;
-  private NimbusClient nimbusClient;
   private String applicationId;
   private int timeoutSeconds;
 
@@ -63,7 +62,7 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
   }
 
   @Override
-  public void prepare(Map conf) {
+  public void prepare(Object registrationArgument) {
     LOG.info("Preparing Storm Metrics Reporter");
     try {
       try {
@@ -76,20 +75,17 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
         LOG.error("Could not identify hostname.");
         throw new RuntimeException("Could not identify hostname.", e);
       }
-      Validate.notNull(conf.get(METRICS_COLLECTOR_CATEGORY), METRICS_COLLECTOR_CATEGORY + " can not be null");
-      Map cf = (Map) conf.get(METRICS_COLLECTOR_CATEGORY);
-      Map stormConf = Utils.readStormConfig();
-      this.nimbusClient = NimbusClient.getConfiguredClient(stormConf);
-      String collector = cf.get(COLLECTOR_PROPERTY).toString();
-      timeoutSeconds = cf.get(METRICS_POST_TIMEOUT_SECONDS) != null ?
-        Integer.parseInt(cf.get(METRICS_POST_TIMEOUT_SECONDS).toString()) :
+      Configuration configuration = new Configuration("/storm-metrics2.properties");
+      String collector = configuration.getProperty(COLLECTOR_PROPERTY).toString();
+      timeoutSeconds = configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS) != null ?
+        Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS).toString()) :
         DEFAULT_POST_TIMEOUT_SECONDS;
-      applicationId = cf.get(APP_ID).toString();
+      applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID);
       collectorUri = collector + WS_V1_TIMELINE_METRICS;
       if (collectorUri.toLowerCase().startsWith("https://")) {
-        String trustStorePath = cf.get(SSL_KEYSTORE_PATH_PROPERTY).toString().trim();
-        String trustStoreType = cf.get(SSL_KEYSTORE_TYPE_PROPERTY).toString().trim();
-        String trustStorePwd = cf.get(SSL_KEYSTORE_PASSWORD_PROPERTY).toString().trim();
+        String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).toString().trim();
+        String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).toString().trim();
+        String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).toString().trim();
         loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
       }
     } catch (Exception e) {
@@ -100,43 +96,35 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
   }
 
   @Override
-  public void reportMetrics() throws Exception {
-    List<TimelineMetric> totalMetrics = new ArrayList<TimelineMetric>(7);
-    ClusterSummary cs = this.nimbusClient.getClient().getClusterInfo();
-    long currentTimeMillis = System.currentTimeMillis();
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-      applicationId, "Supervisors", String.valueOf(cs.get_supervisors_size())));
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-      applicationId, "Topologies", String.valueOf(cs.get_topologies_size())));
-
-    List<SupervisorSummary> sups = cs.get_supervisors();
-    int totalSlots = 0;
-    int usedSlots = 0;
-    for (SupervisorSummary ssum : sups) {
-      totalSlots += ssum.get_num_workers();
-      usedSlots += ssum.get_num_used_workers();
-    }
-    int freeSlots = totalSlots - usedSlots;
-
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-      applicationId, "Total Slots", String.valueOf(totalSlots)));
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-      applicationId, "Used Slots", String.valueOf(usedSlots)));
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-      applicationId, "Free Slots", String.valueOf(freeSlots)));
-
-    List<TopologySummary> topos = cs.get_topologies();
-    int totalExecutors = 0;
-    int totalTasks = 0;
-    for (TopologySummary topo : topos) {
-      totalExecutors += topo.get_num_executors();
-      totalTasks += topo.get_num_tasks();
+  public void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints) {
+    long timestamp = clusterInfo.getTimestamp();
+    List<TimelineMetric> totalMetrics = new ArrayList<>();
+
+    for (DataPoint dataPoint : dataPoints) {
+      LOG.debug(dataPoint.getName() + " = " + dataPoint.getValue());
+      List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint);
+
+      for (DataPoint populatedDataPoint : populatedDataPoints) {
+        LOG.debug("Populated datapoint: " + dataPoint.getName() + " = " + dataPoint.getValue());
+
+        try {
+          StormAmbariMappedMetric mappedMetric = StormAmbariMappedMetric
+              .valueOf(populatedDataPoint.getName());
+          TimelineMetric timelineMetric = createTimelineMetric(timestamp * 1000, applicationId,
+              mappedMetric.getAmbariMetricName(),
+              Double.valueOf(populatedDataPoint.getValue().toString()));
+
+          totalMetrics.add(timelineMetric);
+        } catch (IllegalArgumentException e) {
+          // metric not tracked by Ambari, skip it
+          LOG.debug("Metric not tracked by Ambari, skipping: " + populatedDataPoint.getName());
+        }
+      }
     }
 
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-      applicationId, "Total Executors", String.valueOf(totalExecutors)));
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-      applicationId, "Total Tasks", String.valueOf(totalTasks)));
+    if (totalMetrics.size() <= 0) {
+      return;
+    }
 
     TimelineMetrics timelineMetrics = new TimelineMetrics();
     timelineMetrics.setMetrics(totalMetrics);
@@ -149,14 +137,85 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
 
   }
 
-  private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName, String attributeValue) {
+  @Override
+  public void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints) {
+    // Ambari does not consume per-supervisor metrics
+  }
+
+  @Override
+  public void cleanup() {
+    LOG.info("Stopping Storm Metrics Reporter");
+  }
+
+  private List<DataPoint> populateDataPoints(DataPoint dataPoint) {
+    List<DataPoint> dataPoints = new ArrayList<>();
+
+    if (dataPoint.getValue() == null) {
+      LOG.warn("Data point with name " + dataPoint.getName() + " is null. Discarding." + dataPoint
+          .getName());
+    } else if (dataPoint.getValue() instanceof Map) {
+      Map<String, Object> dataMap = (Map<String, Object>) dataPoint.getValue();
+
+      for (Map.Entry<String, Object> entry : dataMap.entrySet()) {
+        Double value = convertValueToDouble(entry.getKey(), entry.getValue());
+        if (value != null) {
+          dataPoints.add(new DataPoint(dataPoint.getName() + "." + entry.getKey(), value));
+        }
+      }
+    } else {
+      Double value = convertValueToDouble(dataPoint.getName(), dataPoint.getValue());
+      if (value != null) {
+        dataPoints.add(new DataPoint(dataPoint.getName(), value));
+      }
+    }
+
+    return dataPoints;
+  }
+
+  private Double convertValueToDouble(String metricName, Object value) {
+    try {
+      Double converted = NumberUtil.convertValueToDouble(value);
+      if (converted == null) {
+        LOG.warn("Data point with name " + metricName + " has value " + value +
+            " which is not supported. Discarding.");
+      }
+      return converted;
+    } catch (NumberFormatException e) {
+      LOG.warn("Data point with name " + metricName + " doesn't have number format value " +
+          value + ". Discarding.");
+      return null;
+    }
+  }
+
+  private TimelineMetric createTimelineMetric(long currentTimeMillis, String component,
+                                              String attributeName, Double attributeValue) {
     TimelineMetric timelineMetric = new TimelineMetric();
     timelineMetric.setMetricName(attributeName);
     timelineMetric.setHostName(hostname);
     timelineMetric.setAppId(component);
     timelineMetric.setStartTime(currentTimeMillis);
-    timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
-    return timelineMetric;
+    timelineMetric.setType(ClassUtils.getShortCanonicalName(attributeValue, "Number"));
+    timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue);
+    return timelineMetric;
+  }
+
+  enum StormAmbariMappedMetric {
+    supervisors("Supervisors"),
+    topologies("Topologies"),
+    slotsTotal("Total Slots"),
+    slotsUsed("Used Slots"),
+    slotsFree("Free Slots"),
+    executorsTotal("Total Executors"),
+    tasksTotal("Total Tasks");
+
+    private String ambariMetricName;
+
+    StormAmbariMappedMetric(String ambariMetricName) {
+      this.ambariMetricName = ambariMetricName;
+    }
+
+    public String getAmbariMetricName() {
+      return ambariMetricName;
+    }
   }
 
 }
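
Storm 1.0 replaces the pull-style IClusterReporter with IClusterMetricsConsumer: Nimbus now pushes named data points (supervisors, slotsTotal, tasksTotal, ...) into handleDataPoints(). The StormAmbariMappedMetric enum above translates those names into the metric names Ambari already charts, and valueOf() throwing IllegalArgumentException doubles as the filter for names Ambari does not track. A small sketch of that lookup, assuming it sits in the same package as the reporter so the package-private enum is visible ("unmappedMetric" is a made-up name standing in for anything outside the enum):

package org.apache.hadoop.metrics2.sink.storm;

public class MetricNameMappingDemo {
  public static void main(String[] args) {
    for (String stormName : new String[] {"slotsTotal", "unmappedMetric"}) {
      try {
        String ambariName = StormTimelineMetricsReporter.StormAmbariMappedMetric
            .valueOf(stormName).getAmbariMetricName();
        System.out.println(stormName + " -> " + ambariName); // slotsTotal -> Total Slots
      } catch (IllegalArgumentException e) {
        System.out.println(stormName + " -> skipped (not tracked by Ambari)");
      }
    }
  }
}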

+ 4 - 4
ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java

@@ -18,9 +18,9 @@
 
 package org.apache.hadoop.metrics2.sink.storm;
 
-import backtype.storm.metric.api.IMetricsConsumer;
-import backtype.storm.task.IErrorReporter;
-import backtype.storm.task.TopologyContext;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.task.IErrorReporter;
+import org.apache.storm.task.TopologyContext;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -216,4 +216,4 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     this.metricsCache = metricsCache;
   }
 
-}
+}

+ 1 - 1
ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java

@@ -37,7 +37,7 @@ import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import backtype.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.metric.api.IMetricsConsumer;
 
 public class StormTimelineMetricsSinkTest {
   @Test

+ 6 - 0
ambari-metrics/pom.xml

@@ -26,6 +26,7 @@
     <module>ambari-metrics-flume-sink</module>
     <module>ambari-metrics-kafka-sink</module>
     <module>ambari-metrics-storm-sink</module>
+    <module>ambari-metrics-storm-sink-legacy</module>
     <module>ambari-metrics-timelineservice</module>
     <module>ambari-metrics-host-monitoring</module>
     <module>ambari-metrics-grafana</module>
@@ -77,6 +78,11 @@
       <name>hdp</name>
       <url>http://repo.hortonworks.com/content/groups/public/</url>
     </repository>
+    <repository>
+      <id>apache-snapshots</id>
+      <name>snapshots</name>
+      <url>https://repository.apache.org/content/repositories/snapshots</url>
+    </repository>
   </repositories>
   <dependencyManagement>
     <dependencies>

+ 18 - 0
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/storm-site.xml

@@ -26,4 +26,22 @@
     <description>Topology metrics reporter.</description>
     <on-ambari-upgrade add="true"/>
   </property>
+  <property>
+    <name>storm.cluster.metrics.consumer.register</name>
+    <value>[{"class": "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter"}]</value>
+    <description></description>
+    <value-attributes>
+      <overridable>false</overridable>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>topology.metrics.consumer.register</name>
+    <value>[{"class": "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink", "parallelism.hint": 1}]</value>
+    <description></description>
+    <value-attributes>
+      <overridable>false</overridable>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
 </configuration>
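
These two properties wire the sinks into Storm 1.0's split consumer model: storm.cluster.metrics.consumer.register runs the reporter inside Nimbus, while topology.metrics.consumer.register attaches the sink to every submitted topology (parallelism.hint controls how many consumer tasks Storm spawns). The topology-level registration can also be done per topology in code; a minimal sketch against the Storm 1.0 Config API:

import org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink;
import org.apache.storm.Config;

public class RegisterSinkDemo {
  public static void main(String[] args) {
    Config conf = new Config();
    // Equivalent to the topology.metrics.consumer.register entry above,
    // scoped to a single topology; conf would then be passed to
    // StormSubmitter.submitTopology(...).
    conf.registerMetricsConsumer(StormTimelineMetricsSink.class, 1);
  }
}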

+ 2 - 1
ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py

@@ -188,7 +188,8 @@ if has_metric_collector:
   pass
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
-metric_collector_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink*.jar"
+metric_collector_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-with-common-*.jar"
+metric_collector_legacy_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar"
 
 jar_jvm_opts = ''
 

+ 7 - 2
ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/storm.py

@@ -111,9 +111,14 @@ def storm(name=None):
     # On old HDP 2.1 versions, this symlink may also exist and break EU to newer versions
     Link("/usr/lib/storm/lib/ambari-metrics-storm-sink.jar", action="delete")
 
-    Execute(format("{sudo} ln -s {metric_collector_sink_jar} {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
+    if Script.get_stack_name() == "HDP" and Script.is_stack_greater_or_equal("2.5"):
+      sink_jar = params.metric_collector_sink_jar
+    else:
+      sink_jar = params.metric_collector_legacy_sink_jar
+
+    Execute(format("{sudo} ln -s {sink_jar} {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
             not_if=format("ls {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
-            only_if=format("ls {metric_collector_sink_jar}")
+            only_if=format("ls {sink_jar}")
     )
 
   if params.storm_logs_supported:

+ 8 - 2
ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/ui_server.py

@@ -85,6 +85,7 @@ class UiServerDefault(UiServer):
       stack_select.select("storm-client", params.version)
 
   def link_metrics_sink_jar(self):
+    import params
     # Add storm metrics reporter JAR to storm-ui-server classpath.
     # Remove symlinks. They can be there, if you doing upgrade from HDP < 2.2 to HDP >= 2.2
     Link(format("{storm_lib_dir}/ambari-metrics-storm-sink.jar"),
@@ -92,9 +93,14 @@ class UiServerDefault(UiServer):
     # On old HDP 2.1 versions, this symlink may also exist and break EU to newer versions
     Link("/usr/lib/storm/lib/ambari-metrics-storm-sink.jar", action="delete")
 
-    Execute(format("{sudo} ln -s {metric_collector_sink_jar} {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
+    if Script.get_stack_name() == "HDP" and Script.is_stack_greater_or_equal("2.5"):
+      sink_jar = params.metric_collector_sink_jar
+    else:
+      sink_jar = params.metric_collector_legacy_sink_jar
+
+    Execute(format("{sudo} ln -s {sink_jar} {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
             not_if=format("ls {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
-            only_if=format("ls {metric_collector_sink_jar}")
+            only_if=format("ls {sink_jar}")
             )
 
   def start(self, env, upgrade_type=None):

+ 1 - 0
ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2

@@ -19,6 +19,7 @@
 collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
 maxRowCacheSize=10000
 sendInterval={{metrics_report_interval}}000
+clusterReporterAppId=nimbus
 
 # HTTPS properties
 truststore.path = {{metric_truststore_path}}

+ 4 - 0
ambari-server/src/main/resources/stacks/HDP/2.5/services/STORM/configuration/storm-site.xml

@@ -54,4 +54,8 @@
     </value-attributes>
     <on-ambari-upgrade add="true"/>
   </property>
+  <property>
+    <name>metrics.reporter.register</name>
+    <on-ambari-upgrade delete="true"/>
+  </property>
 </configuration>

+ 2 - 2
ambari-server/src/test/python/stacks/2.1/STORM/test_storm_nimbus.py

@@ -86,9 +86,9 @@ class TestStormNimbus(TestStormBase):
     self.assertResourceCalled('Link', '/usr/lib/storm/lib/ambari-metrics-storm-sink.jar',
                               action = ['delete'],
                               )
-    self.assertResourceCalled('Execute', 'ambari-sudo.sh ln -s /usr/lib/storm/lib/ambari-metrics-storm-sink*.jar /usr/lib/storm/lib//ambari-metrics-storm-sink.jar',
+    self.assertResourceCalled('Execute', 'ambari-sudo.sh ln -s /usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar /usr/lib/storm/lib//ambari-metrics-storm-sink.jar',
                               not_if = 'ls /usr/lib/storm/lib//ambari-metrics-storm-sink.jar',
-                              only_if = 'ls /usr/lib/storm/lib/ambari-metrics-storm-sink*.jar',
+                              only_if = 'ls /usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar',
                               )
     self.assertResourceCalled('Execute', 'source /etc/storm/conf/storm-env.sh ; export PATH=$JAVA_HOME/bin:$PATH ; storm nimbus > /var/log/storm/nimbus.out 2>&1 &\n echo $! > /var/run/storm/nimbus.pid',
         path = ['/usr/bin'],

+ 4 - 4
ambari-server/src/test/python/stacks/2.1/STORM/test_storm_ui_server.py

@@ -57,10 +57,10 @@ class TestStormUiServer(TestStormBase):
                               action=['delete'],
                               )
 
-    self.assertResourceCalled('Execute', 'ambari-sudo.sh ln -s /usr/lib/storm/lib/ambari-metrics-storm-sink*.jar '
+    self.assertResourceCalled('Execute', 'ambari-sudo.sh ln -s /usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar '
                                          '/usr/lib/storm/lib//ambari-metrics-storm-sink.jar',
                               not_if=format("ls /usr/lib/storm/lib//ambari-metrics-storm-sink.jar"),
-                              only_if=format("ls /usr/lib/storm/lib/ambari-metrics-storm-sink*.jar")
+                              only_if=format("ls /usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar")
                               )
     self.assertResourceCalled('Execute', 'source /etc/storm/conf/storm-env.sh ; export PATH=$JAVA_HOME/bin:$PATH ; storm ui > /var/log/storm/ui.out 2>&1 &\n echo $! > /var/run/storm/ui.pid',
         path = ['/usr/bin'],
@@ -127,10 +127,10 @@ class TestStormUiServer(TestStormBase):
                               action=['delete'],
                               )
 
-    self.assertResourceCalled('Execute', 'ambari-sudo.sh ln -s /usr/lib/storm/lib/ambari-metrics-storm-sink*.jar '
+    self.assertResourceCalled('Execute', 'ambari-sudo.sh ln -s /usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar '
                                          '/usr/lib/storm/lib//ambari-metrics-storm-sink.jar',
                               not_if=format("ls /usr/lib/storm/lib//ambari-metrics-storm-sink.jar"),
-                              only_if=format("ls /usr/lib/storm/lib/ambari-metrics-storm-sink*.jar")
+                              only_if=format("ls /usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar")
                               )
     self.assertResourceCalled('Execute', 'source /etc/storm/conf/storm-env.sh ; export PATH=$JAVA_HOME/bin:$PATH ; storm ui > /var/log/storm/ui.out 2>&1 &\n echo $! > /var/run/storm/ui.pid',
         path = ['/usr/bin'],