|
@@ -17,6 +17,7 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
|
|
|
|
|
|
+import com.google.common.collect.Multimap;
|
|
|
import org.apache.commons.collections.CollectionUtils;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -748,7 +749,7 @@ public class PhoenixHBaseAccessor {
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public TimelineMetrics getMetricRecords(
|
|
|
- final Condition condition, Map<String, List<Function>> metricFunctions)
|
|
|
+ final Condition condition, Multimap<String, List<Function>> metricFunctions)
|
|
|
throws SQLException, IOException {
|
|
|
|
|
|
validateConditionIsNotEmpty(condition);
|
|
@@ -847,34 +848,36 @@ public class PhoenixHBaseAccessor {
|
|
|
* or aggregate data with default function applied.
|
|
|
*/
|
|
|
private void appendMetricFromResultSet(TimelineMetrics metrics, Condition condition,
|
|
|
- Map<String, List<Function>> metricFunctions,
|
|
|
+ Multimap<String, List<Function>> metricFunctions,
|
|
|
ResultSet rs) throws SQLException, IOException {
|
|
|
String metricName = rs.getString("METRIC_NAME");
|
|
|
- List<Function> functions = findMetricFunctions(metricFunctions, metricName);
|
|
|
+ Collection<List<Function>> functionList = findMetricFunctions(metricFunctions, metricName);
|
|
|
|
|
|
- // Apply aggregation function if present
|
|
|
- if ((functions != null && !functions.isEmpty())) {
|
|
|
- if (functions.size() > 1) {
|
|
|
- throw new IllegalArgumentException("Multiple aggregate functions not supported.");
|
|
|
- }
|
|
|
- for (Function f : functions) {
|
|
|
- if (f.getReadFunction() == Function.ReadFunction.VALUE) {
|
|
|
- getTimelineMetricsFromResultSet(metrics, f, condition, rs);
|
|
|
- } else {
|
|
|
- SingleValuedTimelineMetric metric =
|
|
|
- TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs, f);
|
|
|
-
|
|
|
- if (condition.isGrouped()) {
|
|
|
- metrics.addOrMergeTimelineMetric(metric);
|
|
|
+ for (List<Function> functions : functionList) {
|
|
|
+ // Apply aggregation function if present
|
|
|
+ if ((functions != null && !functions.isEmpty())) {
|
|
|
+ if (functions.size() > 1) {
|
|
|
+ throw new IllegalArgumentException("Multiple aggregate functions not supported.");
|
|
|
+ }
|
|
|
+ for (Function f : functions) {
|
|
|
+ if (f.getReadFunction() == Function.ReadFunction.VALUE) {
|
|
|
+ getTimelineMetricsFromResultSet(metrics, f, condition, rs);
|
|
|
} else {
|
|
|
- metrics.getMetrics().add(metric.getTimelineMetric());
|
|
|
+ SingleValuedTimelineMetric metric =
|
|
|
+ TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs, f);
|
|
|
+
|
|
|
+ if (condition.isGrouped()) {
|
|
|
+ metrics.addOrMergeTimelineMetric(metric);
|
|
|
+ } else {
|
|
|
+ metrics.getMetrics().add(metric.getTimelineMetric());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+ } else {
|
|
|
+ // No aggregation requested
|
|
|
+ // Execution never goes here, function always contain at least 1 element
|
|
|
+ getTimelineMetricsFromResultSet(metrics, null, condition, rs);
|
|
|
}
|
|
|
- } else {
|
|
|
- // No aggregation requested
|
|
|
- // Execution never goes here, function always contain at least 1 element
|
|
|
- getTimelineMetricsFromResultSet(metrics, null, condition, rs);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -936,7 +939,7 @@ public class PhoenixHBaseAccessor {
|
|
|
* @throws SQLException
|
|
|
*/
|
|
|
public TimelineMetrics getAggregateMetricRecords(final Condition condition,
|
|
|
- Map<String, List<Function>> metricFunctions) throws SQLException {
|
|
|
+ Multimap<String, List<Function>> metricFunctions) throws SQLException {
|
|
|
|
|
|
validateConditionIsNotEmpty(condition);
|
|
|
|
|
@@ -986,34 +989,37 @@ public class PhoenixHBaseAccessor {
|
|
|
}
|
|
|
|
|
|
private void appendAggregateMetricFromResultSet(TimelineMetrics metrics,
|
|
|
- Condition condition, Map<String, List<Function>> metricFunctions,
|
|
|
+ Condition condition, Multimap<String, List<Function>> metricFunctions,
|
|
|
ResultSet rs) throws SQLException {
|
|
|
|
|
|
String metricName = rs.getString("METRIC_NAME");
|
|
|
- List<Function> functions = findMetricFunctions(metricFunctions, metricName);
|
|
|
+ Collection<List<Function>> functionList = findMetricFunctions(metricFunctions, metricName);
|
|
|
|
|
|
- for (Function aggregateFunction : functions) {
|
|
|
- SingleValuedTimelineMetric metric;
|
|
|
+ for (List<Function> functions : functionList) {
|
|
|
+ for (Function aggregateFunction : functions) {
|
|
|
+ SingleValuedTimelineMetric metric;
|
|
|
|
|
|
- if (condition.getPrecision() == Precision.MINUTES
|
|
|
+ if (condition.getPrecision() == Precision.MINUTES
|
|
|
|| condition.getPrecision() == Precision.HOURS
|
|
|
|| condition.getPrecision() == Precision.DAYS) {
|
|
|
- metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction, false);
|
|
|
- } else {
|
|
|
- metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction, true);
|
|
|
- }
|
|
|
+ metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction, false);
|
|
|
+ } else {
|
|
|
+ metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction, true);
|
|
|
+ }
|
|
|
|
|
|
- if (condition.isGrouped()) {
|
|
|
- metrics.addOrMergeTimelineMetric(metric);
|
|
|
- } else {
|
|
|
- metrics.getMetrics().add(metric.getTimelineMetric());
|
|
|
+ if (condition.isGrouped()) {
|
|
|
+ metrics.addOrMergeTimelineMetric(metric);
|
|
|
+ } else {
|
|
|
+ metrics.getMetrics().add(metric.getTimelineMetric());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
|
|
|
private void getLatestAggregateMetricRecords(Condition condition,
|
|
|
Connection conn, TimelineMetrics metrics,
|
|
|
- Map<String, List<Function>> metricFunctions) throws SQLException {
|
|
|
+ Multimap<String, List<Function>> metricFunctions) throws SQLException {
|
|
|
|
|
|
PreparedStatement stmt = null;
|
|
|
SplitByMetricNamesCondition splitCondition =
|
|
@@ -1027,22 +1033,24 @@ public class PhoenixHBaseAccessor {
|
|
|
try {
|
|
|
rs = stmt.executeQuery();
|
|
|
while (rs.next()) {
|
|
|
- List<Function> functions = findMetricFunctions(metricFunctions, metricName);
|
|
|
- if (functions != null) {
|
|
|
- for (Function f : functions) {
|
|
|
- SingleValuedTimelineMetric metric =
|
|
|
- getAggregateTimelineMetricFromResultSet(rs, f, true);
|
|
|
-
|
|
|
- if (condition.isGrouped()) {
|
|
|
- metrics.addOrMergeTimelineMetric(metric);
|
|
|
- } else {
|
|
|
- metrics.getMetrics().add(metric.getTimelineMetric());
|
|
|
+ Collection<List<Function>> functionList = findMetricFunctions(metricFunctions, metricName);
|
|
|
+ for (List<Function> functions : functionList) {
|
|
|
+ if (functions != null) {
|
|
|
+ for (Function f : functions) {
|
|
|
+ SingleValuedTimelineMetric metric =
|
|
|
+ getAggregateTimelineMetricFromResultSet(rs, f, true);
|
|
|
+
|
|
|
+ if (condition.isGrouped()) {
|
|
|
+ metrics.addOrMergeTimelineMetric(metric);
|
|
|
+ } else {
|
|
|
+ metrics.getMetrics().add(metric.getTimelineMetric());
|
|
|
+ }
|
|
|
}
|
|
|
+ } else {
|
|
|
+ SingleValuedTimelineMetric metric =
|
|
|
+ getAggregateTimelineMetricFromResultSet(rs, new Function(), true);
|
|
|
+ metrics.getMetrics().add(metric.getTimelineMetric());
|
|
|
}
|
|
|
- } else {
|
|
|
- SingleValuedTimelineMetric metric =
|
|
|
- getAggregateTimelineMetricFromResultSet(rs, new Function(), true);
|
|
|
- metrics.getMetrics().add(metric.getTimelineMetric());
|
|
|
}
|
|
|
}
|
|
|
} finally {
|
|
@@ -1108,16 +1116,16 @@ public class PhoenixHBaseAccessor {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private List<Function> findMetricFunctions(Map<String, List<Function>> metricFunctions,
|
|
|
+ private Collection<List<Function>> findMetricFunctions(Multimap<String, List<Function>> metricFunctions,
|
|
|
String metricName) {
|
|
|
if (metricFunctions.containsKey(metricName)) {
|
|
|
return metricFunctions.get(metricName);
|
|
|
}
|
|
|
|
|
|
- for (Map.Entry<String, List<Function>> nameToFunctions : metricFunctions.entrySet()) {
|
|
|
- String metricRegEx = nameToFunctions.getKey().replace("%", ".*");
|
|
|
+ for (String metricNameEntry : metricFunctions.keySet()) {
|
|
|
+ String metricRegEx = metricNameEntry.replace("%", ".*");
|
|
|
if (metricName.matches(metricRegEx)) {
|
|
|
- return nameToFunctions.getValue();
|
|
|
+ return metricFunctions.get(metricNameEntry);
|
|
|
}
|
|
|
}
|
|
|
|