Explorar o código

AMBARI-9612 AMS : Kafka Metrics - Log flush status metrics do not show up.

Feeding the data into AMS in legacy format
Florian Barca %!s(int64=10) %!d(string=hai) anos
pai
achega
cd3ae1e622

+ 99 - 138
ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java

@@ -51,6 +51,7 @@ import com.yammer.metrics.core.Metric;
 import com.yammer.metrics.core.MetricName;
 import com.yammer.metrics.core.MetricName;
 import com.yammer.metrics.core.MetricProcessor;
 import com.yammer.metrics.core.MetricProcessor;
 import com.yammer.metrics.core.MetricsRegistry;
 import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Summarizable;
 import com.yammer.metrics.core.Timer;
 import com.yammer.metrics.core.Timer;
 import com.yammer.metrics.stats.Snapshot;
 import com.yammer.metrics.stats.Snapshot;
 
 
@@ -69,7 +70,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
 
 
   private boolean initialized = false;
   private boolean initialized = false;
   private boolean running = false;
   private boolean running = false;
-  private Object lock = new Object();
+  private final Object lock = new Object();
   private String collectorUri;
   private String collectorUri;
   private String hostname;
   private String hostname;
   private SocketAddress socketAddress;
   private SocketAddress socketAddress;
@@ -242,33 +243,9 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
     public void processMeter(MetricName name, Metered meter, Context context) throws Exception {
     public void processMeter(MetricName name, Metered meter, Context context) throws Exception {
       final long currentTimeMillis = System.currentTimeMillis();
       final long currentTimeMillis = System.currentTimeMillis();
       final String sanitizedName = sanitizeName(name);
       final String sanitizedName = sanitizeName(name);
-      final String meterCountName = sanitizedName + COUNT_SUFIX;
-      final TimelineMetric countMetric = createTimelineMetric(currentTimeMillis, APP_ID, meterCountName, meter.count());
 
 
-      final String meterOneMinuteRateName = sanitizedName + ONE_MINUTE_RATE_SUFIX;
-      final TimelineMetric oneMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          meterOneMinuteRateName, meter.oneMinuteRate());
+      String[] metricNames = cacheKafkaMetered(currentTimeMillis, sanitizedName, meter);
 
 
-      final String meterMeanRateName = sanitizedName + MEAN_RATE_SUFIX;
-      final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, meterMeanRateName,
-          meter.meanRate());
-
-      final String meterFiveMinuteRateName = sanitizedName + FIVE_MINUTE_RATE_SUFIX;
-      final TimelineMetric fiveMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          meterFiveMinuteRateName, meter.fiveMinuteRate());
-
-      final String meterFifteenMinuteRateName = sanitizedName + FIFTEEN_MINUTE_RATE_SUFIX;
-      final TimelineMetric fifteenMinuteRateMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          meterFifteenMinuteRateName, meter.fifteenMinuteRate());
-
-      metricsCache.putTimelineMetric(countMetric);
-      metricsCache.putTimelineMetric(oneMinuteRateMetric);
-      metricsCache.putTimelineMetric(meanMetric);
-      metricsCache.putTimelineMetric(fiveMinuteRateMetric);
-      metricsCache.putTimelineMetric(fifteenMinuteRateMetric);
-
-      String[] metricNames = new String[] { meterCountName, meterOneMinuteRateName, meterMeanRateName,
-          meterFiveMinuteRateName, meterFifteenMinuteRateName };
       populateMetricsList(context, metricNames);
       populateMetricsList(context, metricNames);
     }
     }
 
 
@@ -276,9 +253,10 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
     public void processCounter(MetricName name, Counter counter, Context context) throws Exception {
     public void processCounter(MetricName name, Counter counter, Context context) throws Exception {
       final long currentTimeMillis = System.currentTimeMillis();
       final long currentTimeMillis = System.currentTimeMillis();
       final String sanitizedName = sanitizeName(name);
       final String sanitizedName = sanitizeName(name);
-      final String metricCountName = sanitizedName + COUNT_SUFIX;
-      final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, metricCountName, counter.count());
-      metricsCache.putTimelineMetric(metric);
+
+      final String metricCountName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          COUNT_SUFIX, counter.count());
+
       populateMetricsList(context, metricCountName);
       populateMetricsList(context, metricCountName);
     }
     }
 
 
@@ -288,61 +266,20 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
       final Snapshot snapshot = histogram.getSnapshot();
       final Snapshot snapshot = histogram.getSnapshot();
       final String sanitizedName = sanitizeName(name);
       final String sanitizedName = sanitizeName(name);
 
 
-      final String histogramMinName = sanitizedName + MIN_SUFIX;
-      final TimelineMetric minMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMinName,
-          histogram.min());
-
-      final String histogramMaxName = sanitizedName + MAX_SUFIX;
-      final TimelineMetric maxMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMaxName,
-          histogram.max());
-
-      final String histogramMeanName = sanitizedName + MEAN_SUFIX;
-      final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMeanName,
-          histogram.mean());
-
-      final String histogramMedianName = sanitizedName + MEDIAN_SUFIX;
-      final TimelineMetric medianMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramMedianName,
-          snapshot.getMedian());
-
-      final String histogramStdDevName = sanitizedName + STD_DEV_SUFIX;
-      final TimelineMetric stdDevMetric = createTimelineMetric(currentTimeMillis, APP_ID, histogramStdDevName,
-          histogram.stdDev());
-
-      final String histogramSeventyFifthPercentileName = sanitizedName + SEVENTY_FIFTH_PERCENTILE_SUFIX;
-      final TimelineMetric seventyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramSeventyFifthPercentileName, snapshot.get75thPercentile());
-
-      final String histogramNinetyFifthPercentileName = sanitizedName + NINETY_FIFTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramNinetyFifthPercentileName, snapshot.get95thPercentile());
-
-      final String histogramNinetyEighthPercentileName = sanitizedName + NINETY_EIGHTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyEighthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramNinetyEighthPercentileName, snapshot.get98thPercentile());
-
-      final String histogramNinetyNinethPercentileName = sanitizedName + NINETY_NINTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyNinthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramNinetyNinethPercentileName, snapshot.get99thPercentile());
-
-      final String histogramNinetyNinePointNinePercentileName = sanitizedName + NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX;
-      final TimelineMetric nintyNinthPointNinePercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          histogramNinetyNinePointNinePercentileName, snapshot.get999thPercentile());
-
-      metricsCache.putTimelineMetric(minMetric);
-      metricsCache.putTimelineMetric(maxMetric);
-      metricsCache.putTimelineMetric(meanMetric);
-      metricsCache.putTimelineMetric(medianMetric);
-      metricsCache.putTimelineMetric(stdDevMetric);
-      metricsCache.putTimelineMetric(seventyFifthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyFifthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyEighthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyNinthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyNinthPointNinePercentileMetric);
-
-      String[] metricNames = new String[] { histogramMaxName, histogramMeanName, histogramMedianName, histogramMinName,
-          histogramNinetyEighthPercentileName, histogramNinetyFifthPercentileName,
-          histogramNinetyNinePointNinePercentileName, histogramNinetyNinethPercentileName,
-          histogramSeventyFifthPercentileName, histogramStdDevName };
+      String[] metricHNames = cacheKafkaSummarizable(currentTimeMillis, sanitizedName, histogram);
+      String[] metricSNames = cacheKafkaSnapshot(currentTimeMillis, sanitizedName, snapshot);
+
+      String[] metricNames = new String[] {
+          metricHNames[0],
+          metricHNames[1],
+          metricSNames[0],
+          metricHNames[2],
+          metricSNames[1],
+          metricSNames[2],
+          metricSNames[3],
+          metricSNames[4],
+          metricSNames[5],
+          metricHNames[3] };
       populateMetricsList(context, metricNames);
       populateMetricsList(context, metricNames);
     }
     }
 
 
@@ -352,57 +289,26 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
       final Snapshot snapshot = timer.getSnapshot();
       final Snapshot snapshot = timer.getSnapshot();
       final String sanitizedName = sanitizeName(name);
       final String sanitizedName = sanitizeName(name);
 
 
-      final String timerMinName = sanitizedName + MIN_SUFIX;
-      final TimelineMetric minMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMinName, timer.min());
-
-      final String timerMaxName = sanitizedName + MAX_SUFIX;
-      final TimelineMetric maxMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMaxName, timer.max());
-
-      final String timerMeanName = sanitizedName + MEAN_SUFIX;
-      final TimelineMetric meanMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMeanName, timer.mean());
-
-      final String timerMedianName = sanitizedName + MEDIAN_SUFIX;
-      final TimelineMetric medianMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerMedianName,
-          snapshot.getMedian());
-
-      final String timerStdDevName = sanitizedName + STD_DEV_SUFIX;
-      final TimelineMetric stdDevMetric = createTimelineMetric(currentTimeMillis, APP_ID, timerStdDevName,
-          timer.stdDev());
-
-      final String timerSeventyFifthPercentileName = sanitizedName + SEVENTY_FIFTH_PERCENTILE_SUFIX;
-      final TimelineMetric seventyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          timerSeventyFifthPercentileName, snapshot.get75thPercentile());
-
-      final String timerNinetyFifthPercentileName = sanitizedName + NINETY_FIFTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyFifthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          timerNinetyFifthPercentileName, snapshot.get95thPercentile());
-
-      final String timerNinetyEighthPercentileName = sanitizedName + NINETY_EIGHTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyEighthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          timerNinetyEighthPercentileName, snapshot.get98thPercentile());
-
-      final String timerNinetyNinthPercentileName = sanitizedName + NINETY_NINTH_PERCENTILE_SUFIX;
-      final TimelineMetric nintyNinthPercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          timerNinetyNinthPercentileName, snapshot.get99thPercentile());
-
-      final String timerNinetyNinePointNinePercentileName = sanitizedName + NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX;
-      final TimelineMetric nintyNinthPointNinePercentileMetric = createTimelineMetric(currentTimeMillis, APP_ID,
-          timerNinetyNinePointNinePercentileName, snapshot.get999thPercentile());
-
-      metricsCache.putTimelineMetric(minMetric);
-      metricsCache.putTimelineMetric(maxMetric);
-      metricsCache.putTimelineMetric(meanMetric);
-      metricsCache.putTimelineMetric(medianMetric);
-      metricsCache.putTimelineMetric(stdDevMetric);
-      metricsCache.putTimelineMetric(seventyFifthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyFifthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyEighthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyNinthPercentileMetric);
-      metricsCache.putTimelineMetric(nintyNinthPointNinePercentileMetric);
-
-      String[] metricNames = new String[] { timerMaxName, timerMeanName, timerMedianName, timerMinName,
-          timerNinetyEighthPercentileName, timerNinetyFifthPercentileName, timerNinetyNinePointNinePercentileName,
-          timerNinetyNinthPercentileName, timerSeventyFifthPercentileName, timerStdDevName };
+      String[] metricMNames = cacheKafkaMetered(currentTimeMillis, sanitizedName, timer);
+      String[] metricTNames = cacheKafkaSummarizable(currentTimeMillis, sanitizedName, timer);
+      String[] metricSNames = cacheKafkaSnapshot(currentTimeMillis, sanitizedName, snapshot);
+
+      String[] metricNames = new String[] {
+          metricMNames[0],
+          metricMNames[1],
+          metricMNames[2],
+          metricMNames[3],
+          metricMNames[4],
+          metricTNames[0],
+          metricTNames[1],
+          metricSNames[0],
+          metricTNames[2],
+          metricSNames[1],
+          metricSNames[2],
+          metricSNames[3],
+          metricSNames[4],
+          metricSNames[5],
+          metricTNames[3] };
       populateMetricsList(context, metricNames);
       populateMetricsList(context, metricNames);
     }
     }
 
 
@@ -410,12 +316,67 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
     public void processGauge(MetricName name, Gauge<?> gauge, Context context) throws Exception {
     public void processGauge(MetricName name, Gauge<?> gauge, Context context) throws Exception {
       final long currentTimeMillis = System.currentTimeMillis();
       final long currentTimeMillis = System.currentTimeMillis();
       final String sanitizedName = sanitizeName(name);
       final String sanitizedName = sanitizeName(name);
-      final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, sanitizedName,
-          Double.parseDouble(String.valueOf(gauge.value())));
-      metricsCache.putTimelineMetric(metric);
+
+      cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, "", Double.parseDouble(String.valueOf(gauge.value())));
+
       populateMetricsList(context, sanitizedName);
       populateMetricsList(context, sanitizedName);
     }
     }
 
 
+    private String[] cacheKafkaMetered(long currentTimeMillis, String sanitizedName, Metered meter) {
+      final String meterCountName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          COUNT_SUFIX, meter.count());
+      final String meterOneMinuteRateName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          ONE_MINUTE_RATE_SUFIX, meter.oneMinuteRate());
+      final String meterMeanRateName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          MEAN_RATE_SUFIX, meter.meanRate());
+      final String meterFiveMinuteRateName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          FIVE_MINUTE_RATE_SUFIX, meter.fiveMinuteRate());
+      final String meterFifteenMinuteRateName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          FIFTEEN_MINUTE_RATE_SUFIX, meter.fifteenMinuteRate());
+
+      return new String[] { meterCountName, meterOneMinuteRateName, meterMeanRateName,
+          meterFiveMinuteRateName, meterFifteenMinuteRateName };
+    }
+
+    private String[] cacheKafkaSummarizable(long currentTimeMillis, String sanitizedName, Summarizable summarizable) {
+      final String minName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          MIN_SUFIX, summarizable.min());
+      final String maxName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          MAX_SUFIX, summarizable.max());
+      final String meanName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          MEAN_SUFIX, summarizable.mean());
+      final String stdDevName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          STD_DEV_SUFIX, summarizable.stdDev());
+
+      return new String[] { maxName, meanName, minName, stdDevName };
+    }
+
+    private String[] cacheKafkaSnapshot(long currentTimeMillis, String sanitizedName, Snapshot snapshot) {
+      final String medianName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          MEDIAN_SUFIX, snapshot.getMedian());
+      final String seventyFifthPercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          SEVENTY_FIFTH_PERCENTILE_SUFIX, snapshot.get75thPercentile());
+      final String ninetyFifthPercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          NINETY_FIFTH_PERCENTILE_SUFIX, snapshot.get95thPercentile());
+      final String ninetyEighthPercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          NINETY_EIGHTH_PERCENTILE_SUFIX, snapshot.get98thPercentile());
+      final String ninetyNinthPercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          NINETY_NINTH_PERCENTILE_SUFIX, snapshot.get99thPercentile());
+      final String ninetyNinePointNinePercentileName = cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName,
+          NINETY_NINE_POINT_NINE_PERCENTILE_SUFIX, snapshot.get999thPercentile());
+
+      return new String[] { medianName,
+          ninetyEighthPercentileName, ninetyFifthPercentileName, ninetyNinePointNinePercentileName,
+          ninetyNinthPercentileName, seventyFifthPercentileName };
+    }
+
+    private String cacheSanitizedTimelineMetric(long currentTimeMillis, String sanitizedName, String suffix, Number metricValue) {
+      final String meterName = sanitizedName + suffix;
+      final TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, meterName, metricValue);
+      metricsCache.putTimelineMetric(metric);
+      return meterName;
+    }
+
     private void populateMetricsList(Context context, String... metricNames) {
     private void populateMetricsList(Context context, String... metricNames) {
       for (String metricName : metricNames) {
       for (String metricName : metricNames) {
         TimelineMetric cachedMetric = metricsCache.getTimelineMetric(metricName);
         TimelineMetric cachedMetric = metricsCache.getTimelineMetric(metricName);