@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
+
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -51,7 +52,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES;
@@ -59,19 +62,21 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_DAILY_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
@@ -85,13 +90,9 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
  */
 public class PhoenixHBaseAccessor {
 
+  static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000;
   private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class);
   private static final TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper();
-  private final Configuration hbaseConf;
-  private final Configuration metricsConf;
-  private final RetryCounterFactory retryCounterFactory;
-
-  static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000;
   /**
    * 4 metrics/min * 60 * 24: Retrieve data for 1 day.
    */
@@ -99,9 +100,11 @@ public class PhoenixHBaseAccessor {
   public static int RESULTSET_LIMIT = (int)TimeUnit.DAYS.toMinutes(1) *
     METRICS_PER_MINUTE;
   private static ObjectMapper mapper = new ObjectMapper();
-
   private static TypeReference<Map<Long, Double>> metricValuesTypeRef =
     new TypeReference<Map<Long, Double>>() {};
+  private final Configuration hbaseConf;
+  private final Configuration metricsConf;
+  private final RetryCounterFactory retryCounterFactory;
   private final ConnectionProvider dataSource;
 
   public PhoenixHBaseAccessor(Configuration hbaseConf,
@@ -127,37 +130,6 @@ public class PhoenixHBaseAccessor {
       (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 5)));
   }
 
-
-  private Connection getConnectionRetryingOnException()
-    throws SQLException, InterruptedException {
-    RetryCounter retryCounter = retryCounterFactory.create();
-    while (true) {
-      try{
-        return getConnection();
-      } catch (SQLException e) {
-        if(!retryCounter.shouldRetry()){
-          LOG.error("HBaseAccessor getConnection failed after "
-            + retryCounter.getMaxAttempts() + " attempts");
-          throw e;
-        }
-      }
-      retryCounter.sleepUntilNextRetry();
-    }
-  }
-
-  /**
-   * Get JDBC connection to HBase store. Assumption is that the hbase
-   * configuration is present on the classpath and loaded by the caller into
-   * the Configuration object.
-   * Phoenix already caches the HConnection between the client and HBase
-   * cluster.
-   *
-   * @return @java.sql.Connection
-   */
-  public Connection getConnection() throws SQLException {
-    return dataSource.getConnection();
-  }
-
   private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs)
     throws SQLException, IOException {
     TimelineMetric metric = TIMELINE_METRIC_READ_HELPER
@@ -218,8 +190,7 @@ public class PhoenixHBaseAccessor {
   }
 
   @SuppressWarnings("unchecked")
-  public static Map<Long, Double> readMetricFromJSON(String json)
-    throws IOException {
+  public static Map<Long, Double> readMetricFromJSON(String json) throws IOException {
     return (Map<Long, Double>) mapper.readValue(json, metricValuesTypeRef);
   }
 
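
For context, metric values travel through this class as a JSON map of timestamp to value, serialized and parsed with the shared ObjectMapper. A minimal caller-side sketch, with illustrative timestamps and values:

    // Illustrative round trip; keys are epoch-millis timestamps stored as JSON strings.
    String json = "{\"1421100000000\":0.25,\"1421100015000\":0.5}";
    Map<Long, Double> values = PhoenixHBaseAccessor.readMetricFromJSON(json);
    // Jackson coerces the string keys to Long via the Map<Long, Double> TypeReference.
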
@@ -247,17 +218,34 @@ public class PhoenixHBaseAccessor {
     return metricHostAggregate;
   }
 
-  public static MetricClusterAggregate getMetricClusterAggregateFromResultSet(ResultSet rs)
-    throws SQLException {
-    MetricClusterAggregate agg = new MetricClusterAggregate();
-    agg.setSum(rs.getDouble("METRIC_SUM"));
-    agg.setMax(rs.getDouble("METRIC_MAX"));
-    agg.setMin(rs.getDouble("METRIC_MIN"));
-    agg.setNumberOfHosts(rs.getInt("HOSTS_COUNT"));
-
-    agg.setDeviation(0.0);
+  private Connection getConnectionRetryingOnException()
+    throws SQLException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try{
+        return getConnection();
+      } catch (SQLException e) {
+        if(!retryCounter.shouldRetry()){
+          LOG.error("HBaseAccessor getConnection failed after "
+            + retryCounter.getMaxAttempts() + " attempts");
+          throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+    }
+  }
 
-    return agg;
+  /**
+   * Get JDBC connection to HBase store. Assumption is that the hbase
+   * configuration is present on the classpath and loaded by the caller into
+   * the Configuration object.
+   * Phoenix already caches the HConnection between the client and HBase
+   * cluster.
+   *
+   * @return @java.sql.Connection
+   */
+  public Connection getConnection() throws SQLException {
+    return dataSource.getConnection();
   }
 
   protected void initMetricSchema() {
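
The javadoc above leans on Phoenix caching the underlying HConnection, which is why getConnection() can simply delegate to the ConnectionProvider. The provider itself is outside this patch; a minimal sketch, assuming ConnectionProvider exposes only getConnection() and using an illustrative class name and ZooKeeper quorum:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;

    // Sketch only: delegates to the Phoenix JDBC driver. The quorum in the
    // URL is a placeholder; a real provider would derive it from hbase-site.xml.
    class SketchPhoenixDataSource implements ConnectionProvider {
      private static final String URL = "jdbc:phoenix:localhost:2181:/hbase";

      @Override
      public Connection getConnection() throws SQLException {
        return DriverManager.getConnection(URL);
      }
    }
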
@@ -269,24 +257,33 @@ public class PhoenixHBaseAccessor {
     String precisionTtl = metricsConf.get(PRECISION_TABLE_TTL, "86400");
     String hostMinTtl = metricsConf.get(HOST_MINUTE_TABLE_TTL, "604800");
     String hostHourTtl = metricsConf.get(HOST_HOUR_TABLE_TTL, "2592000");
+    String hostDailyTtl = metricsConf.get(HOST_DAILY_TABLE_TTL, "31536000");
     String clusterMinTtl = metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "2592000");
     String clusterHourTtl = metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "31536000");
+    String clusterDailyTtl = metricsConf.get(CLUSTER_DAILY_TABLE_TTL, "31536000");
 
     try {
       LOG.info("Initializing metrics schema...");
       conn = getConnectionRetryingOnException();
       stmt = conn.createStatement();
 
+      // Host level
       stmt.executeUpdate(String.format(CREATE_METRICS_TABLE_SQL,
         encoding, precisionTtl, compression));
-      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL,
-        encoding, hostHourTtl, compression));
-      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL,
-        encoding, hostMinTtl, compression));
+      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
+        METRICS_AGGREGATE_MINUTE_TABLE_NAME, encoding, hostMinTtl, compression));
+      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
+        METRICS_AGGREGATE_HOURLY_TABLE_NAME, encoding, hostHourTtl, compression));
+      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
+        METRICS_AGGREGATE_DAILY_TABLE_NAME, encoding, hostDailyTtl, compression));
+
+      // Cluster level
       stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL,
-        encoding, clusterMinTtl, compression));
+        METRICS_CLUSTER_AGGREGATE_TABLE_NAME, encoding, clusterMinTtl, compression));
+      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL,
+        METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, encoding, clusterHourTtl, compression));
       stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL,
-        encoding, clusterHourTtl, compression));
+        METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, encoding, clusterDailyTtl, compression));
 
       //alter TTL options to update tables
       stmt.executeUpdate(String.format(ALTER_SQL,
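
The consolidation above works because CREATE_METRICS_AGGREGATE_TABLE_SQL (defined in PhoenixTransactSQL, not shown in this patch) takes the table name as its first format argument, followed by encoding, TTL, and compression. A sketch of the shape implied by the call sites, with an abbreviated, illustrative column list:

    // Sketch only: the %s placeholders line up with the
    // (tableName, encoding, ttl, compression) arguments used above.
    static final String CREATE_METRICS_AGGREGATE_TABLE_SQL =
      "CREATE TABLE IF NOT EXISTS %s " +
      "(METRIC_NAME VARCHAR, HOSTNAME VARCHAR, APP_ID VARCHAR, " +
      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
      "METRIC_SUM DOUBLE, METRIC_COUNT UNSIGNED_INT, " +
      "METRIC_MAX DOUBLE, METRIC_MIN DOUBLE " +
      "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, SERVER_TIME)) " +
      "DATA_BLOCK_ENCODING='%s', TTL=%s, COMPRESSION='%s'";
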
@@ -298,12 +295,18 @@ public class PhoenixHBaseAccessor {
       stmt.executeUpdate(String.format(ALTER_SQL,
         METRICS_AGGREGATE_HOURLY_TABLE_NAME,
         hostHourTtl));
+      stmt.executeUpdate(String.format(ALTER_SQL,
+        METRICS_AGGREGATE_DAILY_TABLE_NAME,
+        hostDailyTtl));
       stmt.executeUpdate(String.format(ALTER_SQL,
         METRICS_CLUSTER_AGGREGATE_TABLE_NAME,
         clusterMinTtl));
       stmt.executeUpdate(String.format(ALTER_SQL,
         METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME,
         clusterHourTtl));
+      stmt.executeUpdate(String.format(ALTER_SQL,
+        METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME,
+        clusterDailyTtl));
 
       conn.commit();
     } catch (SQLException sql) {
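
The TTL values are seconds, so the defaults work out to: 86400 = 1 day of precision data, 604800 = 7 days of host-minute data, 2592000 = 30 days (host hour, cluster minute), and 31536000 = 365 days for the cluster hour table and the new daily tables. The ALTER statements then re-apply the configured TTLs on every start so existing deployments pick up changed values; ALTER_SQL is presumably just a two-argument template, along the lines of:

    // Sketch only: Phoenix DDL for changing a table's TTL in place.
    static final String ALTER_SQL = "ALTER TABLE %s SET TTL=%s";
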
@@ -726,9 +729,8 @@ public class PhoenixHBaseAccessor {
     }
   }
 
-  public void saveHostAggregateRecords(Map<TimelineMetric,
-    MetricHostAggregate> hostAggregateMap, String phoenixTableName)
-    throws SQLException {
+  public void saveHostAggregateRecords(Map<TimelineMetric, MetricHostAggregate> hostAggregateMap,
+                                       String phoenixTableName) throws SQLException {
 
     if (hostAggregateMap == null || hostAggregateMap.isEmpty()) {
       LOG.debug("Empty aggregate records.");
@@ -809,9 +811,8 @@ public class PhoenixHBaseAccessor {
    *
    * @throws SQLException
    */
-  public void saveClusterAggregateRecords(
-    Map<TimelineClusterMetric, MetricClusterAggregate> records)
-    throws SQLException {
+  public void saveClusterAggregateRecords(Map<TimelineClusterMetric, MetricClusterAggregate> records)
+    throws SQLException {
 
     if (records == null || records.isEmpty()) {
       LOG.debug("Empty aggregate records.");
@@ -819,11 +820,11 @@ public class PhoenixHBaseAccessor {
     }
 
     long start = System.currentTimeMillis();
-
+    String sqlStr = String.format(UPSERT_CLUSTER_AGGREGATE_SQL, METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
     Connection conn = getConnection();
     PreparedStatement stmt = null;
     try {
-      stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL);
+      stmt = conn.prepareStatement(sqlStr);
       int rowCount = 0;
 
       for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
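
Formatting the table name into UPSERT_CLUSTER_AGGREGATE_SQL mirrors the CREATE-statement change above: a single template now serves the cluster aggregate table. Judging by the reader code removed earlier (METRIC_SUM, METRIC_MAX, METRIC_MIN, HOSTS_COUNT) and the nine-column bind pattern in the time-based variant below, the template is presumably something like:

    // Sketch only: nine placeholders matching stmt.setXxx(1..9) in the write loop.
    static final String UPSERT_CLUSTER_AGGREGATE_SQL =
      "UPSERT INTO %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, UNITS, " +
      "METRIC_SUM, HOSTS_COUNT, METRIC_MAX, METRIC_MIN) " +
      "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
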
@@ -892,10 +893,8 @@ public class PhoenixHBaseAccessor {
    *
    * @throws SQLException
    */
-  public void saveClusterAggregateHourlyRecords(
-    Map<TimelineClusterMetric, MetricHostAggregate> records,
-    String tableName)
-    throws SQLException {
+  public void saveClusterTimeAggregateRecords(Map<TimelineClusterMetric, MetricHostAggregate> records,
+                                              String tableName) throws SQLException {
     if (records == null || records.isEmpty()) {
       LOG.debug("Empty aggregate records.");
       return;
@@ -906,12 +905,10 @@ public class PhoenixHBaseAccessor {
     Connection conn = getConnection();
     PreparedStatement stmt = null;
     try {
-      stmt = conn.prepareStatement(String.format
-        (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName));
+      stmt = conn.prepareStatement(String.format(UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName));
       int rowCount = 0;
 
-      for (Map.Entry<TimelineClusterMetric, MetricHostAggregate>
-        aggregateEntry : records.entrySet()) {
+      for (Map.Entry<TimelineClusterMetric, MetricHostAggregate> aggregateEntry : records.entrySet()) {
         TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
         MetricHostAggregate aggregate = aggregateEntry.getValue();
 
@@ -928,7 +925,6 @@ public class PhoenixHBaseAccessor {
         stmt.setLong(4, clusterMetric.getTimestamp());
         stmt.setString(5, clusterMetric.getType());
         stmt.setDouble(6, aggregate.getSum());
-//        stmt.setInt(7, aggregate.getNumberOfHosts());
         stmt.setLong(7, aggregate.getNumberOfSamples());
         stmt.setDouble(8, aggregate.getMax());
         stmt.setDouble(9, aggregate.getMin());
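
The deleted commented-out setInt(7, ...) line marks the one behavioral distinction between the two upsert templates: in the time-based aggregate, bind position 7 carries the sample count rather than a host count. Under that assumption, UPSERT_CLUSTER_AGGREGATE_TIME_SQL would differ from the sketch above only in that column, roughly:

    // Sketch only: position 7 is a METRIC_COUNT (number of samples) column here,
    // where the plain cluster aggregate uses HOSTS_COUNT.
    static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL =
      "UPSERT INTO %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, UNITS, " +
      "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " +
      "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
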