|
@@ -17,7 +17,6 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
|
|
|
|
|
|
-import com.google.common.base.Enums;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -66,7 +65,6 @@ import java.util.TreeMap;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
|
|
-import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata.*;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATE_TABLE_SPLIT_POINTS;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATORS_SKIP_BLOCK_CACHE;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
|
|
@@ -166,51 +164,12 @@ public class PhoenixHBaseAccessor {
|
|
|
}
|
|
|
|
|
|
private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs)
|
|
|
- throws SQLException, IOException {
|
|
|
+ throws SQLException, IOException {
|
|
|
TimelineMetric metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricCommonsFromResultSet(rs);
|
|
|
metric.setMetricValues(readLastMetricValueFromJSON(rs.getString("METRICS")));
|
|
|
return metric;
|
|
|
}
|
|
|
|
|
|
- public static SingleValuedTimelineMetric getAggregatedTimelineMetricFromResultSet(
|
|
|
- ResultSet rs, Function f) throws SQLException, IOException {
|
|
|
-
|
|
|
- SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric(
|
|
|
- rs.getString("METRIC_NAME") + f.getSuffix(),
|
|
|
- rs.getString("APP_ID"),
|
|
|
- rs.getString("INSTANCE_ID"),
|
|
|
- rs.getString("HOSTNAME"),
|
|
|
- rs.getLong("SERVER_TIME"),
|
|
|
- rs.getLong("SERVER_TIME"),
|
|
|
- rs.getString("UNITS")
|
|
|
- );
|
|
|
-
|
|
|
- // get functions for metricnames
|
|
|
-
|
|
|
- double value;
|
|
|
- switch(f.getReadFunction()){
|
|
|
- case AVG:
|
|
|
- value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
|
|
|
- break;
|
|
|
- case MIN:
|
|
|
- value = rs.getDouble("METRIC_MIN");
|
|
|
- break;
|
|
|
- case MAX:
|
|
|
- value = rs.getDouble("METRIC_MAX");
|
|
|
- break;
|
|
|
- case SUM:
|
|
|
- value = rs.getDouble("METRIC_SUM");
|
|
|
- break;
|
|
|
- default:
|
|
|
- value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- metric.setSingleTimeseriesValue(rs.getLong("SERVER_TIME"), value);
|
|
|
-
|
|
|
- return metric;
|
|
|
- }
|
|
|
-
|
|
|
private static TreeMap<Long, Double> readLastMetricValueFromJSON(String json)
|
|
|
throws IOException {
|
|
|
TreeMap<Long, Double> values = readMetricFromJSON(json);
|
|
@@ -436,7 +395,7 @@ public class PhoenixHBaseAccessor {
|
|
|
metricRecordStmt.setString(4, metric.getInstanceId());
|
|
|
metricRecordStmt.setLong(5, currentTime);
|
|
|
metricRecordStmt.setLong(6, metric.getStartTime());
|
|
|
- metricRecordStmt.setString(7, metric.getType());
|
|
|
+ metricRecordStmt.setString(7, metric.getUnits());
|
|
|
metricRecordStmt.setDouble(8, aggregates[0]);
|
|
|
metricRecordStmt.setDouble(9, aggregates[1]);
|
|
|
metricRecordStmt.setDouble(10, aggregates[2]);
|
|
@@ -498,7 +457,7 @@ public class PhoenixHBaseAccessor {
|
|
|
|
|
|
try {
|
|
|
//get latest
|
|
|
- if(condition.isPointInTime()){
|
|
|
+ if (condition.isPointInTime()){
|
|
|
getLatestMetricRecords(condition, conn, metrics);
|
|
|
} else {
|
|
|
if (condition.getEndTime() >= condition.getStartTime()) {
|
|
@@ -580,19 +539,24 @@ public class PhoenixHBaseAccessor {
|
|
|
return metrics;
|
|
|
}
|
|
|
|
|
|
- private void appendMetricFromResultSet(
|
|
|
- TimelineMetrics metrics, Condition condition, Map<String,
|
|
|
- List<Function>> metricFunctions, ResultSet rs)
|
|
|
- throws SQLException, IOException {
|
|
|
- if (condition.getPrecision() == Precision.HOURS
|
|
|
- || condition.getPrecision() == Precision.MINUTES
|
|
|
- || condition.getPrecision() == Precision.DAYS) {
|
|
|
-
|
|
|
- String metricName = rs.getString("METRIC_NAME");
|
|
|
- List<Function> functions = metricFunctions.get(metricName);
|
|
|
+ /**
|
|
|
+ * Apply aggregate function to the result if supplied else get precision
|
|
|
+ * or aggregate data with default function applied.
|
|
|
+ */
|
|
|
+ private void appendMetricFromResultSet(TimelineMetrics metrics, Condition condition,
|
|
|
+ Map<String, List<Function>> metricFunctions,
|
|
|
+ ResultSet rs) throws SQLException, IOException {
|
|
|
+ String metricName = rs.getString("METRIC_NAME");
|
|
|
+ List<Function> functions = metricFunctions.get(metricName);
|
|
|
|
|
|
+ // Apply aggregation function if present
|
|
|
+ if (functions != null && !functions.isEmpty()) {
|
|
|
+ if (functions.size() > 1) {
|
|
|
+ throw new IllegalArgumentException("Multiple aggregate functions not supported.");
|
|
|
+ }
|
|
|
for (Function f : functions) {
|
|
|
- SingleValuedTimelineMetric metric = getAggregatedTimelineMetricFromResultSet(rs, f);
|
|
|
+ SingleValuedTimelineMetric metric =
|
|
|
+ TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs, f);
|
|
|
|
|
|
if (condition.isGrouped()) {
|
|
|
metrics.addOrMergeTimelineMetric(metric);
|
|
@@ -600,28 +564,35 @@ public class PhoenixHBaseAccessor {
|
|
|
metrics.getMetrics().add(metric.getTimelineMetric());
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
- else {
|
|
|
- TimelineMetric metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricFromResultSet(rs);
|
|
|
+ } else {
|
|
|
+ // No aggregation requested
|
|
|
+ if (condition.getPrecision().equals(Precision.SECONDS)) {
|
|
|
+ TimelineMetric metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricFromResultSet(rs);
|
|
|
+ if (condition.isGrouped()) {
|
|
|
+ metrics.addOrMergeTimelineMetric(metric);
|
|
|
+ } else {
|
|
|
+ metrics.getMetrics().add(metric);
|
|
|
+ }
|
|
|
|
|
|
- if (condition.isGrouped()) {
|
|
|
- metrics.addOrMergeTimelineMetric(metric);
|
|
|
} else {
|
|
|
- metrics.getMetrics().add(metric);
|
|
|
+ SingleValuedTimelineMetric metric =
|
|
|
+ TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs,
|
|
|
+ Function.DEFAULT_VALUE_FUNCTION);
|
|
|
+ if (condition.isGrouped()) {
|
|
|
+ metrics.addOrMergeTimelineMetric(metric);
|
|
|
+ } else {
|
|
|
+ metrics.getMetrics().add(metric.getTimelineMetric());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void getLatestMetricRecords(
|
|
|
- Condition condition, Connection conn, TimelineMetrics metrics)
|
|
|
- throws SQLException, IOException {
|
|
|
+ private void getLatestMetricRecords(Condition condition, Connection conn,
|
|
|
+ TimelineMetrics metrics) throws SQLException, IOException {
|
|
|
|
|
|
validateConditionIsNotEmpty(condition);
|
|
|
|
|
|
- PreparedStatement stmt;
|
|
|
-
|
|
|
- stmt = PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(conn,
|
|
|
- condition);
|
|
|
+ PreparedStatement stmt = PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(conn, condition);
|
|
|
ResultSet rs = null;
|
|
|
try {
|
|
|
rs = stmt.executeQuery();
|
|
@@ -1146,7 +1117,7 @@ public class PhoenixHBaseAccessor {
|
|
|
stmt.setString(1, metadata.getMetricName());
|
|
|
stmt.setString(2, metadata.getAppId());
|
|
|
stmt.setString(3, metadata.getUnits());
|
|
|
- stmt.setString(4, metadata.getType().name());
|
|
|
+ stmt.setString(4, metadata.getType());
|
|
|
stmt.setLong(5, metadata.getSeriesStartTime());
|
|
|
stmt.setBoolean(6, metadata.isSupportsAggregates());
|
|
|
|
|
@@ -1239,7 +1210,7 @@ public class PhoenixHBaseAccessor {
|
|
|
metricName,
|
|
|
appId,
|
|
|
rs.getString("UNITS"),
|
|
|
- Enums.getIfPresent(MetricType.class, rs.getString("TYPE")).or(MetricType.UNDEFINED),
|
|
|
+ rs.getString("TYPE"),
|
|
|
rs.getLong("START_TIME"),
|
|
|
rs.getBoolean("SUPPORTS_AGGREGATION")
|
|
|
);
|