
AMBARI-9233. Storm cluster metrics need to be reported to AMS (Dmytro Sen via smohanty)

Sumit Mohanty 10 years ago
parent
commit
293114af6d

+ 1 - 0
ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java

@@ -36,6 +36,7 @@ public abstract class AbstractTimelineMetricsSink {
   public static final String MAX_METRIC_ROW_CACHE_SIZE = "maxRowCacheSize";
   public static final String METRICS_SEND_INTERVAL = "sendInterval";
   public static final String COLLECTOR_HOST_PROPERTY = "collector";
+  public static final String COLLECTOR_PORT_PROPERTY = "port";
 
   protected final Log LOG;
   private HttpClient httpClient = new HttpClient();

+ 6 - 1
ambari-metrics/ambari-metrics-storm-sink/pom.xml

@@ -28,6 +28,11 @@ limitations under the License.
   <artifactId>ambari-metrics-storm-sink</artifactId>
   <version>0.1.0-SNAPSHOT</version>
   <packaging>jar</packaging>
+
+  <properties>
+    <storm.version>0.9.3</storm.version>
+  </properties>
+
   <build>
     <plugins>
       <plugin>
@@ -99,7 +104,7 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.storm</groupId>
       <artifactId>storm-core</artifactId>
-      <version>0.9.3.2.2.1.0-2240</version>
+      <version>${storm.version}</version>
       <scope>compile</scope>
     </dependency>
     <dependency>
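
With the Storm dependency now driven by the ${storm.version} property, a stack-specific build can override the version on the command line instead of editing the POM. A minimal sketch, assuming the previous HDP build number is still the desired target:

    mvn clean package -Dstorm.version=0.9.3.2.2.1.0-2240

Left unset, the build falls back to the 0.9.3 default declared in the new <properties> block.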

+ 3 - 0
ambari-metrics/ambari-metrics-storm-sink/src/main/assemblies/jar-with-common.xml

@@ -29,6 +29,9 @@
         <include>org.apache.ambari:ambari-metrics-common</include>
         <include>org.apache.ambari:ambari-metrics-storm-sink</include>
         <include>org.codehaus.jackson:jackson-mapper-asl</include> <!--missing in storm classpath-->
+        <include>org.codehaus.jackson:jackson-core-asl</include> <!--missing in storm classpath-->
+        <include>org.codehaus.jackson:jackson-xc</include> <!--missing in storm classpath-->
+        <include>commons-httpclient:commons-httpclient</include> <!--missing in storm classpath-->
       </includes>
     </dependencySet>
   </dependencySets>

+ 2 - 1
ambari-metrics/ambari-metrics-storm-sink/src/main/conf/storm-metrics2.properties.j2

@@ -16,6 +16,7 @@
 # limitations under the License.
 #}
 
-collector={{metric_collector_host}}:8188
+collector={{metric_collector_host}}
+port=6188
 maxRowCacheSize=10000
 sendInterval=59000
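
Rendered for a cluster whose collector runs on a hypothetical host ams-host.example.com (standing in for {{metric_collector_host}}), the sink now reads the host and port as separate properties:

    collector=ams-host.example.com
    port=6188
    maxRowCacheSize=10000
    sendInterval=59000

The StormTimelineMetricsSink change below assembles these two properties back into the collector URI.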

+ 158 - 0
ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java

@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.storm;
+
+import backtype.storm.generated.ClusterSummary;
+import backtype.storm.generated.SupervisorSummary;
+import backtype.storm.generated.TopologySummary;
+import backtype.storm.metric.IClusterReporter;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang.Validate;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.util.Servers;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
+  implements IClusterReporter {
+
+  public static final String COLLECTOR_HOST = "host";
+  public static final String COLLECTOR_PORT = "port";
+
+  public static final String METRICS_COLLECTOR = "metrics_collector";
+
+  public static final String APP_ID = "nimbus";
+
+  private String hostname;
+  private SocketAddress socketAddress;
+  private String collectorUri;
+  private NimbusClient nimbusClient;
+
+  public StormTimelineMetricsReporter() {
+
+  }
+
+  @Override
+  protected SocketAddress getServerSocketAddress() {
+    return this.socketAddress;
+  }
+
+  @Override
+  protected String getCollectorUri() {
+    return this.collectorUri;
+  }
+
+  @Override
+  public void prepare(Map conf) {
+    LOG.info("Preparing Storm Metrics Reporter");
+    try {
+      try {
+        hostname = InetAddress.getLocalHost().getHostName();
+      } catch (UnknownHostException e) {
+        LOG.error("Could not identify hostname.");
+        throw new RuntimeException("Could not identify hostname.", e);
+      }
+      Validate.notNull(conf.get(METRICS_COLLECTOR), METRICS_COLLECTOR + " can not be null");
+      Map cf = (Map) conf.get(METRICS_COLLECTOR);
+      Map stormConf = Utils.readStormConfig();
+      this.nimbusClient = NimbusClient.getConfiguredClient(stormConf);
+      String collectorHostname = cf.get(COLLECTOR_HOST).toString();
+      String port = cf.get(COLLECTOR_PORT).toString();
+      collectorUri = "http://" + collectorHostname + ":" + port + "/ws/v1/timeline/metrics";
+      List<InetSocketAddress> socketAddresses =
+        Servers.parse(collectorHostname, Integer.valueOf(port));
+      if (socketAddresses != null && !socketAddresses.isEmpty()) {
+        socketAddress = socketAddresses.get(0);
+      }
+    } catch (Exception e) {
+      LOG.warn("could not initialize metrics collector, please specify host, port under $STORM_HOME/conf/config.yaml ", e);
+    }
+
+  }
+
+  @Override
+  public void reportMetrics() throws Exception {
+    List<TimelineMetric> totalMetrics = new ArrayList<TimelineMetric>(7);
+    ClusterSummary cs = this.nimbusClient.getClient().getClusterInfo();
+    long currentTimeMillis = System.currentTimeMillis();
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+      APP_ID, "Supervisors", String.valueOf(cs.get_supervisors_size())));
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+      APP_ID, "Topologies", String.valueOf(cs.get_topologies_size())));
+
+    List<SupervisorSummary> sups = cs.get_supervisors();
+    int totalSlots = 0;
+    int usedSlots = 0;
+    for (SupervisorSummary ssum : sups) {
+      totalSlots += ssum.get_num_workers();
+      usedSlots += ssum.get_num_used_workers();
+    }
+    int freeSlots = totalSlots - usedSlots;
+
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+      APP_ID, "Total Slots", String.valueOf(totalSlots)));
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+      APP_ID, "Used Slots", String.valueOf(usedSlots)));
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+      APP_ID, "Free Slots", String.valueOf(freeSlots)));
+
+    List<TopologySummary> topos = cs.get_topologies();
+    int totalExecutors = 0;
+    int totalTasks = 0;
+    for (TopologySummary topo : topos) {
+      totalExecutors += topo.get_num_executors();
+      totalTasks += topo.get_num_tasks();
+    }
+
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+      APP_ID, "Total Executors", String.valueOf(totalExecutors)));
+    totalMetrics.add(createTimelineMetric(currentTimeMillis,
+      APP_ID, "Total Tasks", String.valueOf(totalTasks)));
+
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+    timelineMetrics.setMetrics(totalMetrics);
+
+    emitMetrics(timelineMetrics);
+
+  }
+
+  private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName, String attributeValue) {
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName(attributeName);
+    timelineMetric.setHostName(hostname);
+    timelineMetric.setAppId(component);
+    timelineMetric.setStartTime(currentTimeMillis);
+    timelineMetric.setType(ClassUtils.getShortCanonicalName(
+      attributeValue, "Number"));
+    timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
+    return timelineMetric;
+  }
+
+}

+ 2 - 2
ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java

@@ -73,9 +73,9 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
         String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
     metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
-    collectorUri = "http://" + configuration.getProperty(COLLECTOR_HOST_PROPERTY) + "/ws/v1/timeline/metrics";
+    collectorUri = "http://" + configuration.getProperty(COLLECTOR_HOST_PROPERTY) + ":" + configuration.getProperty(COLLECTOR_PORT_PROPERTY) + "/ws/v1/timeline/metrics";
     List<InetSocketAddress> socketAddresses =
-        Servers.parse(configuration.getProperty(configuration.getProperty(COLLECTOR_HOST_PROPERTY)), 8188);
+        Servers.parse(configuration.getProperty(configuration.getProperty(COLLECTOR_HOST_PROPERTY)), Integer.valueOf(configuration.getProperty(COLLECTOR_PORT_PROPERTY)));
     if (socketAddresses != null && !socketAddresses.isEmpty()) {
       socketAddress = socketAddresses.get(0);
     }

+ 4 - 0
ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/params.py

@@ -40,9 +40,11 @@ stack_is_hdp22_or_further = hdp_stack_version != "" and compare_versions(hdp_sta
 if hdp_stack_version != "" and compare_versions(hdp_stack_version, '2.2') >= 0:
   rest_lib_dir = '/usr/hdp/current/storm-client/contrib/storm-rest'
   storm_bin_dir = "/usr/hdp/current/storm-client/bin"
+  storm_lib_dir = "/usr/hdp/current/storm-client/lib"
 else:
   rest_lib_dir = "/usr/lib/storm/contrib/storm-rest"
   storm_bin_dir = "/usr/bin"
+  storm_lib_dir = "/usr/lib/storm/lib/"
 
 storm_user = config['configurations']['storm-env']['storm_user']
 log_dir = config['configurations']['storm-env']['storm_log_dir']
@@ -95,6 +97,8 @@ ams_collector_hosts = default("/clusterHostInfo/metric_collector_hosts", [])
 has_metric_collector = not len(ams_collector_hosts) == 0
 if has_metric_collector:
   metric_collector_host = ams_collector_hosts[0]
+  metric_collector_report_interval = 60
+metric_collector_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink*.jar"
 
 # ranger host
 ranger_admin_hosts = default("/clusterHostInfo/ranger_admin_hosts", [])

+ 6 - 2
ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/scripts/storm.py

@@ -19,6 +19,7 @@ limitations under the License.
 """
 
 from resource_management.core.resources import File
+from resource_management.core.resources import Execute
 from resource_management.core.resources import Directory
 from resource_management.core.source import InlineTemplate
 from resource_management.libraries.resources.template_config import TemplateConfig
@@ -26,7 +27,6 @@ from resource_management.libraries.functions.format import format
 from resource_management.core.source import Template
 from resource_management.libraries.functions import compare_versions
 from yaml_utils import escape_yaml_propetry
-import sys
 
 def storm():
   import params
@@ -69,6 +69,10 @@ def storm():
         content=Template("storm-metrics2.properties.j2")
     )
 
+    Execute(format("sudo ln -s {metric_collector_sink_jar} {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
+            not_if=format("ls {storm_lib_dir}/ambari-metrics-storm-sink.jar"),
+    )
+
   File(format("{conf_dir}/storm-env.sh"),
     owner=params.storm_user,
     content=InlineTemplate(params.storm_env_sh_template)
@@ -91,7 +95,7 @@ def storm():
            owner='root',
            group=params.user_group
       )
-    
+
 
 '''
 Finds minimal real user UID
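
On an HDP 2.2 host, and assuming the defaults set in params.py above, the new Execute resolves roughly to the following command, skipped when the link already exists (the not_if guard):

    sudo ln -s /usr/lib/storm/lib/ambari-metrics-storm-sink*.jar /usr/hdp/current/storm-client/lib/ambari-metrics-storm-sink.jar

This puts the sink jar, with the ambari-metrics-common, Jackson, and commons-httpclient classes bundled by the assembly above, on Storm's classpath.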

+ 17 - 18
ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/templates/config.yaml.j2

@@ -1,21 +1,3 @@
-{#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#}
-
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -62,4 +44,21 @@ ganglia:
   # an <IP>:<HOSTNAME> pair to spoof
   # this allows us to simulate storm cluster metrics coming from a specific host
   #spoof: "192.168.1.1:storm"
+{% endif %}
+
+{% if has_metric_collector %}
+enableGanglia: False
+
+ganglia:
+  reportInterval: {{metric_collector_report_interval}}
+
+enableMetricsSink: True
+
+metrics_collector:
+
+  reportInterval: {{metric_collector_report_interval}}
+
+  host: "{{metric_collector_host}}"
+  port: 6188
+
 {% endif %}
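
When a metric collector is present, the rendered tail of config.yaml would look roughly like this (assuming metric_collector_report_interval = 60 and the hypothetical host ams-host.example.com; the port is hard-coded to 6188 in this template):

    enableGanglia: False

    ganglia:
      reportInterval: 60

    enableMetricsSink: True

    metrics_collector:
      reportInterval: 60
      host: "ams-host.example.com"
      port: 6188

This metrics_collector map is what StormTimelineMetricsReporter.prepare() validates and reads.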

+ 2 - 1
ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/templates/storm-metrics2.properties.j2

@@ -16,6 +16,7 @@
 # limitations under the License.
 #}
 
-collector={{metric_collector_host}}:8188
+collector={{metric_collector_host}}
+port=6188
 maxRowCacheSize=10000
 sendInterval=59000

+ 1 - 0
ambari-server/src/main/resources/common-services/STORM/0.9.1.2.1/package/templates/storm.yaml.j2

@@ -63,4 +63,5 @@ supervisor.enable: true
 topology.metrics.consumer.register:
   - class: "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink"
     parallelism.hint: 1
+metrics.reporter.register: "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter"
 {% endif %}

+ 1 - 1
ambari-server/src/main/resources/stacks/HDP/2.2/services/STORM/configuration/storm-env.xml

@@ -34,7 +34,7 @@
 export JAVA_HOME={{java64_home}}
 
 # export STORM_CONF_DIR=""
-STORM_HOME=/usr/hdp/current/storm-client
+export STORM_HOME=/usr/hdp/current/storm-client
     </value>
   </property>
 </configuration>