|
@@ -18,9 +18,8 @@
|
|
|
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
|
|
|
|
|
|
|
|
|
-import org.apache.commons.logging.Log;
|
|
|
-import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
|
|
@@ -31,10 +30,13 @@ import java.io.IOException;
|
|
|
import java.sql.ResultSet;
|
|
|
import java.sql.SQLException;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Date;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
|
|
|
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
|
|
@@ -50,6 +52,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
|
|
|
private final TimelineMetricAppAggregator appAggregator;
|
|
|
// 1 minute client side buffering adjustment
|
|
|
private final Long serverTimeShiftAdjustment;
|
|
|
+ private final boolean interpolationEnabled;
|
|
|
|
|
|
|
|
|
public TimelineMetricClusterAggregatorSecond(String aggregatorName,
|
|
@@ -71,13 +74,16 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
|
|
|
appAggregator = new TimelineMetricAppAggregator(metadataManager, metricsConf);
|
|
|
this.timeSliceIntervalMillis = timeSliceInterval;
|
|
|
this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000"));
|
|
|
+ this.interpolationEnabled = Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true"));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
protected void aggregate(ResultSet rs, long startTime, long endTime) throws SQLException, IOException {
|
|
|
// Account for time shift due to client side buffering by shifting the
|
|
|
// timestamps with the difference between server time and series start time
|
|
|
- List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime);
|
|
|
+ // Also, we do not want to look at the shift time period from the end as well since we can interpolate those points
|
|
|
+ // that come earlier than the expected, during the next run.
|
|
|
+ List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime - serverTimeShiftAdjustment);
|
|
|
// Initialize app aggregates for host metrics
|
|
|
appAggregator.init();
|
|
|
Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
|
|
@@ -99,15 +105,15 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
|
|
|
// Retaining order of the row-key avoids client side merge sort.
|
|
|
condition.addOrderByColumn("METRIC_NAME");
|
|
|
condition.addOrderByColumn("HOSTNAME");
|
|
|
- condition.addOrderByColumn("SERVER_TIME");
|
|
|
condition.addOrderByColumn("APP_ID");
|
|
|
+ condition.addOrderByColumn("SERVER_TIME");
|
|
|
return condition;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Return time slices to normalize the timeseries data.
|
|
|
*/
|
|
|
- private List<Long[]> getTimeSlices(long startTime, long endTime) {
|
|
|
+ protected List<Long[]> getTimeSlices(long startTime, long endTime) {
|
|
|
List<Long[]> timeSlices = new ArrayList<Long[]>();
|
|
|
long sliceStartTime = startTime;
|
|
|
while (sliceStartTime < endTime) {
|
|
@@ -118,7 +124,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
|
|
|
}
|
|
|
|
|
|
private Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
|
|
|
- throws SQLException, IOException {
|
|
|
+ throws SQLException, IOException {
|
|
|
Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
|
|
|
new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
|
|
|
|
|
@@ -185,8 +191,8 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
|
|
|
- TimelineMetric timelineMetric, List<Long[]> timeSlices) {
|
|
|
+ protected Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
|
|
|
+ TimelineMetric timelineMetric, List<Long[]> timeSlices) {
|
|
|
|
|
|
if (timelineMetric.getMetricValues().isEmpty()) {
|
|
|
return null;
|
|
@@ -202,6 +208,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
|
|
|
timeShift = 0l;
|
|
|
}
|
|
|
|
|
|
+ Map<Long,Double> timeSliceValueMap = new HashMap<>();
|
|
|
for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) {
|
|
|
// TODO: investigate null values - pre filter
|
|
|
if (metric.getValue() == null) {
|
|
@@ -228,13 +235,68 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
|
|
|
Double oldValue = timelineClusterMetricMap.get(clusterMetric);
|
|
|
sum = oldValue + metric.getValue();
|
|
|
}
|
|
|
- timelineClusterMetricMap.put(clusterMetric, (sum / count));
|
|
|
+ double metricValue = sum / count;
|
|
|
+ timelineClusterMetricMap.put(clusterMetric, metricValue);
|
|
|
+ timeSliceValueMap.put(timestamp, metricValue);
|
|
|
}
|
|
|
}
|
|
|
+ if (interpolationEnabled) {
|
|
|
+ interpolateMissingPeriods(timelineClusterMetricMap, timelineMetric, timeSlices, timeSliceValueMap);
|
|
|
+ }
|
|
|
|
|
|
return timelineClusterMetricMap;
|
|
|
}
|
|
|
|
|
|
+ private void interpolateMissingPeriods(Map<TimelineClusterMetric, Double> timelineClusterMetricMap,
|
|
|
+ TimelineMetric timelineMetric,
|
|
|
+ List<Long[]> timeSlices,
|
|
|
+ Map<Long, Double> timeSliceValueMap) {
|
|
|
+
|
|
|
+
|
|
|
+ for (int sliceNum = 0; sliceNum < timeSlices.size(); sliceNum++) {
|
|
|
+ Long[] timeSlice = timeSlices.get(sliceNum);
|
|
|
+
|
|
|
+ if (!timeSliceValueMap.containsKey(timeSlice[1])) {
|
|
|
+ LOG.debug("Found an empty slice : " + new Date(timeSlice[0]) + ", " + new Date(timeSlice[1]));
|
|
|
+
|
|
|
+ Double lastSeenValue = null;
|
|
|
+ int index = sliceNum - 1;
|
|
|
+ Long[] prevTimeSlice = null;
|
|
|
+ while (lastSeenValue == null && index >= 0) {
|
|
|
+ prevTimeSlice = timeSlices.get(index--);
|
|
|
+ lastSeenValue = timeSliceValueMap.get(prevTimeSlice[1]);
|
|
|
+ }
|
|
|
+
|
|
|
+ Double nextSeenValue = null;
|
|
|
+ index = sliceNum + 1;
|
|
|
+ Long[] nextTimeSlice = null;
|
|
|
+ while ( nextSeenValue == null && index < timeSlices.size()) {
|
|
|
+ nextTimeSlice = timeSlices.get(index++);
|
|
|
+ nextSeenValue = timeSliceValueMap.get(nextTimeSlice[1]);
|
|
|
+ }
|
|
|
+
|
|
|
+ Double interpolatedValue = PostProcessingUtil.interpolate(timeSlice[1],
|
|
|
+ (prevTimeSlice != null ? prevTimeSlice[1] : null), lastSeenValue,
|
|
|
+ (nextTimeSlice != null ? nextTimeSlice[1] : null), nextSeenValue);
|
|
|
+
|
|
|
+ if (interpolatedValue != null) {
|
|
|
+ TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
|
|
|
+ timelineMetric.getMetricName(),
|
|
|
+ timelineMetric.getAppId(),
|
|
|
+ timelineMetric.getInstanceId(),
|
|
|
+ timeSlice[1],
|
|
|
+ timelineMetric.getType());
|
|
|
+
|
|
|
+ LOG.debug("Interpolated value : " + interpolatedValue);
|
|
|
+ timelineClusterMetricMap.put(clusterMetric, interpolatedValue);
|
|
|
+ } else {
|
|
|
+ LOG.debug("Cannot compute interpolated value, hence skipping.");
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Return end of the time slice into which the metric fits.
|
|
|
*/
|