AMS Storm Sink: apply Storm metrics improvement - worker-level aggregation. (Jungtaek Lim via avijayan)

Aravindan Vijayan, 9 years ago
Current commit: 3e536c087e
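
In short: when a DataPoint arrives from Storm's system task (taskInfo.srcTaskId == SYSTEM_TASK_ID), its value is a collection of per-task values. The sink now flattens that collection, groups the populated points by metric name, and reduces each group, averaging latency/time-style metrics and summing the rest. The patch also moves the former inline JSON options (expandMapType, metricNameSeparator) out of topology.metrics.consumer.register into dedicated storm-site properties, alongside new topology.metrics.aggregate.* settings.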

+ 86 - 1
ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.metrics2.sink.storm;
 
+import org.apache.storm.Constants;
 import org.apache.storm.metric.api.IMetricsConsumer;
 import org.apache.storm.task.IErrorReporter;
 import org.apache.storm.task.TopologyContext;
@@ -33,6 +34,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -40,11 +42,17 @@ import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCach
 import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
 
 public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer {
+  // covers the built-in metrics, though matching on name substrings is still not pretty
+  private static final String[] METRIC_LOWERCASE_SUBSTRINGS_AGGREGATE_AVERAGE = { "-latency", "timems", "time_ms", "rate_secs", "timesecs" };
+
   private static final String[] WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME = { ".", "_" };
 
   // create String manually in order to not rely on Guava Joiner or having our own
   private static final String JOINED_WARN_STRINGS_FOR_MESSAGE = "\".\", \"_\"";
 
+  // the narrowing cast is safe: the system task id fits within the int range
+  public static final int SYSTEM_TASK_ID = (int) Constants.SYSTEM_TASK_ID;
+
   public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId";
   public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "storm";
   public static final String METRIC_NAME_PREFIX_KAFKA_OFFSET = "kafkaOffset.";
@@ -105,7 +113,16 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
 
     for (DataPoint dataPoint : dataPoints) {
       LOG.debug(dataPoint.name + " = " + dataPoint.value);
-      List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint);
+
+      List<DataPoint> populatedDataPoints;
+      if (taskInfo.srcTaskId == SYSTEM_TASK_ID && dataPoint.value instanceof Collection) {
+        // worker-level metrics arrive as a collection of per-task values; aggregate them here
+        List<DataPoint> populatedBeforeAggregationDataPoints = populateAllDataPointValues(dataPoint);
+        Map<String, List<Double>> metricNameKeyedValues = groupByMetricNameDataPoints(populatedBeforeAggregationDataPoints);
+        populatedDataPoints = applyAggregationToMetricNameKeyedDataPoints(metricNameKeyedValues);
+      } else {
+        populatedDataPoints = populateDataPoints(dataPoint);
+      }
 
       for (DataPoint populatedDataPoint : populatedDataPoints) {
         String metricName;
@@ -158,6 +175,22 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     return topologyId.substring(0, topologyId.substring(0, topologyId.lastIndexOf("-")).lastIndexOf("-"));
   }
 
+  private List<DataPoint> populateAllDataPointValues(DataPoint dataPoint) {
+    List<DataPoint> populatedDataPoints = new ArrayList<>();
+    Collection<Object> values = (Collection<Object>) dataPoint.value;
+    for (Object value : values) {
+      List<DataPoint> populated = populateDataPoints(new DataPoint(dataPoint.name, value));
+      for (DataPoint point : populated) {
+        if (point.value == null) {
+          continue;
+        }
+
+        populatedDataPoints.add(point);
+      }
+    }
+    return populatedDataPoints;
+  }
+
   private List<DataPoint> populateDataPoints(DataPoint dataPoint) {
     List<DataPoint> dataPoints = new ArrayList<>();
 
@@ -202,6 +235,58 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     }
   }
 
+  private Map<String, List<Double>> groupByMetricNameDataPoints(List<DataPoint> populatedDataPoints) {
+    Map<String, List<Double>> metricNameKeyedValues = new HashMap<>();
+    for (DataPoint point : populatedDataPoints) {
+      List<Double> valuesOnMetric = metricNameKeyedValues.get(point.name);
+
+      if (valuesOnMetric == null) {
+        valuesOnMetric = new ArrayList<>();
+        metricNameKeyedValues.put(point.name, valuesOnMetric);
+      }
+
+      valuesOnMetric.add(Double.valueOf(point.value.toString()));
+    }
+    return metricNameKeyedValues;
+  }
+
+  private List<DataPoint> applyAggregationToMetricNameKeyedDataPoints(Map<String, List<Double>> metricNameKeyedValues) {
+    List<DataPoint> populatedDataPoints = new ArrayList<>();
+    for (Map.Entry<String, List<Double>> metricNameToValues : metricNameKeyedValues.entrySet()) {
+      String key = metricNameToValues.getKey();
+      List<Double> values = metricNameToValues.getValue();
+      populatedDataPoints.add(new DataPoint(key, applyAggregateFunction(key, values)));
+    }
+    return populatedDataPoints;
+  }
+
+  private Double applyAggregateFunction(String metricName, List<Double> values) {
+    String lowerCaseMetricName = metricName.toLowerCase();
+    for (String aggregateMetricSubstring : METRIC_LOWERCASE_SUBSTRINGS_AGGREGATE_AVERAGE) {
+      if (lowerCaseMetricName.contains(aggregateMetricSubstring)) {
+        return calculateAverage(values);
+      }
+    }
+
+    return calculateSummation(values);
+  }
+
+  private Double calculateSummation(List<Double> values) {
+    Double sum = 0.0;
+    for (Double value : values) {
+      sum += value;
+    }
+    return sum;
+  }
+
+  private Double calculateAverage(List<Double> values) {
+    if (values.isEmpty()) {
+      return 0.0d;
+    }
+
+    return calculateSummation(values) / values.size();
+  }
+
   private TimelineMetric createTimelineMetric(long currentTimeMillis, String hostName,
       String attributeName, Double attributeValue) {
     TimelineMetric timelineMetric = new TimelineMetric();
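
To make the new code path concrete: a minimal standalone sketch of the reduction applied above, assuming nothing beyond the JDK. The class and method names are illustrative only, not part of the patch.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WorkerAggregationSketch {

  // same substrings the sink matches against lower-cased metric names
  private static final String[] AVERAGE_SUBSTRINGS =
      { "-latency", "timems", "time_ms", "rate_secs", "timesecs" };

  static double aggregate(String metricName, List<Double> values) {
    double sum = 0.0;
    for (double v : values) {
      sum += v;
    }
    String lower = metricName.toLowerCase();
    for (String s : AVERAGE_SUBSTRINGS) {
      if (lower.contains(s)) {
        // latency/time-style metrics are averaged across the worker's tasks
        return values.isEmpty() ? 0.0 : sum / values.size();
      }
    }
    // everything else (counts, queue populations) is summed
    return sum;
  }

  public static void main(String[] args) {
    Map<String, List<Double>> byName = new HashMap<>();
    byName.put("__emit-count", Arrays.asList(10.0, 20.0));
    byName.put("__process-latency", Arrays.asList(5.0, 15.0));
    for (Map.Entry<String, List<Double>> e : byName.entrySet()) {
      // __emit-count -> 30.0 (sum); __process-latency -> 10.0 (average)
      System.out.println(e.getKey() + " -> " + aggregate(e.getKey(), e.getValue()));
    }
  }
}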

+ 55 - 3
ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.metrics2.sink.storm;
 
 import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.METRIC_NAME_PREFIX_KAFKA_OFFSET;
+import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.SYSTEM_TASK_ID;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.expect;
@@ -28,12 +29,16 @@ import static org.easymock.EasyMock.verify;
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.apache.storm.Constants;
+import org.apache.storm.shade.com.google.common.collect.Lists;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -121,12 +126,59 @@ public class StormTimelineMetricsSinkTest {
     stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
     replay(timelineMetricsCache);
 
-    Map<String, Object> valueMap = new HashMap<>();
-    valueMap.put("field1", 53);
-    valueMap.put("field2", 64.12);
+    Map<String, Object> valueMap = getTestValueMap();
     stormTimelineMetricsSink.handleDataPoints(
         new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
         Collections.singleton(new IMetricsConsumer.DataPoint("key1", valueMap)));
     verify(timelineMetricsCache);
   }
+
+  @Test
+  @Ignore // TODO: Fix for failover
+  public void testWorkerLevelAggregatedNumericMetricMetricSubmission() throws InterruptedException, IOException {
+    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    stormTimelineMetricsSink.setTopologyName("topology1");
+    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+    expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234." + SYSTEM_TASK_ID + ".key1"))
+        .andReturn(new TimelineMetric()).once();
+    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
+    expectLastCall().once();
+    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    replay(timelineMetricsCache);
+
+    stormTimelineMetricsSink.handleDataPoints(
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", SYSTEM_TASK_ID, 20000L, 60),
+        Collections.singleton(new IMetricsConsumer.DataPoint("key1", Lists.newArrayList(42.3, 42.3))));
+    verify(timelineMetricsCache);
+  }
+
+  @Test
+  @Ignore // TODO: Fix for failover
+  public void testWorkerLevelAggregatedMapMetricMetricSubmission() throws InterruptedException, IOException {
+    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
+    stormTimelineMetricsSink.setTopologyName("topology1");
+    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
+    expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234." + SYSTEM_TASK_ID + ".key1.field1"))
+        .andReturn(new TimelineMetric()).once();
+    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
+    expectLastCall().once();
+    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    replay(timelineMetricsCache);
+
+    List<Map<String, Object>> valueMapList = new ArrayList<>();
+    valueMapList.add(getTestValueMap());
+    valueMapList.add(getTestValueMap());
+
+    stormTimelineMetricsSink.handleDataPoints(
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", SYSTEM_TASK_ID, 20000L, 60),
+        Collections.singleton(new IMetricsConsumer.DataPoint("key1", valueMapList)));
+    verify(timelineMetricsCache);
+  }
+
+  private Map<String, Object> getTestValueMap() {
+    Map<String, Object> valueMap = new HashMap<>();
+    valueMap.put("field1", 53);
+    valueMap.put("field2", 64.12);
+    return valueMap;
+  }
 }
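
As a sanity check on the map-metric test above: neither "key1.field1" nor "key1.field2" contains a latency/time substring, so both fields are summed rather than averaged across the two identical value maps. A throwaway sketch of the expected aggregates (not part of the patch or its test suite):

public class ExpectedAggregates {
  public static void main(String[] args) {
    // two identical value maps arrive under the system task
    double[] field1 = { 53.0, 53.0 };
    double[] field2 = { 64.12, 64.12 };
    // no latency/time substring in the populated names, so the sink sums
    System.out.println("key1.field1 -> " + (field1[0] + field1[1])); // 106.0
    System.out.println("key1.field2 -> " + (field2[0] + field2[1])); // 128.24
  }
}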

+ 40 - 3
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/storm-site.xml

@@ -33,15 +33,52 @@
     <value-attributes>
       <overridable>false</overridable>
     </value-attributes>
-    <on-ambari-upgrade add="false"/>
+    <on-ambari-upgrade add="true"/>
   </property>
   <property>
     <name>topology.metrics.consumer.register</name>
-    <value>[{"class": "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink", "parallelism.hint": 1, "expandMapType": true, "metricNameSeparator": ".", "whitelist": ["kafkaOffset\\..+/", "__complete-latency", "__process-latency", "__receive\\.population$", "__sendqueue\\.population$", "__execute-count", "__emit-count", "__ack-count", "__fail-count", "memory/heap\\.usedBytes$", "memory/nonHeap\\.usedBytes$", "GC/.+\\.count$", "GC/.+\\.timeMs$"]}]</value>
+    <value>[{"class": "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink", "parallelism.hint": 1, "whitelist": ["kafkaOffset\\..+/", "__complete-latency", "__process-latency", "__receive\\.population$", "__sendqueue\\.population$", "__execute-count", "__emit-count", "__ack-count", "__fail-count", "memory/heap\\.usedBytes$", "memory/nonHeap\\.usedBytes$", "GC/.+\\.count$", "GC/.+\\.timeMs$"]}]</value>
+    <description></description>
+    <value-attributes>
+      <overridable>false</overridable>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>topology.metrics.aggregate.per.worker</name>
+    <value>true</value>
+    <description></description>
+    <value-attributes>
+      <overridable>false</overridable>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>topology.metrics.aggregate.metric.evict.secs</name>
+    <value>5</value>
     <description></description>
     <value-attributes>
       <overridable>false</overridable>
     </value-attributes>
-    <on-ambari-upgrade add="false"/>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>topology.metrics.expand.map.type</name>
+    <value>true</value>
+    <description></description>
+    <value-attributes>
+      <overridable>false</overridable>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>topology.metrics.metric.name.separator</name>
+    <value>.</value>
+    <description></description>
+    <value-attributes>
+      <overridable>false</overridable>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
   </property>
+
 </configuration>
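
The storm-site value above registers the sink through Storm's topology.metrics.consumer.register setting. For completeness, a sketch of the programmatic equivalent using Storm's standard Config API; the parallelism hint of 1 matches the shipped default, and the whitelist and other options from the XML value would be passed as the consumer argument via the three-argument overload.

import org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink;
import org.apache.storm.Config;

public class RegisterSinkSketch {
  public static Config buildConfig() {
    Config conf = new Config();
    // register the AMS sink as a metrics consumer, parallelism hint 1
    conf.registerMetricsConsumer(StormTimelineMetricsSink.class, 1);
    return conf;
  }
}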