|
@@ -31,11 +31,15 @@ import java.io.IOException;
|
|
|
import java.sql.ResultSet;
|
|
|
import java.sql.SQLException;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.Date;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
|
|
|
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
|
|
@@ -127,6 +131,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
|
|
|
throws SQLException, IOException {
|
|
|
Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
|
|
|
new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
|
|
|
+ int numLiveHosts = 0;
|
|
|
|
|
|
TimelineMetric metric = null;
|
|
|
if (rs.next()) {
|
|
@@ -142,18 +147,25 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
|
|
|
metric.addMetricValues(nextMetric.getMetricValues());
|
|
|
} else {
|
|
|
// Process the current metric
|
|
|
- processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
|
|
|
+ int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
|
|
|
+ numLiveHosts = Math.max(numHosts, numLiveHosts);
|
|
|
metric = nextMetric;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
// Process last metric
|
|
|
if (metric != null) {
|
|
|
- processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
|
|
|
+ int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
|
|
|
+ numLiveHosts = Math.max(numHosts, numLiveHosts);
|
|
|
}
|
|
|
|
|
|
// Add app level aggregates to save
|
|
|
aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
|
|
|
+
|
|
|
+ // Add liveHosts metric.
|
|
|
+ long timestamp = timeSlices.get(timeSlices.size() - 1)[1];
|
|
|
+ processLiveHostsMetric(aggregateClusterMetrics, numLiveHosts, timestamp);
|
|
|
+
|
|
|
return aggregateClusterMetrics;
|
|
|
}
|
|
|
|
|
@@ -162,10 +174,11 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
|
|
|
* timeline.metrics.cluster.aggregator.minute.timeslice.interval
|
|
|
* Normalize value by averaging them within the interval
|
|
|
*/
|
|
|
- protected void processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
|
|
|
+ protected int processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
|
|
|
TimelineMetric metric, List<Long[]> timeSlices) {
|
|
|
// Create time slices
|
|
|
Map<TimelineClusterMetric, Double> clusterMetrics = sliceFromTimelineMetric(metric, timeSlices);
|
|
|
+ int numHosts = 0;
|
|
|
|
|
|
if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
|
|
|
for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
|
|
@@ -185,10 +198,13 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
|
|
|
aggregate.updateMax(avgValue);
|
|
|
aggregate.updateMin(avgValue);
|
|
|
}
|
|
|
+
|
|
|
+ numHosts = aggregate.getNumberOfHosts();
|
|
|
// Update app level aggregates
|
|
|
appAggregator.processTimelineClusterMetric(clusterMetric, metric.getHostName(), avgValue);
|
|
|
}
|
|
|
}
|
|
|
+ return numHosts;
|
|
|
}
|
|
|
|
|
|
protected Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
|
|
@@ -371,4 +387,16 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
|
|
|
return -1l;
|
|
|
}
|
|
|
|
|
|
+ private void processLiveHostsMetric(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics,
|
|
|
+ int numLiveHosts, long timestamp) {
|
|
|
+
|
|
|
+ TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(
|
|
|
+ "live_hosts", HOST_APP_ID, null, timestamp, null);
|
|
|
+
|
|
|
+ MetricClusterAggregate metricClusterAggregate = new MetricClusterAggregate((double) numLiveHosts,
|
|
|
+ 1, null, (double) numLiveHosts, (double) numLiveHosts);
|
|
|
+
|
|
|
+ aggregateClusterMetrics.put(timelineClusterMetric, metricClusterAggregate);
|
|
|
+ }
|
|
|
+
|
|
|
}
|