|
@@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.hbase.util.RetryCounter;
|
|
|
import org.apache.hadoop.hbase.util.RetryCounterFactory;
|
|
|
+import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
|
|
@@ -39,7 +40,6 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
|
|
import org.apache.phoenix.exception.SQLExceptionCode;
|
|
|
import org.codehaus.jackson.map.ObjectMapper;
|
|
|
import org.codehaus.jackson.type.TypeReference;
|
|
|
-
|
|
|
import java.io.IOException;
|
|
|
import java.sql.Connection;
|
|
|
import java.sql.PreparedStatement;
|
|
@@ -50,9 +50,7 @@ import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
-import java.util.TreeMap;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
-
|
|
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
|
|
@@ -132,23 +130,23 @@ public class PhoenixHBaseAccessor {
|
|
|
|
|
|
private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs)
|
|
|
throws SQLException, IOException {
|
|
|
- TimelineMetric metric = TIMELINE_METRIC_READ_HELPER
|
|
|
- .getTimelineMetricCommonsFromResultSet(rs);
|
|
|
+ TimelineMetric metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricCommonsFromResultSet(rs);
|
|
|
metric.setMetricValues(readLastMetricValueFromJSON(rs.getString("METRICS")));
|
|
|
-
|
|
|
return metric;
|
|
|
}
|
|
|
|
|
|
- public static TimelineMetric getAggregatedTimelineMetricFromResultSet(
|
|
|
- ResultSet rs, Function f) throws SQLException, IOException {
|
|
|
+ public static SingleValuedTimelineMetric getAggregatedTimelineMetricFromResultSet(
|
|
|
+ ResultSet rs, Function f) throws SQLException, IOException {
|
|
|
|
|
|
- TimelineMetric metric = new TimelineMetric();
|
|
|
- metric.setHostName(rs.getString("HOSTNAME"));
|
|
|
- metric.setAppId(rs.getString("APP_ID"));
|
|
|
- metric.setInstanceId(rs.getString("INSTANCE_ID"));
|
|
|
- metric.setTimestamp(rs.getLong("SERVER_TIME"));
|
|
|
- metric.setStartTime(rs.getLong("SERVER_TIME"));
|
|
|
- metric.setType(rs.getString("UNITS"));
|
|
|
+ SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric(
|
|
|
+ rs.getString("METRIC_NAME") + f.getSuffix(),
|
|
|
+ rs.getString("APP_ID"),
|
|
|
+ rs.getString("INSTANCE_ID"),
|
|
|
+ rs.getString("HOSTNAME"),
|
|
|
+ rs.getLong("SERVER_TIME"),
|
|
|
+ rs.getLong("SERVER_TIME"),
|
|
|
+ rs.getString("UNITS")
|
|
|
+ );
|
|
|
|
|
|
// get functions for metricnames
|
|
|
|
|
@@ -171,11 +169,8 @@ public class PhoenixHBaseAccessor {
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
- metric.setMetricName(rs.getString("METRIC_NAME") + f.getSuffix());
|
|
|
+ metric.setSingleTimeseriesValue(rs.getLong("SERVER_TIME"), value);
|
|
|
|
|
|
- Map<Long, Double> valueMap = new TreeMap<Long, Double>();
|
|
|
- valueMap.put(rs.getLong("SERVER_TIME"), value);
|
|
|
- metric.setMetricValues(valueMap);
|
|
|
return metric;
|
|
|
}
|
|
|
|
|
@@ -194,30 +189,6 @@ public class PhoenixHBaseAccessor {
|
|
|
return (Map<Long, Double>) mapper.readValue(json, metricValuesTypeRef);
|
|
|
}
|
|
|
|
|
|
- public static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
|
|
|
- throws SQLException, IOException {
|
|
|
- TimelineMetric metric = new TimelineMetric();
|
|
|
- metric.setMetricName(rs.getString("METRIC_NAME"));
|
|
|
- metric.setAppId(rs.getString("APP_ID"));
|
|
|
- metric.setInstanceId(rs.getString("INSTANCE_ID"));
|
|
|
- metric.setHostName(rs.getString("HOSTNAME"));
|
|
|
- metric.setTimestamp(rs.getLong("SERVER_TIME"));
|
|
|
- metric.setType(rs.getString("UNITS"));
|
|
|
- return metric;
|
|
|
- }
|
|
|
-
|
|
|
- public static MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs)
|
|
|
- throws SQLException {
|
|
|
- MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
|
|
|
- metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
|
|
|
- metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
|
|
|
- metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
|
|
|
- metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
|
|
|
-
|
|
|
- metricHostAggregate.setDeviation(0.0);
|
|
|
- return metricHostAggregate;
|
|
|
- }
|
|
|
-
|
|
|
private Connection getConnectionRetryingOnException()
|
|
|
throws SQLException, InterruptedException {
|
|
|
RetryCounter retryCounter = retryCounterFactory.create();
|
|
@@ -465,9 +436,9 @@ public class PhoenixHBaseAccessor {
|
|
|
}
|
|
|
|
|
|
private void appendMetricFromResultSet(
|
|
|
- TimelineMetrics metrics, Condition condition, Map<String,
|
|
|
- List<Function>> metricFunctions, ResultSet rs)
|
|
|
- throws SQLException, IOException {
|
|
|
+ TimelineMetrics metrics, Condition condition, Map<String,
|
|
|
+ List<Function>> metricFunctions, ResultSet rs)
|
|
|
+ throws SQLException, IOException {
|
|
|
if (condition.getPrecision() == Precision.HOURS
|
|
|
|| condition.getPrecision() == Precision.MINUTES) {
|
|
|
|
|
@@ -475,14 +446,12 @@ public class PhoenixHBaseAccessor {
|
|
|
List<Function> functions = metricFunctions.get(metricName);
|
|
|
|
|
|
for (Function f : functions) {
|
|
|
- TimelineMetric metric;
|
|
|
-
|
|
|
- metric = getAggregatedTimelineMetricFromResultSet(rs, f);
|
|
|
+ SingleValuedTimelineMetric metric = getAggregatedTimelineMetricFromResultSet(rs, f);
|
|
|
|
|
|
if (condition.isGrouped()) {
|
|
|
metrics.addOrMergeTimelineMetric(metric);
|
|
|
} else {
|
|
|
- metrics.getMetrics().add(metric);
|
|
|
+ metrics.getMetrics().add(metric.getTimelineMetric());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -540,10 +509,8 @@ public class PhoenixHBaseAccessor {
|
|
|
* @return @TimelineMetrics
|
|
|
* @throws SQLException
|
|
|
*/
|
|
|
- public TimelineMetrics getAggregateMetricRecords(
|
|
|
- final Condition condition,
|
|
|
- Map<String, List<Function>> metricFunctions)
|
|
|
- throws SQLException {
|
|
|
+ public TimelineMetrics getAggregateMetricRecords(final Condition condition,
|
|
|
+ Map<String, List<Function>> metricFunctions) throws SQLException {
|
|
|
|
|
|
validateConditionIsNotEmpty(condition);
|
|
|
|
|
@@ -555,14 +522,13 @@ public class PhoenixHBaseAccessor {
|
|
|
try {
|
|
|
//get latest
|
|
|
if(condition.isPointInTime()) {
|
|
|
- stmt = getLatestAggregateMetricRecords(condition, conn, metrics);
|
|
|
+ stmt = getLatestAggregateMetricRecords(condition, conn, metrics, metricFunctions);
|
|
|
} else {
|
|
|
stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition);
|
|
|
|
|
|
rs = stmt.executeQuery();
|
|
|
while (rs.next()) {
|
|
|
- appendAggregateMetricFromResultSet(metrics, condition,
|
|
|
- metricFunctions, rs);
|
|
|
+ appendAggregateMetricFromResultSet(metrics, condition, metricFunctions, rs);
|
|
|
}
|
|
|
}
|
|
|
} finally {
|
|
@@ -593,34 +559,34 @@ public class PhoenixHBaseAccessor {
|
|
|
return metrics;
|
|
|
}
|
|
|
|
|
|
- private void appendAggregateMetricFromResultSet(
|
|
|
- TimelineMetrics metrics, Condition condition,
|
|
|
- Map<String, List<Function>> metricFunctions, ResultSet rs)
|
|
|
- throws SQLException {
|
|
|
+ private void appendAggregateMetricFromResultSet(TimelineMetrics metrics,
|
|
|
+ Condition condition, Map<String, List<Function>> metricFunctions,
|
|
|
+ ResultSet rs) throws SQLException {
|
|
|
|
|
|
String metricName = rs.getString("METRIC_NAME");
|
|
|
List<Function> functions = metricFunctions.get(metricName);
|
|
|
|
|
|
for (Function aggregateFunction : functions) {
|
|
|
- TimelineMetric metric;
|
|
|
+ SingleValuedTimelineMetric metric;
|
|
|
|
|
|
- if (condition.getPrecision() == Precision.HOURS) {
|
|
|
- metric = getAggregateHoursTimelineMetricFromResultSet(rs, aggregateFunction);
|
|
|
+ if (condition.getPrecision() == Precision.HOURS
|
|
|
+ || condition.getPrecision() == Precision.DAYS) {
|
|
|
+ metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction, false);
|
|
|
} else {
|
|
|
- metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction);
|
|
|
+ metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction, true);
|
|
|
}
|
|
|
|
|
|
if (condition.isGrouped()) {
|
|
|
metrics.addOrMergeTimelineMetric(metric);
|
|
|
} else {
|
|
|
- metrics.getMetrics().add(metric);
|
|
|
+ metrics.getMetrics().add(metric.getTimelineMetric());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private PreparedStatement getLatestAggregateMetricRecords(
|
|
|
- Condition condition, Connection conn, TimelineMetrics metrics)
|
|
|
- throws SQLException {
|
|
|
+ private PreparedStatement getLatestAggregateMetricRecords(Condition condition,
|
|
|
+ Connection conn, TimelineMetrics metrics,
|
|
|
+ Map<String, List<Function>> metricFunctions) throws SQLException {
|
|
|
|
|
|
PreparedStatement stmt = null;
|
|
|
SplitByMetricNamesCondition splitCondition =
|
|
@@ -629,15 +595,28 @@ public class PhoenixHBaseAccessor {
|
|
|
for (String metricName: splitCondition.getOriginalMetricNames()) {
|
|
|
|
|
|
splitCondition.setCurrentMetric(metricName);
|
|
|
- stmt = PhoenixTransactSQL.prepareGetLatestAggregateMetricSqlStmt(conn,
|
|
|
- splitCondition);
|
|
|
+ stmt = PhoenixTransactSQL.prepareGetLatestAggregateMetricSqlStmt(conn, splitCondition);
|
|
|
ResultSet rs = null;
|
|
|
try {
|
|
|
rs = stmt.executeQuery();
|
|
|
while (rs.next()) {
|
|
|
- TimelineMetric metric = getAggregateTimelineMetricFromResultSet(rs,
|
|
|
- new Function());
|
|
|
- metrics.getMetrics().add(metric);
|
|
|
+ List<Function> functions = metricFunctions.get(metricName);
|
|
|
+ if (functions != null) {
|
|
|
+ for (Function f : functions) {
|
|
|
+ SingleValuedTimelineMetric metric =
|
|
|
+ getAggregateTimelineMetricFromResultSet(rs, f, true);
|
|
|
+
|
|
|
+ if (condition.isGrouped()) {
|
|
|
+ metrics.addOrMergeTimelineMetric(metric);
|
|
|
+ } else {
|
|
|
+ metrics.getMetrics().add(metric.getTimelineMetric());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ SingleValuedTimelineMetric metric =
|
|
|
+ getAggregateTimelineMetricFromResultSet(rs, new Function(), true);
|
|
|
+ metrics.getMetrics().add(metric.getTimelineMetric());
|
|
|
+ }
|
|
|
}
|
|
|
} finally {
|
|
|
if (rs != null) {
|
|
@@ -653,54 +632,28 @@ public class PhoenixHBaseAccessor {
|
|
|
return stmt;
|
|
|
}
|
|
|
|
|
|
- private TimelineMetric getAggregateTimelineMetricFromResultSet(
|
|
|
- ResultSet rs, Function f) throws SQLException {
|
|
|
- TimelineMetric metric = new TimelineMetric();
|
|
|
- metric.setAppId(rs.getString("APP_ID"));
|
|
|
- metric.setInstanceId(rs.getString("INSTANCE_ID"));
|
|
|
- metric.setTimestamp(rs.getLong("SERVER_TIME"));
|
|
|
- metric.setStartTime(rs.getLong("SERVER_TIME"));
|
|
|
+ private SingleValuedTimelineMetric getAggregateTimelineMetricFromResultSet(ResultSet rs,
|
|
|
+ Function f, boolean useHostCount) throws SQLException {
|
|
|
|
|
|
- double value;
|
|
|
- switch(f.getReadFunction()){
|
|
|
- case AVG:
|
|
|
- value = rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT");
|
|
|
- break;
|
|
|
- case MIN:
|
|
|
- value = rs.getDouble("METRIC_MIN");
|
|
|
- break;
|
|
|
- case MAX:
|
|
|
- value = rs.getDouble("METRIC_MAX");
|
|
|
- break;
|
|
|
- case SUM:
|
|
|
- value = rs.getDouble("METRIC_SUM");
|
|
|
- break;
|
|
|
- default:
|
|
|
- value = rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT");
|
|
|
- break;
|
|
|
+ String countColumnName = "METRIC_COUNT";
|
|
|
+ if (useHostCount) {
|
|
|
+ countColumnName = "HOSTS_COUNT";
|
|
|
}
|
|
|
|
|
|
- metric.setMetricName(rs.getString("METRIC_NAME") + f.getSuffix());
|
|
|
-
|
|
|
- Map<Long, Double> valueMap = new TreeMap<Long, Double>();
|
|
|
- valueMap.put(rs.getLong("SERVER_TIME"), value);
|
|
|
- metric.setMetricValues(valueMap);
|
|
|
-
|
|
|
- return metric;
|
|
|
- }
|
|
|
-
|
|
|
- private TimelineMetric getAggregateHoursTimelineMetricFromResultSet(
|
|
|
- ResultSet rs, Function f) throws SQLException {
|
|
|
- TimelineMetric metric = new TimelineMetric();
|
|
|
- metric.setAppId(rs.getString("APP_ID"));
|
|
|
- metric.setInstanceId(rs.getString("INSTANCE_ID"));
|
|
|
- metric.setTimestamp(rs.getLong("SERVER_TIME"));
|
|
|
- metric.setStartTime(rs.getLong("SERVER_TIME"));
|
|
|
+ SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric(
|
|
|
+ rs.getString("METRIC_NAME") + f.getSuffix(),
|
|
|
+ rs.getString("APP_ID"),
|
|
|
+ rs.getString("INSTANCE_ID"),
|
|
|
+ null,
|
|
|
+ rs.getLong("SERVER_TIME"),
|
|
|
+ rs.getLong("SERVER_TIME"),
|
|
|
+ rs.getString("UNITS")
|
|
|
+ );
|
|
|
|
|
|
double value;
|
|
|
switch(f.getReadFunction()){
|
|
|
case AVG:
|
|
|
- value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
|
|
|
+ value = rs.getDouble("METRIC_SUM") / rs.getInt(countColumnName);
|
|
|
break;
|
|
|
case MIN:
|
|
|
value = rs.getDouble("METRIC_MIN");
|
|
@@ -712,15 +665,11 @@ public class PhoenixHBaseAccessor {
|
|
|
value = rs.getDouble("METRIC_SUM");
|
|
|
break;
|
|
|
default:
|
|
|
- value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
|
|
|
+ value = rs.getDouble("METRIC_SUM") / rs.getInt(countColumnName);
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
- metric.setMetricName(rs.getString("METRIC_NAME") + f.getSuffix());
|
|
|
-
|
|
|
- Map<Long, Double> valueMap = new TreeMap<Long, Double>();
|
|
|
- valueMap.put(rs.getLong("SERVER_TIME"), value);
|
|
|
- metric.setMetricValues(valueMap);
|
|
|
+ metric.setSingleTimeseriesValue(rs.getLong("SERVER_TIME"), value);
|
|
|
|
|
|
return metric;
|
|
|
}
|