|
@@ -6,9 +6,9 @@
|
|
|
* to you under the Apache License, Version 2.0 (the
|
|
|
* "License"); you may not use this file except in compliance
|
|
|
* with the License. You may obtain a copy of the License at
|
|
|
- *
|
|
|
- * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
- *
|
|
|
+ * <p/>
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ * <p/>
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
@@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
|
|
|
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Precision;
|
|
|
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
|
|
|
|
|
|
import java.sql.Connection;
|
|
|
import java.sql.PreparedStatement;
|
|
@@ -56,50 +56,50 @@ public class PhoenixTransactSQL {
|
|
|
|
|
|
public static final String CREATE_METRICS_AGGREGATE_TABLE_SQL =
|
|
|
"CREATE TABLE IF NOT EXISTS %s " +
|
|
|
- "(METRIC_NAME VARCHAR, " +
|
|
|
- "HOSTNAME VARCHAR, " +
|
|
|
- "APP_ID VARCHAR, " +
|
|
|
- "INSTANCE_ID VARCHAR, " +
|
|
|
- "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
|
|
|
- "UNITS CHAR(20), " +
|
|
|
- "METRIC_SUM DOUBLE," +
|
|
|
- "METRIC_COUNT UNSIGNED_INT, " +
|
|
|
- "METRIC_MAX DOUBLE," +
|
|
|
- "METRIC_MIN DOUBLE CONSTRAINT pk " +
|
|
|
- "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
|
|
|
- "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
|
|
|
- " COMPRESSION='%s'";
|
|
|
+ "(METRIC_NAME VARCHAR, " +
|
|
|
+ "HOSTNAME VARCHAR, " +
|
|
|
+ "APP_ID VARCHAR, " +
|
|
|
+ "INSTANCE_ID VARCHAR, " +
|
|
|
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
|
|
|
+ "UNITS CHAR(20), " +
|
|
|
+ "METRIC_SUM DOUBLE," +
|
|
|
+ "METRIC_COUNT UNSIGNED_INT, " +
|
|
|
+ "METRIC_MAX DOUBLE," +
|
|
|
+ "METRIC_MIN DOUBLE CONSTRAINT pk " +
|
|
|
+ "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
|
|
|
+ "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
|
|
|
+ " COMPRESSION='%s'";
|
|
|
|
|
|
public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL =
|
|
|
"CREATE TABLE IF NOT EXISTS %s " +
|
|
|
- "(METRIC_NAME VARCHAR, " +
|
|
|
- "APP_ID VARCHAR, " +
|
|
|
- "INSTANCE_ID VARCHAR, " +
|
|
|
- "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
|
|
|
- "UNITS CHAR(20), " +
|
|
|
- "METRIC_SUM DOUBLE, " +
|
|
|
- "HOSTS_COUNT UNSIGNED_INT, " +
|
|
|
- "METRIC_MAX DOUBLE, " +
|
|
|
- "METRIC_MIN DOUBLE " +
|
|
|
- "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
|
|
|
- "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
|
|
|
- "TTL=%s, COMPRESSION='%s'";
|
|
|
+ "(METRIC_NAME VARCHAR, " +
|
|
|
+ "APP_ID VARCHAR, " +
|
|
|
+ "INSTANCE_ID VARCHAR, " +
|
|
|
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
|
|
|
+ "UNITS CHAR(20), " +
|
|
|
+ "METRIC_SUM DOUBLE, " +
|
|
|
+ "HOSTS_COUNT UNSIGNED_INT, " +
|
|
|
+ "METRIC_MAX DOUBLE, " +
|
|
|
+ "METRIC_MIN DOUBLE " +
|
|
|
+ "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
|
|
|
+ "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
|
|
|
+ "TTL=%s, COMPRESSION='%s'";
|
|
|
|
|
|
// HOSTS_COUNT vs METRIC_COUNT
|
|
|
public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL =
|
|
|
"CREATE TABLE IF NOT EXISTS %s " +
|
|
|
- "(METRIC_NAME VARCHAR, " +
|
|
|
- "APP_ID VARCHAR, " +
|
|
|
- "INSTANCE_ID VARCHAR, " +
|
|
|
- "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
|
|
|
- "UNITS CHAR(20), " +
|
|
|
- "METRIC_SUM DOUBLE, " +
|
|
|
- "METRIC_COUNT UNSIGNED_INT, " +
|
|
|
- "METRIC_MAX DOUBLE, " +
|
|
|
- "METRIC_MIN DOUBLE " +
|
|
|
- "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
|
|
|
- "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
|
|
|
- "TTL=%s, COMPRESSION='%s'";
|
|
|
+ "(METRIC_NAME VARCHAR, " +
|
|
|
+ "APP_ID VARCHAR, " +
|
|
|
+ "INSTANCE_ID VARCHAR, " +
|
|
|
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
|
|
|
+ "UNITS CHAR(20), " +
|
|
|
+ "METRIC_SUM DOUBLE, " +
|
|
|
+ "METRIC_COUNT UNSIGNED_INT, " +
|
|
|
+ "METRIC_MAX DOUBLE, " +
|
|
|
+ "METRIC_MIN DOUBLE " +
|
|
|
+ "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
|
|
|
+ "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
|
|
|
+ "TTL=%s, COMPRESSION='%s'";
|
|
|
|
|
|
/**
|
|
|
* ALTER table to set new options
|
|
@@ -269,7 +269,7 @@ public class PhoenixTransactSQL {
|
|
|
}
|
|
|
|
|
|
public static PreparedStatement prepareGetMetricsSqlStmt(Connection connection,
|
|
|
- Condition condition) throws SQLException {
|
|
|
+ Condition condition) throws SQLException {
|
|
|
|
|
|
validateConditionIsNotEmpty(condition);
|
|
|
validateRowCountLimit(condition);
|
|
@@ -284,42 +284,25 @@ public class PhoenixTransactSQL {
|
|
|
if (condition.getPrecision() == null) {
|
|
|
long endTime = condition.getEndTime() == null ? System.currentTimeMillis() : condition.getEndTime();
|
|
|
long startTime = condition.getStartTime() == null ? 0 : condition.getStartTime();
|
|
|
- Long timeRange = endTime - startTime;
|
|
|
- if (timeRange > 30 * DAY) {
|
|
|
+ Precision precision = Precision.getPrecision(startTime, endTime);
|
|
|
+ condition.setPrecision(precision);
|
|
|
+ }
|
|
|
+ switch (condition.getPrecision()) {
|
|
|
+ case DAYS:
|
|
|
metricsTable = METRICS_AGGREGATE_DAILY_TABLE_NAME;
|
|
|
query = GET_METRIC_AGGREGATE_ONLY_SQL;
|
|
|
- condition.setPrecision(Precision.DAYS);
|
|
|
- } else if (timeRange > DAY) {
|
|
|
+ break;
|
|
|
+ case HOURS:
|
|
|
metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
|
|
|
query = GET_METRIC_AGGREGATE_ONLY_SQL;
|
|
|
- condition.setPrecision(Precision.HOURS);
|
|
|
- } else if (timeRange > 10 * HOUR) {
|
|
|
+ break;
|
|
|
+ case MINUTES:
|
|
|
metricsTable = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
|
|
|
query = GET_METRIC_AGGREGATE_ONLY_SQL;
|
|
|
- condition.setPrecision(Precision.MINUTES);
|
|
|
- } else {
|
|
|
+ break;
|
|
|
+ default:
|
|
|
metricsTable = METRICS_RECORD_TABLE_NAME;
|
|
|
query = GET_METRIC_SQL;
|
|
|
- condition.setPrecision(Precision.SECONDS);
|
|
|
- }
|
|
|
- } else {
|
|
|
- switch (condition.getPrecision()) {
|
|
|
- case DAYS:
|
|
|
- metricsTable = METRICS_AGGREGATE_DAILY_TABLE_NAME;
|
|
|
- query = GET_METRIC_AGGREGATE_ONLY_SQL;
|
|
|
- break;
|
|
|
- case HOURS:
|
|
|
- metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
|
|
|
- query = GET_METRIC_AGGREGATE_ONLY_SQL;
|
|
|
- break;
|
|
|
- case MINUTES:
|
|
|
- metricsTable = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
|
|
|
- query = GET_METRIC_AGGREGATE_ONLY_SQL;
|
|
|
- break;
|
|
|
- default:
|
|
|
- metricsTable = METRICS_RECORD_TABLE_NAME;
|
|
|
- query = GET_METRIC_SQL;
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
stmtStr = String.format(query,
|
|
@@ -413,19 +396,32 @@ public class PhoenixTransactSQL {
|
|
|
|
|
|
private static void validateRowCountLimit(Condition condition) {
|
|
|
if (condition.getMetricNames() == null
|
|
|
- || condition.getMetricNames().isEmpty() ) {
|
|
|
+ || condition.getMetricNames().isEmpty()) {
|
|
|
//aggregator can use empty metrics query
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
long range = condition.getEndTime() - condition.getStartTime();
|
|
|
- long rowsPerMetric = TimeUnit.MILLISECONDS.toHours(range) + 1;
|
|
|
+ long rowsPerMetric;
|
|
|
|
|
|
+ //Get Precision (passed in or computed) and estimate values returned based on that.
|
|
|
Precision precision = condition.getPrecision();
|
|
|
- // for minutes and seconds we can use the rowsPerMetric computed based on
|
|
|
- // minutes
|
|
|
- if (precision != null && precision == Precision.HOURS) {
|
|
|
- rowsPerMetric = TimeUnit.MILLISECONDS.toHours(range) + 1;
|
|
|
+ if (precision == null) {
|
|
|
+ precision = Precision.getPrecision(condition.getStartTime(), condition.getEndTime());
|
|
|
+ }
|
|
|
+
|
|
|
+ switch (precision) {
|
|
|
+ case DAYS:
|
|
|
+ rowsPerMetric = TimeUnit.MILLISECONDS.toDays(range);
|
|
|
+ break;
|
|
|
+ case HOURS:
|
|
|
+ rowsPerMetric = TimeUnit.MILLISECONDS.toHours(range);
|
|
|
+ break;
|
|
|
+ case MINUTES:
|
|
|
+ rowsPerMetric = TimeUnit.MILLISECONDS.toMinutes(range);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ rowsPerMetric = TimeUnit.MILLISECONDS.toSeconds(range)/10; //10 second data in METRIC_AGGREGATE table
|
|
|
}
|
|
|
|
|
|
long totalRowsRequested = rowsPerMetric * condition.getMetricNames().size();
|
|
@@ -437,7 +433,7 @@ public class PhoenixTransactSQL {
|
|
|
}
|
|
|
|
|
|
public static PreparedStatement prepareGetLatestMetricSqlStmt(
|
|
|
- Connection connection, Condition condition) throws SQLException {
|
|
|
+ Connection connection, Condition condition) throws SQLException {
|
|
|
|
|
|
validateConditionIsNotEmpty(condition);
|
|
|
|
|
@@ -473,6 +469,7 @@ public class PhoenixTransactSQL {
|
|
|
|
|
|
return stmt;
|
|
|
}
|
|
|
+
|
|
|
private static PreparedStatement setQueryParameters(PreparedStatement stmt,
|
|
|
Condition condition)
|
|
|
throws SQLException {
|
|
@@ -480,7 +477,7 @@ public class PhoenixTransactSQL {
|
|
|
//For GET_LATEST_METRIC_SQL_SINGLE_HOST parameters should be set 2 times
|
|
|
do {
|
|
|
if (condition.getMetricNames() != null) {
|
|
|
- for (String metricName: condition.getMetricNames()) {
|
|
|
+ for (String metricName : condition.getMetricNames()) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Setting pos: " + pos + ", value = " + metricName);
|
|
|
}
|
|
@@ -519,48 +516,36 @@ public class PhoenixTransactSQL {
|
|
|
}
|
|
|
|
|
|
public static PreparedStatement prepareGetAggregateSqlStmt(
|
|
|
- Connection connection, Condition condition) throws SQLException {
|
|
|
+ Connection connection, Condition condition) throws SQLException {
|
|
|
|
|
|
validateConditionIsNotEmpty(condition);
|
|
|
+ validateRowCountLimit(condition);
|
|
|
|
|
|
String metricsAggregateTable;
|
|
|
String queryStmt;
|
|
|
if (condition.getPrecision() == null) {
|
|
|
long endTime = condition.getEndTime() == null ? System.currentTimeMillis() : condition.getEndTime();
|
|
|
long startTime = condition.getStartTime() == null ? 0 : condition.getStartTime();
|
|
|
- Long timeRange = endTime - startTime;
|
|
|
- if (timeRange > 30 * DAY) {
|
|
|
+ condition.setPrecision(Precision.getPrecision(startTime, endTime));
|
|
|
+ }
|
|
|
+ switch (condition.getPrecision()) {
|
|
|
+ case DAYS:
|
|
|
metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
|
|
|
queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
|
|
|
- condition.setPrecision(Precision.DAYS);
|
|
|
- } else if (timeRange > DAY) {
|
|
|
+ break;
|
|
|
+ case HOURS:
|
|
|
metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
|
|
|
queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
|
|
|
- condition.setPrecision(Precision.HOURS);
|
|
|
- } else {
|
|
|
+ break;
|
|
|
+ //TODO : Include MINUTE case after introducing CLUSTER_AGGREGATOR_MINUTE
|
|
|
+ default:
|
|
|
metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
|
|
|
queryStmt = GET_CLUSTER_AGGREGATE_SQL;
|
|
|
- condition.setPrecision(Precision.SECONDS);
|
|
|
- }
|
|
|
- } else {
|
|
|
- switch (condition.getPrecision()) {
|
|
|
- case DAYS:
|
|
|
- metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
|
|
|
- queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
|
|
|
- break;
|
|
|
- case HOURS:
|
|
|
- metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
|
|
|
- queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
|
|
|
- break;
|
|
|
- default:
|
|
|
- metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
|
|
|
- queryStmt = GET_CLUSTER_AGGREGATE_SQL;
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
queryStmt = String.format(queryStmt,
|
|
|
- getNaiveTimeRangeHint(condition.getStartTime(), NATIVE_TIME_RANGE_DELTA),
|
|
|
- metricsAggregateTable);
|
|
|
+ getNaiveTimeRangeHint(condition.getStartTime(), NATIVE_TIME_RANGE_DELTA),
|
|
|
+ metricsAggregateTable);
|
|
|
|
|
|
StringBuilder sb = new StringBuilder(queryStmt);
|
|
|
sb.append(" WHERE ");
|
|
@@ -571,6 +556,7 @@ public class PhoenixTransactSQL {
|
|
|
}
|
|
|
|
|
|
String query = sb.toString();
|
|
|
+
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("SQL => " + query + ", condition => " + condition);
|
|
|
}
|
|
@@ -617,7 +603,7 @@ public class PhoenixTransactSQL {
|
|
|
stmtStr = condition.getStatement();
|
|
|
} else {
|
|
|
stmtStr = String.format(GET_CLUSTER_AGGREGATE_SQL, "",
|
|
|
- METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
|
|
|
+ METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
|
|
|
}
|
|
|
|
|
|
StringBuilder sb = new StringBuilder(stmtStr);
|
|
@@ -639,25 +625,25 @@ public class PhoenixTransactSQL {
|
|
|
|
|
|
PreparedStatement stmt = null;
|
|
|
try {
|
|
|
- stmt = connection.prepareStatement(query);
|
|
|
- int pos = 1;
|
|
|
- if (condition.getMetricNames() != null) {
|
|
|
- for (; pos <= condition.getMetricNames().size(); pos++) {
|
|
|
- stmt.setString(pos, condition.getMetricNames().get(pos - 1));
|
|
|
+ stmt = connection.prepareStatement(query);
|
|
|
+ int pos = 1;
|
|
|
+ if (condition.getMetricNames() != null) {
|
|
|
+ for (; pos <= condition.getMetricNames().size(); pos++) {
|
|
|
+ stmt.setString(pos, condition.getMetricNames().get(pos - 1));
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- if (condition.getAppId() != null) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
|
|
|
+ if (condition.getAppId() != null) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
|
|
|
+ }
|
|
|
+ stmt.setString(pos++, condition.getAppId());
|
|
|
+ }
|
|
|
+ if (condition.getInstanceId() != null) {
|
|
|
+ stmt.setString(pos, condition.getInstanceId());
|
|
|
}
|
|
|
- stmt.setString(pos++, condition.getAppId());
|
|
|
- }
|
|
|
- if (condition.getInstanceId() != null) {
|
|
|
- stmt.setString(pos, condition.getInstanceId());
|
|
|
- }
|
|
|
} catch (SQLException e) {
|
|
|
if (stmt != null) {
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
throw e;
|
|
|
}
|