@@ -18,7 +18,10 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
@@ -37,8 +40,10 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static junit.framework.Assert.assertEquals;
@@ -48,8 +53,10 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
 
 public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
@@ -113,8 +120,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
       METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
 
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
-      (conn, condition);
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
     ResultSet rs = pstmt.executeQuery();
 
     int recordCount = 0;
@@ -144,7 +150,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
-    long minute = 60 * 1000;
+    long minute = 60 * 1000 * 2; // two minutes, so the two insert rounds stay well apart
 
     /**
      * Here we have two nodes with two instances each:
@@ -153,27 +159,35 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
      * instance i2 | 3 | 4 |
      *
      */
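+    // Two hosts report into every aggregation slice, so each aggregated
+    // record should show numberOfHosts == 2 (asserted below).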
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+    // Four 1's at ctime - 100
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local1",
       "i1", "disk_free", 1));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+    // Four 2's at ctime - 100: different host
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local2",
       "i1", "disk_free", 2));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+    // +100 ms to avoid overwriting the records above
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local1",
       "i2", "disk_free", 3));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local2",
       "i2", "disk_free", 4));
+
     ctime += minute;
 
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+    // Four 1's at ctime + 2 min
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local1",
       "i1", "disk_free", 1));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+    // Four 3's at ctime + 2 min: different host
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local2",
       "i1", "disk_free", 3));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local1",
       "i2", "disk_free", 2));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local2",
       "i2", "disk_free", 4));
     // WHEN
     long endTime = ctime + minute;
-    boolean success = agg.doWork(startTime, endTime);
+    boolean success = agg.doWork(startTime - 1000, endTime + 1000); // widen: first records land 100 ms before startTime
 
     //THEN
     Condition condition = new DefaultCondition(null, null, null, null, startTime,
@@ -182,29 +196,26 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
       METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
 
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
-      (conn, condition);
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
     ResultSet rs = pstmt.executeQuery();
 
     int recordCount = 0;
     while (rs.next()) {
       TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
-//      PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs);
       MetricClusterAggregate currentHostAggregate =
         readHelper.getMetricClusterAggregateFromResultSet(rs);
 
       if ("disk_free".equals(currentMetric.getMetricName())) {
-        System.out.println("OUTPUT: " + currentMetric+" - " +
-          ""+currentHostAggregate);
-        assertEquals(4, currentHostAggregate.getNumberOfHosts());
-        assertEquals(4.0, currentHostAggregate.getMax());
-        assertEquals(1.0, currentHostAggregate.getMin());
-        assertEquals(10.0, currentHostAggregate.getSum());
+        System.out.println("OUTPUT: " + currentMetric + " - " + currentHostAggregate);
+        assertEquals(2, currentHostAggregate.getNumberOfHosts());
+        assertEquals(5.0, currentHostAggregate.getSum());
         recordCount++;
       } else {
         fail("Unexpected entry");
       }
     }
+
+    Assert.assertEquals(8, recordCount);
   }
 
   @Test
@@ -244,8 +255,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
       METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
 
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
-      (conn, condition);
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
     ResultSet rs = pstmt.executeQuery();
 
     int recordCount = 0;
@@ -476,6 +486,86 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     assertEquals(1.0d, currentHostAggregate.getSum());
   }
 
+  @Test
+  public void testClusterAggregateMetricNormalization() throws Exception {
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration());
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
+
+    // Sample data
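+    // metric1 and metric2 are consecutive shards of one series for host "h1":
+    // metric2's first datapoint (1431372381810) follows metric1's last
+    // (1431372371810) by 10 s, so the aggregator sees a continuous series.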
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setMetricName("yarn.ClusterMetrics.NumActiveNMs");
+    metric1.setAppId("resourcemanager");
+    metric1.setHostName("h1");
+    metric1.setStartTime(1431372311811L);
+    metric1.setMetricValues(new HashMap<Long, Double>() {{
+      put(1431372311811L, 1.0);
+      put(1431372321811L, 1.0);
+      put(1431372331811L, 1.0);
+      put(1431372341811L, 1.0);
+      put(1431372351811L, 1.0);
+      put(1431372361811L, 1.0);
+      put(1431372371810L, 1.0);
+    }});
+
+    TimelineMetric metric2 = new TimelineMetric();
+    metric2.setMetricName("yarn.ClusterMetrics.NumActiveNMs");
+    metric2.setAppId("resourcemanager");
+    metric2.setHostName("h1");
+    metric2.setStartTime(1431372381810L);
+    metric2.setMetricValues(new HashMap<Long, Double>() {{
+      put(1431372381810L, 1.0);
+      put(1431372391811L, 1.0);
+      put(1431372401811L, 1.0);
+      put(1431372411811L, 1.0);
+      put(1431372421811L, 1.0);
+      put(1431372431811L, 1.0);
+      put(1431372441810L, 1.0);
+    }});
+
+    TimelineMetrics metrics = new TimelineMetrics();
+    metrics.setMetrics(Collections.singletonList(metric1));
+    insertMetricRecords(conn, metrics, 1431372371810L);
+
+    metrics.setMetrics(Collections.singletonList(metric2));
+    insertMetricRecords(conn, metrics, 1431372441810L);
+
+    long startTime = 1431372055000L;
+    long endTime = 1431372655000L;
+
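+    // The [startTime, endTime] window covers both shards with margin on each side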
+    agg.doWork(startTime, endTime);
+
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    int recordCount = 0;
+    while (rs.next()) {
+      TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
+      MetricClusterAggregate currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs);
+
+      if ("yarn.ClusterMetrics.NumActiveNMs".equals(currentMetric.getMetricName())) {
+        assertEquals(1, currentHostAggregate.getNumberOfHosts());
+        assertEquals(1.0, currentHostAggregate.getMax());
+        assertEquals(1.0, currentHostAggregate.getMin());
+        assertEquals(1.0, currentHostAggregate.getSum());
+        recordCount++;
+      } else {
+        fail("Unexpected entry");
+      }
+    }
+    Assert.assertEquals(9, recordCount);
+  }
+
   private ResultSet executeQuery(String query) throws SQLException {
     Connection conn = getConnection(getUrl());
     Statement stmt = conn.createStatement();