|
@@ -28,11 +28,13 @@ import org.apache.hadoop.hbase.client.Durability;
|
|
|
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
|
|
import org.apache.hadoop.hbase.util.RetryCounter;
|
|
|
import org.apache.hadoop.hbase.util.RetryCounterFactory;
|
|
|
+import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.Precision;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
|
|
@@ -57,6 +59,7 @@ import java.sql.PreparedStatement;
|
|
|
import java.sql.ResultSet;
|
|
|
import java.sql.SQLException;
|
|
|
import java.sql.Statement;
|
|
|
+import java.sql.Timestamp;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
import java.util.HashMap;
|
|
@@ -86,6 +89,9 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_SPLIT_POINTS;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
|
|
|
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CONTAINER_METRICS_TTL;
|
|
|
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME;
|
|
|
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL;
|
|
@@ -111,6 +117,8 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_HOSTED_APPS_METADATA_SQL;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METADATA_SQL;
|
|
|
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
|
|
|
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL;
|
|
|
+
|
|
|
|
|
|
/**
|
|
|
* Provides a facade over the Phoenix API to access HBase schema
|
|
@@ -180,6 +188,7 @@ public class PhoenixHBaseAccessor {
|
|
|
this.timelineMetricsTablesDurability = metricsConf.get(TIMELINE_METRICS_TABLES_DURABILITY, "");
|
|
|
|
|
|
tableTTL.put(METRICS_RECORD_TABLE_NAME, metricsConf.get(PRECISION_TABLE_TTL, String.valueOf(1 * 86400))); // 1 day
|
|
|
+ tableTTL.put(CONTAINER_METRICS_TABLE_NAME, metricsConf.get(CONTAINER_METRICS_TTL, String.valueOf(30 * 86400))); // 30 days
|
|
|
tableTTL.put(METRICS_AGGREGATE_MINUTE_TABLE_NAME, metricsConf.get(HOST_MINUTE_TABLE_TTL, String.valueOf(7 * 86400))); //7 days
|
|
|
tableTTL.put(METRICS_AGGREGATE_HOURLY_TABLE_NAME, metricsConf.get(HOST_HOUR_TABLE_TTL, String.valueOf(30 * 86400))); //30 days
|
|
|
tableTTL.put(METRICS_AGGREGATE_DAILY_TABLE_NAME, metricsConf.get(HOST_DAILY_TABLE_TTL, String.valueOf(365 * 86400))); //1 year
|
|
@@ -271,6 +280,10 @@ public class PhoenixHBaseAccessor {
|
|
|
encoding, compression);
|
|
|
stmt.executeUpdate(hostedAppSql);
|
|
|
|
|
|
+ // Container Metrics
|
|
|
+ stmt.executeUpdate( String.format(CREATE_CONTAINER_METRICS_TABLE_SQL,
|
|
|
+ encoding, tableTTL.get(CONTAINER_METRICS_TABLE_NAME), compression));
|
|
|
+
|
|
|
// Host level
|
|
|
String precisionSql = String.format(CREATE_METRICS_TABLE_SQL,
|
|
|
encoding, tableTTL.get(METRICS_RECORD_TABLE_NAME), compression);
|
|
@@ -462,6 +475,71 @@ public class PhoenixHBaseAccessor {
|
|
|
return "";
|
|
|
}
|
|
|
|
|
|
+ public void insertContainerMetrics(List<ContainerMetric> metrics)
|
|
|
+ throws SQLException, IOException {
|
|
|
+ Connection conn = getConnection();
|
|
|
+ PreparedStatement metricRecordStmt = null;
|
|
|
+
|
|
|
+ try {
|
|
|
+ metricRecordStmt = conn.prepareStatement(
|
|
|
+ String.format(UPSERT_CONTAINER_METRICS_SQL, CONTAINER_METRICS_TABLE_NAME));
|
|
|
+ for (ContainerMetric metric : metrics) {
|
|
|
+ metricRecordStmt.clearParameters();
|
|
|
+ metricRecordStmt.setString(1, ContainerId.fromString(metric.getContainerId())
|
|
|
+ .getApplicationAttemptId().getApplicationId().toString());
|
|
|
+ metricRecordStmt.setString(2, metric.getContainerId());
|
|
|
+ metricRecordStmt.setTimestamp(3, new Timestamp(metric.getStartTime()));
|
|
|
+ metricRecordStmt.setTimestamp(4, new Timestamp(metric.getFinishTime()));
|
|
|
+ metricRecordStmt.setLong(5, metric.getFinishTime() - metric.getStartTime());
|
|
|
+ metricRecordStmt.setString(6, metric.getHostName());
|
|
|
+ metricRecordStmt.setInt(7, metric.getExitCode());
|
|
|
+ metricRecordStmt.setLong(8, metric.getLocalizationDuration());
|
|
|
+ metricRecordStmt.setLong(9, metric.getLaunchDuration());
|
|
|
+ metricRecordStmt.setDouble(10, (double) metric.getPmemLimit() / 1024);
|
|
|
+ metricRecordStmt.setDouble(11,
|
|
|
+ ((double) metric.getPmemLimit() / 1024) * (metric.getFinishTime()
|
|
|
+ - metric.getStartTime()));
|
|
|
+ metricRecordStmt.setDouble(12, (double) metric.getVmemLimit() / 1024);
|
|
|
+ metricRecordStmt.setDouble(13, (double) metric.getPmemUsedMin() / 1024);
|
|
|
+ metricRecordStmt.setDouble(14, (double) metric.getPmemUsedMax() / 1024);
|
|
|
+ metricRecordStmt.setDouble(15, (double) metric.getPmemUsedAvg() / 1024);
|
|
|
+ metricRecordStmt.setDouble(16, (double) metric.getPmem50Pct() / 1024);
|
|
|
+ metricRecordStmt.setDouble(17, (double) metric.getPmem75Pct() / 1024);
|
|
|
+ metricRecordStmt.setDouble(18, (double) metric.getPmem90Pct() / 1024);
|
|
|
+ metricRecordStmt.setDouble(19, (double) metric.getPmem95Pct()/ 1024);
|
|
|
+ metricRecordStmt.setDouble(20, (double) metric.getPmem99Pct() / 1024);
|
|
|
+ metricRecordStmt.setDouble(21, (double) metric.getPmemLimit() / 1024
|
|
|
+ - (double) metric.getPmemUsedMax() / 1024);
|
|
|
+ metricRecordStmt.setDouble(22, ((double) metric.getPmemLimit() / 1024
|
|
|
+ - (double) metric.getPmemUsedMax() / 1024) * (metric.getFinishTime()
|
|
|
+ - metric.getStartTime()));
|
|
|
+
|
|
|
+ try {
|
|
|
+ metricRecordStmt.executeUpdate();
|
|
|
+ } catch (SQLException sql) {
|
|
|
+ LOG.error("Failed on insert records to store.", sql);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ conn.commit();
|
|
|
+ } finally {
|
|
|
+ if (metricRecordStmt != null) {
|
|
|
+ try {
|
|
|
+ metricRecordStmt.close();
|
|
|
+ } catch (SQLException e) {
|
|
|
+ // Ignore
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (conn != null) {
|
|
|
+ try {
|
|
|
+ conn.close();
|
|
|
+ } catch (SQLException sql) {
|
|
|
+ // Ignore
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public void insertMetricRecordsWithMetadata(TimelineMetricMetadataManager metadataManager,
|
|
|
TimelineMetrics metrics) throws SQLException, IOException {
|
|
|
List<TimelineMetric> timelineMetrics = metrics.getMetrics();
|
|
@@ -860,7 +938,7 @@ public class PhoenixHBaseAccessor {
|
|
|
}
|
|
|
if (stmt != null) {
|
|
|
stmt.close();
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|