@@ -49,6 +49,11 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
@@ -60,11 +65,17 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
 /**
  * Various tests to test writing entities to HBase and reading them back from
  * it.
@@ -79,18 +90,344 @@ import org.junit.Test;
 public class TestHBaseTimelineStorage {
 
   private static HBaseTestingUtility util;
+  private HBaseTimelineReaderImpl reader;
 
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
     util = new HBaseTestingUtility();
     util.startMiniCluster();
     createSchema();
+    loadEntities();
+    loadApps();
   }
 
   private static void createSchema() throws IOException {
     TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
   }
 
+  private static void loadApps() throws IOException {
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entity = new TimelineEntity();
+    String id = "application_1111111111_2222";
+    entity.setId(id);
+    entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
+    Long cTime = 1425016501000L;
+    Long mTime = 1425026901000L;
+    entity.setCreatedTime(cTime);
+    entity.setModifiedTime(mTime);
+    // add the info map in Timeline Entity
+    Map<String, Object> infoMap = new HashMap<String, Object>();
+    infoMap.put("infoMapKey1", "infoMapValue1");
+    infoMap.put("infoMapKey2", 10);
+    entity.addInfo(infoMap);
+    // add the isRelatedToEntity info
+    String key = "task";
+    String value = "is_related_to_entity_id_here";
+    Set<String> isRelatedToSet = new HashSet<String>();
+    isRelatedToSet.add(value);
+    Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
+    isRelatedTo.put(key, isRelatedToSet);
+    entity.setIsRelatedToEntities(isRelatedTo);
+    // add the relatesTo info
+    key = "container";
+    value = "relates_to_entity_id_here";
+    Set<String> relatesToSet = new HashSet<String>();
+    relatesToSet.add(value);
+    value = "relates_to_entity_id_here_Second";
+    relatesToSet.add(value);
+    Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
+    relatesTo.put(key, relatesToSet);
+    entity.setRelatesToEntities(relatesTo);
+    // add some config entries
+    Map<String, String> conf = new HashMap<String, String>();
+    conf.put("config_param1", "value1");
+    conf.put("config_param2", "value2");
+    conf.put("cfg_param1", "value3");
+    entity.addConfigs(conf);
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId("MAP_SLOT_MILLIS");
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricValues.put(ts - 120000, 100000000);
+    metricValues.put(ts - 100000, 200000000);
+    metricValues.put(ts - 80000, 300000000);
+    metricValues.put(ts - 60000, 400000000);
+    metricValues.put(ts - 40000, 50000000000L);
+    metricValues.put(ts - 20000, 60000000000L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+
+    TimelineMetric m12 = new TimelineMetric();
+    m12.setId("MAP1_BYTES");
+    m12.addValue(ts, 50);
+    metrics.add(m12);
+    entity.addMetrics(metrics);
+    TimelineEvent event = new TimelineEvent();
+    event.setId("event1");
+    event.setTimestamp(ts - 2000);
+    entity.addEvent(event);
+    te.addEntity(entity);
+
+    TimelineEntities te1 = new TimelineEntities();
+    TimelineEntity entity1 = new TimelineEntity();
+    String id1 = "application_1111111111_3333";
+    entity1.setId(id1);
+    entity1.setType(TimelineEntityType.YARN_APPLICATION.toString());
+    entity1.setCreatedTime(cTime);
+    entity1.setModifiedTime(mTime);
+
+    // add the info map in Timeline Entity
+    Map<String, Object> infoMap1 = new HashMap<String, Object>();
+    infoMap1.put("infoMapKey1", "infoMapValue1");
+    infoMap1.put("infoMapKey2", 10);
+    entity1.addInfo(infoMap1);
+
+    // add the isRelatedToEntity info
+    String key1 = "task";
+    String value1 = "is_related_to_entity_id_here";
+    Set<String> isRelatedToSet1 = new HashSet<String>();
+    isRelatedToSet1.add(value1);
+    Map<String, Set<String>> isRelatedTo1 = new HashMap<String, Set<String>>();
+    isRelatedTo1.put(key, isRelatedToSet1);
+    entity1.setIsRelatedToEntities(isRelatedTo1);
+
+    // add the relatesTo info
+    key1 = "container";
+    value1 = "relates_to_entity_id_here";
+    Set<String> relatesToSet1 = new HashSet<String>();
+    relatesToSet1.add(value1);
+    value1 = "relates_to_entity_id_here_Second";
+    relatesToSet1.add(value1);
+    Map<String, Set<String>> relatesTo1 = new HashMap<String, Set<String>>();
+    relatesTo1.put(key1, relatesToSet1);
+    entity1.setRelatesToEntities(relatesTo1);
+
+    // add some config entries
+    Map<String, String> conf1 = new HashMap<String, String>();
+    conf1.put("cfg_param1", "value1");
+    conf1.put("cfg_param2", "value2");
+    entity1.addConfigs(conf1);
+
+    // add metrics
+    Set<TimelineMetric> metrics1 = new HashSet<>();
+    TimelineMetric m2 = new TimelineMetric();
+    m2.setId("MAP1_SLOT_MILLIS");
+    Map<Long, Number> metricValues1 = new HashMap<Long, Number>();
+    long ts1 = System.currentTimeMillis();
+    metricValues1.put(ts1 - 120000, 100000000);
+    metricValues1.put(ts1 - 100000, 200000000);
+    metricValues1.put(ts1 - 80000, 300000000);
+    metricValues1.put(ts1 - 60000, 400000000);
+    metricValues1.put(ts1 - 40000, 50000000000L);
+    metricValues1.put(ts1 - 20000, 60000000000L);
+    m2.setType(Type.TIME_SERIES);
+    m2.setValues(metricValues1);
+    metrics1.add(m2);
+    entity1.addMetrics(metrics1);
+    te1.addEntity(entity1);
+
+    TimelineEntities te2 = new TimelineEntities();
+    TimelineEntity entity2 = new TimelineEntity();
+    String id2 = "application_1111111111_4444";
+    entity2.setId(id2);
+    entity2.setType(TimelineEntityType.YARN_APPLICATION.toString());
+    entity2.setCreatedTime(cTime);
+    entity2.setModifiedTime(mTime);
+    te2.addEntity(entity2);
+    HBaseTimelineWriterImpl hbi = null;
+    try {
+      hbi = new HBaseTimelineWriterImpl(util.getConfiguration());
+      hbi.init(util.getConfiguration());
+      hbi.start();
+      String cluster = "cluster1";
+      String user = "user1";
+      String flow = "some_flow_name";
+      String flowVersion = "AB7822C10F1111";
+      long runid = 1002345678919L;
+      String appName = "application_1111111111_2222";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      appName = "application_1111111111_3333";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+      appName = "application_1111111111_4444";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te2);
+      hbi.stop();
+    } finally {
+      if (hbi != null) {
+        hbi.stop();
+        hbi.close();
+      }
+    }
+  }
+
+  private static void loadEntities() throws IOException {
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entity = new TimelineEntity();
+    String id = "hello";
+    String type = "world";
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 1425016501000L;
+    Long mTime = 1425026901000L;
+    entity.setCreatedTime(cTime);
+    entity.setModifiedTime(mTime);
+    // add the info map in Timeline Entity
+    Map<String, Object> infoMap = new HashMap<String, Object>();
+    infoMap.put("infoMapKey1", "infoMapValue1");
+    infoMap.put("infoMapKey2", 10);
+    entity.addInfo(infoMap);
+    // add the isRelatedToEntity info
+    String key = "task";
+    String value = "is_related_to_entity_id_here";
+    Set<String> isRelatedToSet = new HashSet<String>();
+    isRelatedToSet.add(value);
+    Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
+    isRelatedTo.put(key, isRelatedToSet);
+    entity.setIsRelatedToEntities(isRelatedTo);
+
+    // add the relatesTo info
+    key = "container";
+    value = "relates_to_entity_id_here";
+    Set<String> relatesToSet = new HashSet<String>();
+    relatesToSet.add(value);
+    value = "relates_to_entity_id_here_Second";
+    relatesToSet.add(value);
+    Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
+    relatesTo.put(key, relatesToSet);
+    entity.setRelatesToEntities(relatesTo);
+
+    // add some config entries
+    Map<String, String> conf = new HashMap<String, String>();
+    conf.put("config_param1", "value1");
+    conf.put("config_param2", "value2");
+    conf.put("cfg_param1", "value3");
+    entity.addConfigs(conf);
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId("MAP_SLOT_MILLIS");
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricValues.put(ts - 120000, 100000000);
+    metricValues.put(ts - 100000, 200000000);
+    metricValues.put(ts - 80000, 300000000);
+    metricValues.put(ts - 60000, 400000000);
+    metricValues.put(ts - 40000, 50000000000L);
+    metricValues.put(ts - 20000, 60000000000L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+
+    TimelineMetric m12 = new TimelineMetric();
+    m12.setId("MAP1_BYTES");
+    m12.addValue(ts, 50);
+    metrics.add(m12);
+    entity.addMetrics(metrics);
+    te.addEntity(entity);
+
+    TimelineEntity entity1 = new TimelineEntity();
+    String id1 = "hello1";
+    entity1.setId(id1);
+    entity1.setType(type);
+    entity1.setCreatedTime(cTime);
+    entity1.setModifiedTime(mTime);
+
+    // add the info map in Timeline Entity
+    Map<String, Object> infoMap1 = new HashMap<String, Object>();
+    infoMap1.put("infoMapKey1", "infoMapValue1");
+    infoMap1.put("infoMapKey2", 10);
+    entity1.addInfo(infoMap1);
+
+    // add the isRelatedToEntity info
+    String key1 = "task";
+    String value1 = "is_related_to_entity_id_here";
+    Set<String> isRelatedToSet1 = new HashSet<String>();
+    isRelatedToSet1.add(value1);
+    Map<String, Set<String>> isRelatedTo1 = new HashMap<String, Set<String>>();
+    isRelatedTo1.put(key, isRelatedToSet1);
+    entity1.setIsRelatedToEntities(isRelatedTo1);
+
+    // add the relatesTo info
+    key1 = "container";
+    value1 = "relates_to_entity_id_here";
+    Set<String> relatesToSet1 = new HashSet<String>();
+    relatesToSet1.add(value1);
+    value1 = "relates_to_entity_id_here_Second";
+    relatesToSet1.add(value1);
+    Map<String, Set<String>> relatesTo1 = new HashMap<String, Set<String>>();
+    relatesTo1.put(key1, relatesToSet1);
+    entity1.setRelatesToEntities(relatesTo1);
+
+    // add some config entries
+    Map<String, String> conf1 = new HashMap<String, String>();
+    conf1.put("cfg_param1", "value1");
+    conf1.put("cfg_param2", "value2");
+    entity1.addConfigs(conf1);
+
+    // add metrics
+    Set<TimelineMetric> metrics1 = new HashSet<>();
+    TimelineMetric m2 = new TimelineMetric();
+    m2.setId("MAP1_SLOT_MILLIS");
+    Map<Long, Number> metricValues1 = new HashMap<Long, Number>();
+    long ts1 = System.currentTimeMillis();
+    metricValues1.put(ts1 - 120000, 100000000);
+    metricValues1.put(ts1 - 100000, 200000000);
+    metricValues1.put(ts1 - 80000, 300000000);
+    metricValues1.put(ts1 - 60000, 400000000);
+    metricValues1.put(ts1 - 40000, 50000000000L);
+    metricValues1.put(ts1 - 20000, 60000000000L);
+    m2.setType(Type.TIME_SERIES);
+    m2.setValues(metricValues1);
+    metrics1.add(m2);
+    entity1.addMetrics(metrics1);
+    te.addEntity(entity1);
+
+    TimelineEntity entity2 = new TimelineEntity();
+    String id2 = "hello2";
+    entity2.setId(id2);
+    entity2.setType(type);
+    entity2.setCreatedTime(cTime);
+    entity2.setModifiedTime(mTime);
+    te.addEntity(entity2);
+    HBaseTimelineWriterImpl hbi = null;
+    try {
+      hbi = new HBaseTimelineWriterImpl(util.getConfiguration());
+      hbi.init(util.getConfiguration());
+      hbi.start();
+      String cluster = "cluster1";
+      String user = "user1";
+      String flow = "some_flow_name";
+      String flowVersion = "AB7822C10F1111";
+      long runid = 1002345678919L;
+      String appName = "application_1231111111_1111";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.stop();
+    } finally {
+      if (hbi != null) {
+        hbi.stop();
+        hbi.close();
+      }
+    }
+  }
+
+  @Before
+  public void init() throws Exception {
+    reader = new HBaseTimelineReaderImpl();
+    reader.init(util.getConfiguration());
+    reader.start();
+  }
+
+  @After
+  public void stop() throws Exception {
+    if (reader != null) {
+      reader.stop();
+      reader.close();
+    }
+  }
+
   private static void matchMetrics(Map<Long, Number> m1, Map<Long, Number> m2) {
     assertEquals(m1.size(), m2.size());
     for (Map.Entry<Long, Number> entry : m2.entrySet()) {
@@ -163,15 +500,11 @@ public class TestHBaseTimelineStorage {
     te.addEntity(entity);
 
     HBaseTimelineWriterImpl hbi = null;
-    HBaseTimelineReaderImpl hbr = null;
     try {
       Configuration c1 = util.getConfiguration();
       hbi = new HBaseTimelineWriterImpl(c1);
       hbi.init(c1);
       hbi.start();
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
       String cluster = "cluster_test_write_app";
       String user = "user1";
       String flow = "some_flow_name";
@@ -256,8 +589,8 @@ public class TestHBaseTimelineStorage {
       matchMetrics(metricValues, metricMap);
 
       // read the timeline entity using the reader this time
-      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId,
-          entity.getType(), entity.getId(),
+      TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appId,
+          entity.getType(), entity.getId(), null, null,
           EnumSet.of(TimelineReader.Field.ALL));
       assertNotNull(e1);
 
@@ -290,10 +623,6 @@ public class TestHBaseTimelineStorage {
         hbi.stop();
        hbi.close();
       }
-      if (hbr != null) {
-        hbr.stop();
-        hbr.close();
-      }
     }
   }
 
@@ -362,15 +691,11 @@ public class TestHBaseTimelineStorage {
     te.addEntity(entity);
 
     HBaseTimelineWriterImpl hbi = null;
-    HBaseTimelineReaderImpl hbr = null;
     try {
       Configuration c1 = util.getConfiguration();
       hbi = new HBaseTimelineWriterImpl(c1);
       hbi.init(c1);
       hbi.start();
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
       String cluster = "cluster_test_write_entity";
       String user = "user1";
       String flow = "some_flow_name";
@@ -468,12 +793,13 @@ public class TestHBaseTimelineStorage {
       assertEquals(17, colCount);
 
       // read the timeline entity using the reader this time
-      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-          entity.getType(), entity.getId(),
+      TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appName,
+          entity.getType(), entity.getId(), null, null,
           EnumSet.of(TimelineReader.Field.ALL));
-      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
+      Set<TimelineEntity> es1 = reader.getEntities(user, cluster, flow, runid,
           appName, entity.getType(), null, null, null, null, null, null, null,
-          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+          null, null, null, null, null, null,
+          EnumSet.of(TimelineReader.Field.ALL));
       assertNotNull(e1);
       assertEquals(1, es1.size());
 
@@ -505,10 +831,6 @@ public class TestHBaseTimelineStorage {
         hbi.stop();
         hbi.close();
       }
-      if (hbr != null) {
-        hbr.stop();
-        hbr.close();
-      }
     }
   }
 
@@ -559,15 +881,11 @@ public class TestHBaseTimelineStorage {
     entities.addEntity(entity);
 
     HBaseTimelineWriterImpl hbi = null;
-    HBaseTimelineReaderImpl hbr = null;
     try {
       Configuration c1 = util.getConfiguration();
       hbi = new HBaseTimelineWriterImpl(c1);
       hbi.init(c1);
       hbi.start();
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
       String cluster = "cluster_test_events";
       String user = "user2";
       String flow = "other_flow_name";
@@ -612,11 +930,11 @@ public class TestHBaseTimelineStorage {
       }
 
       // read the timeline entity using the reader this time
-      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-          entity.getType(), entity.getId(),
+      TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appName,
+          entity.getType(), entity.getId(), null, null,
           EnumSet.of(TimelineReader.Field.ALL));
-      TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appName,
-          entity.getType(), entity.getId(),
+      TimelineEntity e2 = reader.getEntity(user, cluster, null, null, appName,
+          entity.getType(), entity.getId(), null, null,
           EnumSet.of(TimelineReader.Field.ALL));
       assertNotNull(e1);
       assertNotNull(e2);
@@ -641,10 +959,6 @@ public class TestHBaseTimelineStorage {
         hbi.stop();
         hbi.close();
       }
-      if (hbr != null) {
-        hbr.stop();
-        hbr.close();
-      }
     }
   }
 
@@ -665,15 +979,11 @@ public class TestHBaseTimelineStorage {
     entities.addEntity(entity);
 
     HBaseTimelineWriterImpl hbi = null;
-    HBaseTimelineReaderImpl hbr = null;
    try {
       Configuration c1 = util.getConfiguration();
       hbi = new HBaseTimelineWriterImpl(c1);
       hbi.init(c1);
       hbi.start();
-      hbr = new HBaseTimelineReaderImpl();
-      hbr.init(c1);
-      hbr.start();
       String cluster = "cluster_test_empty_eventkey";
       String user = "user_emptyeventkey";
       String flow = "other_flow_name";
@@ -726,12 +1036,13 @@ public class TestHBaseTimelineStorage {
       assertEquals(1, rowCount);
 
       // read the timeline entity using the reader this time
-      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-          entity.getType(), entity.getId(),
+      TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appName,
+          entity.getType(), entity.getId(), null, null,
           EnumSet.of(TimelineReader.Field.ALL));
-      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
+      Set<TimelineEntity> es1 = reader.getEntities(user, cluster, flow, runid,
           appName, entity.getType(), null, null, null, null, null, null, null,
-          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+          null, null, null, null, null, null,
+          EnumSet.of(TimelineReader.Field.ALL));
       assertNotNull(e1);
       assertEquals(1, es1.size());
 
@@ -748,8 +1059,6 @@ public class TestHBaseTimelineStorage {
     } finally {
       hbi.stop();
       hbi.close();
-      hbr.stop();;
-      hbr.close();
     }
   }
 
@@ -816,6 +1125,291 @@ public class TestHBaseTimelineStorage {
     }
   }
 
+  @Test
+  public void testReadEntities() throws Exception {
+    TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
+        1002345678919L, "application_1231111111_1111","world", "hello", null,
+        null, EnumSet.of(Field.ALL));
+    assertNotNull(e1);
+    assertEquals(3, e1.getConfigs().size());
+    assertEquals(1, e1.getIsRelatedToEntities().size());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+        null, null, null, null, null, null, null, null, null, null, null, null,
+        null, EnumSet.of(Field.ALL));
+    assertEquals(3, es1.size());
+  }
+
+  @Test
+  public void testReadEntitiesDefaultView() throws Exception {
+    TimelineEntity e1 =
+        reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L,
+        "application_1231111111_1111","world", "hello", null, null, null);
+    assertNotNull(e1);
+    assertTrue(e1.getInfo().isEmpty() && e1.getConfigs().isEmpty() &&
+        e1.getMetrics().isEmpty() && e1.getIsRelatedToEntities().isEmpty() &&
+        e1.getRelatesToEntities().isEmpty());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+        null, null, null, null, null, null, null, null, null, null, null, null,
+        null, null);
+    assertEquals(3, es1.size());
+    for (TimelineEntity e : es1) {
+      assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() &&
+          e.getMetrics().isEmpty() && e.getIsRelatedToEntities().isEmpty() &&
+          e.getRelatesToEntities().isEmpty());
+    }
+  }
+
+  @Test
+  public void testReadEntitiesByFields() throws Exception {
+    TimelineEntity e1 =
+        reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L,
+        "application_1231111111_1111","world", "hello", null, null,
+        EnumSet.of(Field.INFO, Field.CONFIGS));
+    assertNotNull(e1);
+    assertEquals(3, e1.getConfigs().size());
+    assertEquals(0, e1.getIsRelatedToEntities().size());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+        null, null, null, null, null, null, null, null, null, null, null, null,
+        null, EnumSet.of(Field.IS_RELATED_TO, Field.METRICS));
+    assertEquals(3, es1.size());
+    int metricsCnt = 0;
+    int isRelatedToCnt = 0;
+    int infoCnt = 0;
+    for (TimelineEntity entity : es1) {
+      metricsCnt += entity.getMetrics().size();
+      isRelatedToCnt += entity.getIsRelatedToEntities().size();
+      infoCnt += entity.getInfo().size();
+    }
+    assertEquals(0, infoCnt);
+    assertEquals(2, isRelatedToCnt);
+    assertEquals(3, metricsCnt);
+  }
+
+  @Test
+  public void testReadEntitiesConfigPrefix() throws Exception {
+    TimelineFilterList list =
+        new TimelineFilterList(Operator.OR,
+            new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_"));
+    TimelineEntity e1 =
+        reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L,
+        "application_1231111111_1111","world", "hello", list, null, null);
+    assertNotNull(e1);
+    assertEquals(1, e1.getConfigs().size());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+        null, null, null, null, null, null, null, null, null, null, null,
+        list, null, null);
+    int cfgCnt = 0;
+    for (TimelineEntity entity : es1) {
+      cfgCnt += entity.getConfigs().size();
+    }
+    assertEquals(3, cfgCnt);
+  }
+
+  @Test
+  public void testReadEntitiesConfigFilterPrefix() throws Exception {
+    Map<String, String> confFilters = ImmutableMap.of("cfg_param1","value1");
+    TimelineFilterList list =
+        new TimelineFilterList(Operator.OR,
+            new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_"));
+    Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+        null, null, null, null, null, null, null, null, confFilters, null, null,
+        list, null, null);
+    assertEquals(1, entities.size());
+    int cfgCnt = 0;
+    for (TimelineEntity entity : entities) {
+      cfgCnt += entity.getConfigs().size();
+    }
+    assertEquals(2, cfgCnt);
+  }
+
+  @Test
+  public void testReadEntitiesMetricPrefix() throws Exception {
+    TimelineFilterList list =
+        new TimelineFilterList(Operator.OR,
+            new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_"));
+    TimelineEntity e1 =
+        reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L,
+        "application_1231111111_1111","world", "hello", null, list, null);
+    assertNotNull(e1);
+    assertEquals(1, e1.getMetrics().size());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+        null, null, null, null, null, null, null, null, null, null, null, null,
+        list, null);
+    int metricCnt = 0;
+    for (TimelineEntity entity : es1) {
+      metricCnt += entity.getMetrics().size();
+    }
+    assertEquals(2, metricCnt);
+  }
+
+  @Test
+  public void testReadEntitiesMetricFilterPrefix() throws Exception {
+    Set<String> metricFilters = ImmutableSet.of("MAP1_SLOT_MILLIS");
+    TimelineFilterList list =
+        new TimelineFilterList(Operator.OR,
+            new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_"));
+    Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+        null, null, null, null, null, null, null, null, null, metricFilters,
+        null, null, list, null);
+    assertEquals(1, entities.size());
+    int metricCnt = 0;
+    for (TimelineEntity entity : entities) {
+      metricCnt += entity.getMetrics().size();
+    }
+    assertEquals(1, metricCnt);
+  }
+
+  @Test
+  public void testReadApps() throws Exception {
+    TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
+        1002345678919L, "application_1111111111_2222",
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null,
+        EnumSet.of(Field.ALL));
+    assertNotNull(e1);
+    assertEquals(3, e1.getConfigs().size());
+    assertEquals(1, e1.getIsRelatedToEntities().size());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, null,
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+        null, null, null, null, null, null, null, null, null,
+        EnumSet.of(Field.ALL));
+    assertEquals(3, es1.size());
+  }
+
+  @Test
+  public void testReadAppsDefaultView() throws Exception {
+    TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
+        1002345678919L, "application_1111111111_2222",
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null);
+    assertNotNull(e1);
+    assertTrue(e1.getInfo().isEmpty() && e1.getConfigs().isEmpty() &&
+        e1.getMetrics().isEmpty() && e1.getIsRelatedToEntities().isEmpty() &&
+        e1.getRelatesToEntities().isEmpty());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, null,
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+        null, null, null, null, null, null, null, null, null, null);
+    assertEquals(3, es1.size());
+    for (TimelineEntity e : es1) {
+      assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() &&
+          e.getMetrics().isEmpty() && e.getIsRelatedToEntities().isEmpty() &&
+          e.getRelatesToEntities().isEmpty());
+    }
+  }
+
+  @Test
+  public void testReadAppsByFields() throws Exception {
+    TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
+        1002345678919L, "application_1111111111_2222",
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null,
+        EnumSet.of(Field.INFO, Field.CONFIGS));
+    assertNotNull(e1);
+    assertEquals(3, e1.getConfigs().size());
+    assertEquals(0, e1.getIsRelatedToEntities().size());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, null,
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+        null, null, null, null, null, null, null, null, null,
+        EnumSet.of(Field.IS_RELATED_TO, Field.METRICS));
+    assertEquals(3, es1.size());
+    int metricsCnt = 0;
+    int isRelatedToCnt = 0;
+    int infoCnt = 0;
+    for (TimelineEntity entity : es1) {
+      metricsCnt += entity.getMetrics().size();
+      isRelatedToCnt += entity.getIsRelatedToEntities().size();
+      infoCnt += entity.getInfo().size();
+    }
+    assertEquals(0, infoCnt);
+    assertEquals(2, isRelatedToCnt);
+    assertEquals(3, metricsCnt);
+  }
+
+  @Test
+  public void testReadAppsConfigPrefix() throws Exception {
+    TimelineFilterList list =
+        new TimelineFilterList(Operator.OR,
+            new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_"));
+    TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
+        1002345678919L, "application_1111111111_2222",
+        TimelineEntityType.YARN_APPLICATION.toString(), null, list, null, null);
+    assertNotNull(e1);
+    assertEquals(1, e1.getConfigs().size());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, null,
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+        null, null, null, null, null, null, null, list, null, null);
+    int cfgCnt = 0;
+    for (TimelineEntity entity : es1) {
+      cfgCnt += entity.getConfigs().size();
+    }
+    assertEquals(3, cfgCnt);
+  }
+
+  @Test
+  public void testReadAppsConfigFilterPrefix() throws Exception {
+    Map<String, String> confFilters = ImmutableMap.of("cfg_param1","value1");
+    TimelineFilterList list =
+        new TimelineFilterList(Operator.OR,
+            new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_"));
+    Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, null,
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+        null, null, null, null, confFilters, null, null, list, null, null);
+    assertEquals(1, entities.size());
+    int cfgCnt = 0;
+    for (TimelineEntity entity : entities) {
+      cfgCnt += entity.getConfigs().size();
+    }
+    assertEquals(2, cfgCnt);
+  }
+
+  @Test
+  public void testReadAppsMetricPrefix() throws Exception {
+    TimelineFilterList list =
+        new TimelineFilterList(Operator.OR,
+            new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_"));
+    TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
+        1002345678919L, "application_1111111111_2222",
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, list, null);
+    assertNotNull(e1);
+    assertEquals(1, e1.getMetrics().size());
+    Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, null,
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+        null, null, null, null, null, null, null, null, list, null);
+    int metricCnt = 0;
+    for (TimelineEntity entity : es1) {
+      metricCnt += entity.getMetrics().size();
+    }
+    assertEquals(2, metricCnt);
+  }
+
+  @Test
+  public void testReadAppsMetricFilterPrefix() throws Exception {
+    TimelineFilterList list =
+        new TimelineFilterList(Operator.OR,
+            new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_"));
+    Set<String> metricFilters = ImmutableSet.of("MAP1_SLOT_MILLIS");
+    Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
+        "some_flow_name", 1002345678919L, null,
+        TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+        null, null, null, null, null, metricFilters, null, null, list, null);
+    int metricCnt = 0;
+    assertEquals(1, entities.size());
+    for (TimelineEntity entity : entities) {
+      metricCnt += entity.getMetrics().size();
+    }
+    assertEquals(1, metricCnt);
+  }
+
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     util.shutdownMiniCluster();