|
@@ -17,6 +17,7 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
|
|
|
|
|
|
+import com.google.common.base.Enums;
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.util.RetryCounterFactory;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.Precision;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
|
|
|
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
|
|
@@ -34,6 +36,8 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
|
|
|
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
|
|
|
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource;
|
|
@@ -51,12 +55,18 @@ import java.sql.PreparedStatement;
|
|
|
import java.sql.ResultSet;
|
|
|
import java.sql.SQLException;
|
|
|
import java.sql.Statement;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
import java.util.TreeMap;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
|
|
+import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata.*;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATE_TABLE_SPLIT_POINTS;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATORS_SKIP_BLOCK_CACHE;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
|
|
@@ -75,12 +85,16 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_SPLIT_POINTS;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_SQL;
|
|
|
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
|
|
|
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_METADATA_TABLE_SQL;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
|
|
|
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_HOSTED_APPS_METADATA_SQL;
|
|
|
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_METADATA_SQL;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
|
|
@@ -92,6 +106,8 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
|
|
|
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_HOSTED_APPS_METADATA_SQL;
|
|
|
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METADATA_SQL;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
|
|
|
|
|
|
/**
|
|
@@ -260,6 +276,14 @@ public class PhoenixHBaseAccessor {
|
|
|
conn = getConnectionRetryingOnException();
|
|
|
stmt = conn.createStatement();
|
|
|
|
|
|
+ // Metadata
|
|
|
+ String metadataSql = String.format(CREATE_METRICS_METADATA_TABLE_SQL,
|
|
|
+ encoding, compression);
|
|
|
+ stmt.executeUpdate(metadataSql);
|
|
|
+ String hostedAppSql = String.format(CREATE_HOSTED_APPS_METADATA_TABLE_SQL,
|
|
|
+ encoding, compression);
|
|
|
+ stmt.executeUpdate(hostedAppSql);
|
|
|
+
|
|
|
// Host level
|
|
|
String precisionSql = String.format(CREATE_METRICS_TABLE_SQL,
|
|
|
encoding, precisionTtl, compression);
|
|
@@ -371,8 +395,8 @@ public class PhoenixHBaseAccessor {
|
|
|
return "";
|
|
|
}
|
|
|
|
|
|
- public void insertMetricRecords(TimelineMetrics metrics) throws SQLException, IOException {
|
|
|
-
|
|
|
+ public void insertMetricRecordsWithMetadata(TimelineMetricMetadataManager metadataManager,
|
|
|
+ TimelineMetrics metrics) throws SQLException, IOException {
|
|
|
List<TimelineMetric> timelineMetrics = metrics.getMetrics();
|
|
|
if (timelineMetrics == null || timelineMetrics.isEmpty()) {
|
|
|
LOG.debug("Empty metrics insert request.");
|
|
@@ -422,8 +446,16 @@ public class PhoenixHBaseAccessor {
|
|
|
|
|
|
try {
|
|
|
metricRecordStmt.executeUpdate();
|
|
|
+
|
|
|
+ // Write to metadata cache on successful write to store
|
|
|
+ metadataManager.putIfModifiedTimelineMetricMetadata(
|
|
|
+ metadataManager.getTimelineMetricMetadata(metric));
|
|
|
+
|
|
|
+ metadataManager.putIfModifiedHostedAppsMetadata(
|
|
|
+ metric.getHostName(), metric.getAppId());
|
|
|
+
|
|
|
} catch (SQLException sql) {
|
|
|
- LOG.error(sql);
|
|
|
+ LOG.error("Failed on insert records to store.", sql);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -448,6 +480,10 @@ public class PhoenixHBaseAccessor {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void insertMetricRecords(TimelineMetrics metrics) throws SQLException, IOException {
|
|
|
+ insertMetricRecordsWithMetadata(null, metrics);
|
|
|
+ }
|
|
|
+
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public TimelineMetrics getMetricRecords(
|
|
|
final Condition condition, Map<String, List<Function>> metricFunctions)
|
|
@@ -566,8 +602,7 @@ public class PhoenixHBaseAccessor {
|
|
|
}
|
|
|
}
|
|
|
else {
|
|
|
- TimelineMetric metric;
|
|
|
- metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricFromResultSet(rs);
|
|
|
+ TimelineMetric metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricFromResultSet(rs);
|
|
|
|
|
|
if (condition.isGrouped()) {
|
|
|
metrics.addOrMergeTimelineMetric(metric);
|
|
@@ -1032,4 +1067,212 @@ public class PhoenixHBaseAccessor {
|
|
|
public boolean isSkipBlockCacheForAggregatorsEnabled() {
|
|
|
return skipBlockCacheForAggregatorsEnabled;
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * One-time save of metadata when discovering topology during aggregation.
|
|
|
+ * @throws SQLException
|
|
|
+ */
|
|
|
+ public void saveHostAppsMetadata(Map<String, Set<String>> hostedApps) throws SQLException {
|
|
|
+ Connection conn = getConnection();
|
|
|
+ PreparedStatement stmt = null;
|
|
|
+ try {
|
|
|
+ stmt = conn.prepareStatement(UPSERT_HOSTED_APPS_METADATA_SQL);
|
|
|
+ int rowCount = 0;
|
|
|
+
|
|
|
+ for (Map.Entry<String, Set<String>> hostedAppsEntry : hostedApps.entrySet()) {
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("HostedAppsMetadata: " + hostedAppsEntry);
|
|
|
+ }
|
|
|
+
|
|
|
+ stmt.clearParameters();
|
|
|
+ stmt.setString(1, hostedAppsEntry.getKey());
|
|
|
+ stmt.setString(2, StringUtils.join(hostedAppsEntry.getValue(), ","));
|
|
|
+ try {
|
|
|
+ stmt.executeUpdate();
|
|
|
+ rowCount++;
|
|
|
+ } catch (SQLException sql) {
|
|
|
+ LOG.error("Error saving hosted apps metadata.", sql);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ conn.commit();
|
|
|
+ LOG.info("Saved " + rowCount + " hosted apps metadata records.");
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ if (stmt != null) {
|
|
|
+ try {
|
|
|
+ stmt.close();
|
|
|
+ } catch (SQLException e) {
|
|
|
+ // Ignore
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (conn != null) {
|
|
|
+ try {
|
|
|
+ conn.close();
|
|
|
+ } catch (SQLException sql) {
|
|
|
+ // Ignore
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Save metadata on updates.
|
|
|
+ * @param metricMetadata Collection of {@link TimelineMetricMetadata} records to persist
|
|
|
+ * @throws SQLException
|
|
|
+ */
|
|
|
+ public void saveMetricMetadata(Collection<TimelineMetricMetadata> metricMetadata) throws SQLException {
|
|
|
+ if (metricMetadata.isEmpty()) {
|
|
|
+ LOG.info("No metadata records to save.");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ Connection conn = getConnection();
|
|
|
+ PreparedStatement stmt = null;
|
|
|
+
|
|
|
+ try {
|
|
|
+ stmt = conn.prepareStatement(UPSERT_METADATA_SQL);
|
|
|
+ int rowCount = 0;
|
|
|
+
|
|
|
+ for (TimelineMetricMetadata metadata : metricMetadata) {
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("TimelineMetricMetadata: metricName = " + metadata.getMetricName()
|
|
|
+ + ", appId = " + metadata.getAppId()
|
|
|
+ + ", seriesStartTime = " + metadata.getSeriesStartTime()
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ stmt.clearParameters();
|
|
|
+ stmt.setString(1, metadata.getMetricName());
|
|
|
+ stmt.setString(2, metadata.getAppId());
|
|
|
+ stmt.setString(3, metadata.getUnits());
|
|
|
+ stmt.setString(4, metadata.getType().name());
|
|
|
+ stmt.setLong(5, metadata.getSeriesStartTime());
|
|
|
+ stmt.setBoolean(6, metadata.isSupportsAggregates());
|
|
|
+
|
|
|
+ try {
|
|
|
+ stmt.executeUpdate();
|
|
|
+ rowCount++;
|
|
|
+ } catch (SQLException sql) {
|
|
|
+ LOG.error("Error saving metadata.", sql);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ conn.commit();
|
|
|
+ LOG.info("Saved " + rowCount + " metadata records.");
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ if (stmt != null) {
|
|
|
+ try {
|
|
|
+ stmt.close();
|
|
|
+ } catch (SQLException e) {
|
|
|
+ // Ignore
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (conn != null) {
|
|
|
+ try {
|
|
|
+ conn.close();
|
|
|
+ } catch (SQLException sql) {
|
|
|
+ // Ignore
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public Map<String, Set<String>> getHostedAppsMetadata() throws SQLException {
|
|
|
+ Map<String, Set<String>> hostedAppMap = new HashMap<>();
|
|
|
+ Connection conn = getConnection();
|
|
|
+ PreparedStatement stmt = null;
|
|
|
+ ResultSet rs = null;
|
|
|
+
|
|
|
+ try {
|
|
|
+ stmt = conn.prepareStatement(GET_HOSTED_APPS_METADATA_SQL);
|
|
|
+ rs = stmt.executeQuery();
|
|
|
+
|
|
|
+ while (rs.next()) {
|
|
|
+ hostedAppMap.put(rs.getString("HOSTNAME"),
|
|
|
+ new HashSet<>(Arrays.asList(StringUtils.split(rs.getString("APP_IDS"), ","))));
|
|
|
+ }
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ if (rs != null) {
|
|
|
+ try {
|
|
|
+ rs.close();
|
|
|
+ } catch (SQLException e) {
|
|
|
+ // Ignore
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (stmt != null) {
|
|
|
+ try {
|
|
|
+ stmt.close();
|
|
|
+ } catch (SQLException e) {
|
|
|
+ // Ignore
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (conn != null) {
|
|
|
+ try {
|
|
|
+ conn.close();
|
|
|
+ } catch (SQLException sql) {
|
|
|
+ // Ignore
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return hostedAppMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ // No filter criteria support for now.
|
|
|
+ public Map<TimelineMetricMetadataKey, TimelineMetricMetadata> getTimelineMetricMetadata() throws SQLException {
|
|
|
+ Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadataMap = new HashMap<>();
|
|
|
+ Connection conn = getConnection();
|
|
|
+ PreparedStatement stmt = null;
|
|
|
+ ResultSet rs = null;
|
|
|
+
|
|
|
+ try {
|
|
|
+ stmt = conn.prepareStatement(GET_METRIC_METADATA_SQL);
|
|
|
+ rs = stmt.executeQuery();
|
|
|
+
|
|
|
+ while (rs.next()) {
|
|
|
+ String metricName = rs.getString("METRIC_NAME");
|
|
|
+ String appId = rs.getString("APP_ID");
|
|
|
+ TimelineMetricMetadata metadata = new TimelineMetricMetadata(
|
|
|
+ metricName,
|
|
|
+ appId,
|
|
|
+ rs.getString("UNITS"),
|
|
|
+ Enums.getIfPresent(MetricType.class, rs.getString("TYPE")).or(MetricType.UNDEFINED),
|
|
|
+ rs.getLong("START_TIME"),
|
|
|
+ rs.getBoolean("SUPPORTS_AGGREGATION")
|
|
|
+ );
|
|
|
+
|
|
|
+ TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(metricName, appId);
|
|
|
+ metadata.setIsPersisted(true); // Always true on retrieval
|
|
|
+ metadataMap.put(key, metadata);
|
|
|
+ }
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ if (rs != null) {
|
|
|
+ try {
|
|
|
+ rs.close();
|
|
|
+ } catch (SQLException e) {
|
|
|
+ // Ignore
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (stmt != null) {
|
|
|
+ try {
|
|
|
+ stmt.close();
|
|
|
+ } catch (SQLException e) {
|
|
|
+ // Ignore
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (conn != null) {
|
|
|
+ try {
|
|
|
+ conn.close();
|
|
|
+ } catch (SQLException sql) {
|
|
|
+ // Ignore
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return metadataMap;
|
|
|
+ }
|
|
|
}
|