|
@@ -18,8 +18,25 @@
|
|
|
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
|
|
|
|
|
|
|
|
|
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
|
|
|
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
|
|
|
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
|
|
|
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
|
|
|
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.sql.ResultSet;
|
|
|
+import java.sql.SQLException;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
import org.apache.commons.collections.MapUtils;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
+import org.apache.commons.lang.mutable.MutableInt;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
|
|
@@ -31,29 +48,13 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
|
|
|
-import java.io.IOException;
|
|
|
-import java.sql.ResultSet;
|
|
|
-import java.sql.SQLException;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Arrays;
|
|
|
-import java.util.Date;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-
|
|
|
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID;
|
|
|
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
|
|
|
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
|
|
|
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
|
|
|
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
|
|
|
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
|
|
|
|
|
|
/**
|
|
|
* Aggregates a metric across all hosts in the cluster. Reads metrics from
|
|
|
* the precision table and saves into the aggregate.
|
|
|
*/
|
|
|
public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggregator {
|
|
|
- public Long timeSliceIntervalMillis;
|
|
|
+ Long timeSliceIntervalMillis;
|
|
|
private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true);
|
|
|
// Aggregator to perform app-level aggregates for host metrics
|
|
|
private final TimelineMetricAppAggregator appAggregator;
|
|
@@ -136,7 +137,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
|
|
|
/**
|
|
|
* Return time slices to normalize the timeseries data.
|
|
|
*/
|
|
|
- protected List<Long[]> getTimeSlices(long startTime, long endTime) {
|
|
|
+ List<Long[]> getTimeSlices(long startTime, long endTime) {
|
|
|
List<Long[]> timeSlices = new ArrayList<Long[]>();
|
|
|
long sliceStartTime = startTime;
|
|
|
while (sliceStartTime < endTime) {
|
|
@@ -146,13 +147,13 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
|
|
|
return timeSlices;
|
|
|
}
|
|
|
|
|
|
- private Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
|
|
|
- throws SQLException, IOException {
|
|
|
+ Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
|
|
|
+ throws SQLException, IOException {
|
|
|
Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
|
|
|
new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
|
|
|
- int numLiveHosts = 0;
|
|
|
|
|
|
TimelineMetric metric = null;
|
|
|
+ Map<String, MutableInt> hostedAppCounter = new HashMap<>();
|
|
|
if (rs.next()) {
|
|
|
metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
|
|
|
|
|
@@ -167,7 +168,14 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
|
|
|
} else {
|
|
|
// Process the current metric
|
|
|
int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
|
|
|
- numLiveHosts = Math.max(numHosts, numLiveHosts);
|
|
|
+ if (!hostedAppCounter.containsKey(metric.getAppId())) {
|
|
|
+ hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
|
|
|
+ } else {
|
|
|
+ int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue();
|
|
|
+ if (currentHostCount < numHosts) {
|
|
|
+ hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
|
|
|
+ }
|
|
|
+ }
|
|
|
metric = nextMetric;
|
|
|
}
|
|
|
}
|
|
@@ -175,15 +183,22 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
|
|
|
// Process last metric
|
|
|
if (metric != null) {
|
|
|
int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
|
|
|
- numLiveHosts = Math.max(numHosts, numLiveHosts);
|
|
|
+ if (!hostedAppCounter.containsKey(metric.getAppId())) {
|
|
|
+ hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
|
|
|
+ } else {
|
|
|
+ int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue();
|
|
|
+ if (currentHostCount < numHosts) {
|
|
|
+ hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Add app level aggregates to save
|
|
|
aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
|
|
|
|
|
|
- // Add liveHosts metric.
|
|
|
+    // Add a "live_hosts" cluster metric per appId, recording how many hosts report that app.
|
|
|
long timestamp = timeSlices.get(timeSlices.size() - 1)[1];
|
|
|
- processLiveHostsMetric(aggregateClusterMetrics, numLiveHosts, timestamp);
|
|
|
+ processLiveAppCountMetrics(aggregateClusterMetrics, hostedAppCounter, timestamp);
|
|
|
|
|
|
return aggregateClusterMetrics;
|
|
|
}
|
|
@@ -196,7 +211,6 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
|
|
|
protected int processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
|
|
|
TimelineMetric metric, List<Long[]> timeSlices) {
|
|
|
// Create time slices
|
|
|
-
|
|
|
TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(metric.getMetricName(), metric.getAppId());
|
|
|
TimelineMetricMetadata metricMetadata = metadataManagerInstance.getMetadataCacheValue(appKey);
|
|
|
|
|
@@ -209,8 +223,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
|
|
|
int numHosts = 0;
|
|
|
|
|
|
if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
|
|
|
- for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
|
|
|
- clusterMetrics.entrySet()) {
|
|
|
+ for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry : clusterMetrics.entrySet()) {
|
|
|
|
|
|
TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
|
|
|
Double avgValue = clusterMetricEntry.getValue();
|
|
@@ -415,16 +428,21 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
|
|
|
return -1l;
|
|
|
}
|
|
|
|
|
|
- private void processLiveHostsMetric(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
|
|
|
- int numLiveHosts, long timestamp) {
|
|
|
+  /* Adds a "live_hosts" cluster metric for each appId, whose value is the number of hosts hosting that app. */
|
|
|
+ private void processLiveAppCountMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
|
|
|
+ Map<String, MutableInt> appHostsCount, long timestamp) {
|
|
|
|
|
|
- TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(
|
|
|
- "live_hosts", HOST_APP_ID, null, timestamp, null);
|
|
|
+ for (Map.Entry<String, MutableInt> appHostsEntry : appHostsCount.entrySet()) {
|
|
|
+ TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(
|
|
|
+ "live_hosts", appHostsEntry.getKey(), null, timestamp, null);
|
|
|
|
|
|
- MetricClusterAggregate metricClusterAggregate = new MetricClusterAggregate((double) numLiveHosts,
|
|
|
- 1, null, (double) numLiveHosts, (double) numLiveHosts);
|
|
|
+ Integer numOfHosts = appHostsEntry.getValue().intValue();
|
|
|
|
|
|
- aggregateClusterMetrics.put(timelineClusterMetric, metricClusterAggregate);
|
|
|
- }
|
|
|
+ MetricClusterAggregate metricClusterAggregate = new MetricClusterAggregate(
|
|
|
+ (double) numOfHosts, 1, null, (double) numOfHosts, (double) numOfHosts);
|
|
|
|
|
|
+ aggregateClusterMetrics.put(timelineClusterMetric, metricClusterAggregate);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
}
|