@@ -17,88 +17,18 @@
 */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;

-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.RetryCounterFactory;
-import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
-import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.Precision;
-import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-import org.apache.phoenix.exception.PhoenixIOException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATE_TABLE_SPLIT_POINTS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_DURABILITY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_BLOCKING_STORE_FILES;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATORS_SKIP_BLOCK_CACHE;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_SECOND_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CONTAINER_METRICS_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_BLOCKING_STORE_FILES;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_DAILY_TABLE_TTL;
@@ -107,11 +37,19 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_SPLIT_POINTS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CONTAINER_METRICS_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_ENABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_DURABILITY;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_METRICS_METADATA_TABLE;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL;
@@ -120,7 +58,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_METADATA_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_METRICS_METADATA_TABLE;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
@@ -139,11 +76,80 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_HOSTED_APPS_METADATA_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_INSTANCE_HOST_METADATA_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METADATA_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME.RAW_METRICS;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalSinkProvider;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalMetricsSource;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.apache.phoenix.exception.PhoenixIOException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;


/**
@@ -198,16 +204,29 @@ public class PhoenixHBaseAccessor {

  private HashMap<String, String> tableTTL = new HashMap<>();

-  public PhoenixHBaseAccessor(Configuration hbaseConf,
-                              Configuration metricsConf){
-    this(hbaseConf, metricsConf, new DefaultPhoenixDataSource(hbaseConf));
+  private final TimelineMetricConfiguration configuration;
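+  // Internal source that re-publishes committed raw metrics to an external sink.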
+  private InternalMetricsSource rawMetricsSource;
+
+  public PhoenixHBaseAccessor(PhoenixConnectionProvider dataSource) {
+    this(TimelineMetricConfiguration.getInstance(), dataSource);
  }

-  PhoenixHBaseAccessor(Configuration hbaseConf,
-                       Configuration metricsConf,
+  // Test-friendly constructor, since mock instrumentation is difficult to get
+  // working with the Hadoop mini cluster.
+  PhoenixHBaseAccessor(TimelineMetricConfiguration configuration,
                       PhoenixConnectionProvider dataSource) {
-    this.hbaseConf = hbaseConf;
-    this.metricsConf = metricsConf;
+    this.configuration = configuration;
+    try {
+      this.hbaseConf = configuration.getHbaseConf();
+      this.metricsConf = configuration.getMetricsConf();
+    } catch (Exception e) {
+      throw new ExceptionInInitializerError("Cannot initialize configuration.");
+    }
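+    // Fall back to the default Phoenix data source when none is supplied (e.g. from tests).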
+    if (dataSource == null) {
+      dataSource = new DefaultPhoenixDataSource(hbaseConf);
+    }
+    this.dataSource = dataSource;
+
    RESULTSET_LIMIT = metricsConf.getInt(GLOBAL_RESULT_LIMIT, RESULTSET_LIMIT);
    try {
      Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
@@ -215,7 +234,7 @@ public class PhoenixHBaseAccessor {
      LOG.error("Phoenix client jar not found in the classpath.", e);
      throw new IllegalStateException(e);
    }
-    this.dataSource = dataSource;
+
    this.retryCounterFactory = new RetryCounterFactory(metricsConf.getInt(GLOBAL_MAX_RETRIES, 10),
      (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 3)));
    this.outOfBandTimeAllowance = metricsConf.getLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE,
@@ -249,10 +268,20 @@
        metricsConf.getClass(TIMELINE_METRIC_AGGREGATOR_SINK_CLASS, null,
            TimelineMetricsAggregatorSink.class);
    if (metricSinkClass != null) {
-      aggregatorSink =
-          ReflectionUtils.newInstance(metricSinkClass, metricsConf);
+      aggregatorSink = ReflectionUtils.newInstance(metricSinkClass, metricsConf);
      LOG.info("Initialized aggregator sink class " + metricSinkClass);
    }
+
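+    // If an external sink is configured, create an internal source that feeds it raw
+    // metrics; the sink interval falls back to the cache commit interval when unset (-1).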
+    ExternalSinkProvider externalSinkProvider = configuration.getExternalSinkProvider();
+    InternalSourceProvider internalSourceProvider = configuration.getInternalSourceProvider();
+    if (externalSinkProvider != null) {
+      ExternalMetricsSink rawMetricsSink = externalSinkProvider.getExternalMetricsSink(RAW_METRICS);
+      int interval = configuration.getExternalSinkInterval(RAW_METRICS);
+      if (interval == -1) {
+        interval = cacheCommitInterval;
+      }
+      rawMetricsSource = internalSourceProvider.getInternalMetricsSource(RAW_METRICS, interval, rawMetricsSink);
+    }
  }

  public boolean isInsertCacheEmpty() {
@@ -261,12 +290,15 @@

  public void commitMetricsFromCache() {
    LOG.debug("Clearing metrics cache");
-    List<TimelineMetrics> metricsArray = new ArrayList<TimelineMetrics>(insertCache.size());
-    while (!insertCache.isEmpty()) {
-      metricsArray.add(insertCache.poll());
+    List<TimelineMetrics> metricsList = new ArrayList<TimelineMetrics>(insertCache.size());
+    if (!insertCache.isEmpty()) {
+      insertCache.drainTo(metricsList); // drainTo is more efficient than repeated poll() calls
    }
-    if (metricsArray.size() > 0) {
-      commitMetrics(metricsArray);
+    if (metricsList.size() > 0) {
+      commitMetrics(metricsList);
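+      // Also hand the drained batch to the external raw-metrics sink, if configured.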
+      if (rawMetricsSource != null) {
+        rawMetricsSource.publishTimelineMetrics(metricsList);
+      }
    }
  }

@@ -367,7 +399,7 @@
  }

  @SuppressWarnings("unchecked")
-    public static TreeMap<Long, Double> readMetricFromJSON(String json) throws IOException {
+  public static TreeMap<Long, Double> readMetricFromJSON(String json) throws IOException {
    return mapper.readValue(json, metricValuesTypeRef);
  }

@@ -701,6 +733,9 @@
    return "";
  }

+  /**
+   * Insert precision YARN container data.
+   */
  public void insertContainerMetrics(List<ContainerMetric> metrics)
      throws SQLException, IOException {
    Connection conn = getConnection();
@@ -766,6 +801,9 @@
    }
  }

+  /**
+   * Insert precision data.
+   */
  public void insertMetricRecordsWithMetadata(TimelineMetricMetadataManager metadataManager,
                                              TimelineMetrics metrics, boolean skipCache) throws SQLException, IOException {
    List<TimelineMetric> timelineMetrics = metrics.getMetrics();
@@ -1389,9 +1427,7 @@
      try {
        aggregatorSink.saveClusterAggregateRecords(records);
      } catch (Exception e) {
-        LOG.warn(
-            "Error writing cluster aggregate records metrics to external sink. "
-                + e);
+        LOG.warn("Error writing cluster aggregate records metrics to external sink.", e);
      }
    }
  }
@@ -1402,8 +1438,8 @@
   *
   * @throws SQLException
   */
-  public void saveClusterTimeAggregateRecords(Map<TimelineClusterMetric, MetricHostAggregate> records,
-                                              String tableName) throws SQLException {
+  public void saveClusterAggregateRecordsSecond(Map<TimelineClusterMetric, MetricHostAggregate> records,
+                                                String tableName) throws SQLException {
    if (records == null || records.isEmpty()) {
      LOG.debug("Empty aggregate records.");
      return;
|