|
@@ -65,7 +65,7 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
|
|
|
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
|
|
|
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
|
|
|
|
|
|
-public class TestTimelineReaderWebServicesFlowRun {
|
|
|
+public class TestTimelineReaderWebServicesHBaseStorage {
|
|
|
private int serverPort;
|
|
|
private TimelineReaderServer server;
|
|
|
private static HBaseTestingUtility util;
|
|
@@ -91,12 +91,13 @@ public class TestTimelineReaderWebServicesFlowRun {
|
|
|
|
|
|
TimelineEntities te = new TimelineEntities();
|
|
|
TimelineEntity entity = new TimelineEntity();
|
|
|
- String id = "flowRunMetrics_test";
|
|
|
+ String id = "application_1111111111_1111";
|
|
|
String type = TimelineEntityType.YARN_APPLICATION.toString();
|
|
|
entity.setId(id);
|
|
|
entity.setType(type);
|
|
|
Long cTime = 1425016501000L;
|
|
|
entity.setCreatedTime(cTime);
|
|
|
+ entity.addConfig("cfg2", "value1");
|
|
|
|
|
|
// add metrics
|
|
|
Set<TimelineMetric> metrics = new HashSet<>();
|
|
@@ -127,17 +128,24 @@ public class TestTimelineReaderWebServicesFlowRun {
|
|
|
Object expVal = "test";
|
|
|
event.addInfo(expKey, expVal);
|
|
|
entity.addEvent(event);
|
|
|
+ TimelineEvent event11 = new TimelineEvent();
|
|
|
+ event11.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
|
|
|
+ expTs = 1436512802010L;
|
|
|
+ event11.setTimestamp(expTs);
|
|
|
+ entity.addEvent(event11);
|
|
|
+
|
|
|
te.addEntity(entity);
|
|
|
|
|
|
// write another application with same metric to this flow
|
|
|
TimelineEntities te1 = new TimelineEntities();
|
|
|
TimelineEntity entity1 = new TimelineEntity();
|
|
|
- id = "flowRunMetrics_test";
|
|
|
+ id = "application_1111111111_2222";
|
|
|
type = TimelineEntityType.YARN_APPLICATION.toString();
|
|
|
entity1.setId(id);
|
|
|
entity1.setType(type);
|
|
|
cTime = 1425016501000L;
|
|
|
entity1.setCreatedTime(cTime);
|
|
|
+ entity1.addConfig("cfg1", "value1");
|
|
|
// add metrics
|
|
|
metrics.clear();
|
|
|
TimelineMetric m2 = new TimelineMetric();
|
|
@@ -149,6 +157,11 @@ public class TestTimelineReaderWebServicesFlowRun {
|
|
|
m2.setValues(metricValues);
|
|
|
metrics.add(m2);
|
|
|
entity1.addMetrics(metrics);
|
|
|
+ TimelineEvent event1 = new TimelineEvent();
|
|
|
+ event1.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
|
|
|
+ event1.setTimestamp(expTs);
|
|
|
+ event1.addInfo(expKey, expVal);
|
|
|
+ entity1.addEvent(event1);
|
|
|
te1.addEntity(entity1);
|
|
|
|
|
|
String flow2 = "flow_name2";
|
|
@@ -156,7 +169,7 @@ public class TestTimelineReaderWebServicesFlowRun {
|
|
|
Long runid2 = 2102356789046L;
|
|
|
TimelineEntities te3 = new TimelineEntities();
|
|
|
TimelineEntity entity3 = new TimelineEntity();
|
|
|
- id = "flowRunMetrics_test1";
|
|
|
+ id = "application_11111111111111_2223";
|
|
|
entity3.setId(id);
|
|
|
entity3.setType(type);
|
|
|
cTime = 1425016501030L;
|
|
@@ -168,18 +181,30 @@ public class TestTimelineReaderWebServicesFlowRun {
|
|
|
entity3.addEvent(event2);
|
|
|
te3.addEntity(entity3);
|
|
|
|
|
|
+ TimelineEntities te4 = new TimelineEntities();
|
|
|
+ TimelineEntity entity4 = new TimelineEntity();
|
|
|
+ id = "application_1111111111_2224";
|
|
|
+ entity4.setId(id);
|
|
|
+ entity4.setType(type);
|
|
|
+ cTime = 1425016501034L;
|
|
|
+ entity4.setCreatedTime(cTime);
|
|
|
+ TimelineEvent event4 = new TimelineEvent();
|
|
|
+ event4.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
|
|
|
+ event4.setTimestamp(1436512802037L);
|
|
|
+ event4.addInfo("foo_event", "test");
|
|
|
+ entity4.addEvent(event4);
|
|
|
+ te4.addEntity(entity4);
|
|
|
+
|
|
|
HBaseTimelineWriterImpl hbi = null;
|
|
|
Configuration c1 = util.getConfiguration();
|
|
|
try {
|
|
|
hbi = new HBaseTimelineWriterImpl(c1);
|
|
|
hbi.init(c1);
|
|
|
- String appName = "application_11111111111111_1111";
|
|
|
- hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
|
|
|
- appName = "application_11111111111111_2222";
|
|
|
- hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
|
|
|
- hbi.write(cluster, user, flow, flowVersion, runid1, appName, te);
|
|
|
- appName = "application_11111111111111_2223";
|
|
|
- hbi.write(cluster, user, flow2, flowVersion2, runid2, appName, te3);
|
|
|
+ hbi.write(cluster, user, flow, flowVersion, runid, entity.getId(), te);
|
|
|
+ hbi.write(cluster, user, flow, flowVersion, runid, entity1.getId(), te1);
|
|
|
+ hbi.write(cluster, user, flow, flowVersion, runid1, entity4.getId(), te4);
|
|
|
+ hbi.write(cluster, user, flow2,
|
|
|
+ flowVersion2, runid2, entity3.getId(), te3);
|
|
|
hbi.flush();
|
|
|
} finally {
|
|
|
hbi.close();
|
|
@@ -248,8 +273,15 @@ public class TestTimelineReaderWebServicesFlowRun {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static TimelineMetric newMetric(String id, long ts, Number value) {
|
|
|
- TimelineMetric metric = new TimelineMetric();
|
|
|
+ private static TimelineEntity newEntity(String type, String id) {
|
|
|
+ TimelineEntity entity = new TimelineEntity();
|
|
|
+ entity.setIdentifier(new TimelineEntity.Identifier(type, id));
|
|
|
+ return entity;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static TimelineMetric newMetric(TimelineMetric.Type type,
|
|
|
+ String id, long ts, Number value) {
|
|
|
+ TimelineMetric metric = new TimelineMetric(type);
|
|
|
metric.setId(id);
|
|
|
metric.addValue(ts, value);
|
|
|
return metric;
|
|
@@ -304,8 +336,10 @@ public class TestTimelineReaderWebServicesFlowRun {
|
|
|
assertNotNull(entity);
|
|
|
assertEquals("user1@flow_name/1002345678919", entity.getId());
|
|
|
assertEquals(2, entity.getMetrics().size());
|
|
|
- TimelineMetric m1 = newMetric("HDFS_BYTES_READ", ts - 80000, 57L);
|
|
|
- TimelineMetric m2 = newMetric("MAP_SLOT_MILLIS", ts - 80000, 141L);
|
|
|
+ TimelineMetric m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
|
|
|
+ "HDFS_BYTES_READ", ts - 80000, 57L);
|
|
|
+ TimelineMetric m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
|
|
|
+ "MAP_SLOT_MILLIS", ts - 80000, 141L);
|
|
|
for (TimelineMetric metric : entity.getMetrics()) {
|
|
|
assertTrue(verifyMetrics(metric, m1, m2));
|
|
|
}
|
|
@@ -318,8 +352,10 @@ public class TestTimelineReaderWebServicesFlowRun {
|
|
|
assertNotNull(entity);
|
|
|
assertEquals("user1@flow_name/1002345678919", entity.getId());
|
|
|
assertEquals(2, entity.getMetrics().size());
|
|
|
- m1 = newMetric("HDFS_BYTES_READ", ts - 80000, 57L);
|
|
|
- m2 = newMetric("MAP_SLOT_MILLIS", ts - 80000, 141L);
|
|
|
+ m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
|
|
|
+ "HDFS_BYTES_READ", ts - 80000, 57L);
|
|
|
+ m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
|
|
|
+ "MAP_SLOT_MILLIS", ts - 80000, 141L);
|
|
|
for (TimelineMetric metric : entity.getMetrics()) {
|
|
|
assertTrue(verifyMetrics(metric, m1, m2));
|
|
|
}
|
|
@@ -365,6 +401,192 @@ public class TestTimelineReaderWebServicesFlowRun {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testGetApp() throws Exception {
|
|
|
+ Client client = createClient();
|
|
|
+ try {
|
|
|
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
|
|
+ "timeline/app/cluster1/application_1111111111_1111?" +
|
|
|
+ "userid=user1&fields=ALL&flowid=flow_name&flowrunid=1002345678919");
|
|
|
+ ClientResponse resp = getResponse(client, uri);
|
|
|
+ TimelineEntity entity = resp.getEntity(TimelineEntity.class);
|
|
|
+ assertNotNull(entity);
|
|
|
+ assertEquals("application_1111111111_1111", entity.getId());
|
|
|
+ assertEquals(2, entity.getMetrics().size());
|
|
|
+ TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES,
|
|
|
+ "HDFS_BYTES_READ", ts - 100000, 31L);
|
|
|
+ m1.addValue(ts - 80000, 57L);
|
|
|
+ TimelineMetric m2 = newMetric(TimelineMetric.Type.TIME_SERIES,
|
|
|
+ "MAP_SLOT_MILLIS", ts - 100000, 2L);
|
|
|
+ m2.addValue(ts - 80000, 40L);
|
|
|
+ for (TimelineMetric metric : entity.getMetrics()) {
|
|
|
+ assertTrue(verifyMetrics(metric, m1, m2));
|
|
|
+ }
|
|
|
+
|
|
|
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
|
|
+ "timeline/app/application_1111111111_2222?userid=user1" +
|
|
|
+ "&fields=metrics&flowid=flow_name&flowrunid=1002345678919");
|
|
|
+ resp = getResponse(client, uri);
|
|
|
+ entity = resp.getEntity(TimelineEntity.class);
|
|
|
+ assertNotNull(entity);
|
|
|
+ assertEquals("application_1111111111_2222", entity.getId());
|
|
|
+ assertEquals(1, entity.getMetrics().size());
|
|
|
+ TimelineMetric m3 = newMetric(TimelineMetric.Type.TIME_SERIES,
|
|
|
+ "MAP_SLOT_MILLIS", ts - 100000, 5L);
|
|
|
+ m2.addValue(ts - 80000, 101L);
|
|
|
+ for (TimelineMetric metric : entity.getMetrics()) {
|
|
|
+ assertTrue(verifyMetrics(metric, m3));
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ client.destroy();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testGetAppWithoutFlowInfo() throws Exception {
|
|
|
+ Client client = createClient();
|
|
|
+ try {
|
|
|
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
|
|
+ "timeline/app/cluster1/application_1111111111_1111?" +
|
|
|
+ "userid=user1&fields=ALL");
|
|
|
+ ClientResponse resp = getResponse(client, uri);
|
|
|
+ TimelineEntity entity = resp.getEntity(TimelineEntity.class);
|
|
|
+ assertNotNull(entity);
|
|
|
+ assertEquals("application_1111111111_1111", entity.getId());
|
|
|
+ assertEquals(2, entity.getMetrics().size());
|
|
|
+ TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES,
|
|
|
+ "HDFS_BYTES_READ", ts - 100000, 31L);
|
|
|
+ m1.addValue(ts - 80000, 57L);
|
|
|
+ TimelineMetric m2 = newMetric(TimelineMetric.Type.TIME_SERIES,
|
|
|
+ "MAP_SLOT_MILLIS", ts - 100000, 2L);
|
|
|
+ m2.addValue(ts - 80000, 40L);
|
|
|
+ for (TimelineMetric metric : entity.getMetrics()) {
|
|
|
+ assertTrue(verifyMetrics(metric, m1, m2));
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ client.destroy();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testGetFlowRunApps() throws Exception {
|
|
|
+ Client client = createClient();
|
|
|
+ try {
|
|
|
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
|
|
+ "timeline/flowrunapps/cluster1/flow_name/1002345678919?" +
|
|
|
+ "userid=user1&fields=ALL");
|
|
|
+ ClientResponse resp = getResponse(client, uri);
|
|
|
+ Set<TimelineEntity> entities =
|
|
|
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
|
|
+ assertNotNull(entities);
|
|
|
+ assertEquals(2, entities.size());
|
|
|
+ for (TimelineEntity entity : entities) {
|
|
|
+ assertTrue("Unexpected app in result",
|
|
|
+ (entity.getId().equals("application_1111111111_1111") &&
|
|
|
+ entity.getMetrics().size() == 2) ||
|
|
|
+ (entity.getId().equals("application_1111111111_2222") &&
|
|
|
+ entity.getMetrics().size() == 1));
|
|
|
+ }
|
|
|
+
|
|
|
+ // Query without specifying cluster ID.
|
|
|
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
|
|
+ "timeline/flowrunapps/flow_name/1002345678919?userid=user1");
|
|
|
+ resp = getResponse(client, uri);
|
|
|
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
|
|
+ assertNotNull(entities);
|
|
|
+ assertEquals(2, entities.size());
|
|
|
+
|
|
|
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
|
|
+ "timeline/flowrunapps/flow_name/1002345678919?userid=user1&limit=1");
|
|
|
+ resp = getResponse(client, uri);
|
|
|
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
|
|
+ assertNotNull(entities);
|
|
|
+ assertEquals(1, entities.size());
|
|
|
+ } finally {
|
|
|
+ client.destroy();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testGetFlowApps() throws Exception {
|
|
|
+ Client client = createClient();
|
|
|
+ try {
|
|
|
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
|
|
+ "timeline/flowapps/cluster1/flow_name?userid=user1&fields=ALL");
|
|
|
+ ClientResponse resp = getResponse(client, uri);
|
|
|
+ Set<TimelineEntity> entities =
|
|
|
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
|
|
+ assertNotNull(entities);
|
|
|
+ assertEquals(3, entities.size());
|
|
|
+ for (TimelineEntity entity : entities) {
|
|
|
+ assertTrue("Unexpected app in result",
|
|
|
+ (entity.getId().equals("application_1111111111_1111") &&
|
|
|
+ entity.getMetrics().size() == 2) ||
|
|
|
+ (entity.getId().equals("application_1111111111_2222") &&
|
|
|
+ entity.getMetrics().size() == 1) ||
|
|
|
+ (entity.getId().equals("application_1111111111_2224") &&
|
|
|
+ entity.getMetrics().size() == 0));
|
|
|
+ }
|
|
|
+
|
|
|
+ // Query without specifying cluster ID.
|
|
|
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
|
|
+ "timeline/flowapps/flow_name?userid=user1");
|
|
|
+ resp = getResponse(client, uri);
|
|
|
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
|
|
+ assertNotNull(entities);
|
|
|
+ assertEquals(3, entities.size());
|
|
|
+
|
|
|
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
|
|
+ "timeline/flowapps/flow_name?userid=user1&limit=1");
|
|
|
+ resp = getResponse(client, uri);
|
|
|
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
|
|
+ assertNotNull(entities);
|
|
|
+ assertEquals(1, entities.size());
|
|
|
+ } finally {
|
|
|
+ client.destroy();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testGetFlowAppsFilters() throws Exception {
|
|
|
+ Client client = createClient();
|
|
|
+ try {
|
|
|
+ String entityType = TimelineEntityType.YARN_APPLICATION.toString();
|
|
|
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
|
|
+ "timeline/flowapps/cluster1/flow_name?userid=user1&eventfilters=" +
|
|
|
+ ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
|
|
|
+ ClientResponse resp = getResponse(client, uri);
|
|
|
+ Set<TimelineEntity> entities =
|
|
|
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
|
|
+ assertNotNull(entities);
|
|
|
+ assertEquals(1, entities.size());
|
|
|
+ assertTrue("Unexpected app in result", entities.contains(
|
|
|
+ newEntity(entityType, "application_1111111111_1111")));
|
|
|
+
|
|
|
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
|
|
+ "timeline/flowapps/cluster1/flow_name?userid=user1&metricfilters=" +
|
|
|
+ "HDFS_BYTES_READ");
|
|
|
+ resp = getResponse(client, uri);
|
|
|
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
|
|
+ assertNotNull(entities);
|
|
|
+ assertEquals(1, entities.size());
|
|
|
+ assertTrue("Unexpected app in result", entities.contains(
|
|
|
+ newEntity(entityType, "application_1111111111_1111")));
|
|
|
+
|
|
|
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
|
|
+ "timeline/flowapps/cluster1/flow_name?userid=user1&conffilters=" +
|
|
|
+ "cfg1:value1");
|
|
|
+ resp = getResponse(client, uri);
|
|
|
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
|
|
+ assertNotNull(entities);
|
|
|
+ assertEquals(1, entities.size());
|
|
|
+ assertTrue("Unexpected app in result", entities.contains(
|
|
|
+ newEntity(entityType, "application_1111111111_2222")));
|
|
|
+ } finally {
|
|
|
+ client.destroy();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testGetFlowRunNotPresent() throws Exception {
|
|
|
Client client = createClient();
|
|
@@ -394,6 +616,53 @@ public class TestTimelineReaderWebServicesFlowRun {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testGetAppNotPresent() throws Exception {
|
|
|
+ Client client = createClient();
|
|
|
+ try {
|
|
|
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
|
|
+ "timeline/app/cluster1/flow_name/1002345678919/" +
|
|
|
+ "application_1111111111_1378?userid=user1");
|
|
|
+ verifyHttpResponse(client, uri, Status.NOT_FOUND);
|
|
|
+ } finally {
|
|
|
+ client.destroy();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testGetFlowRunAppsNotPresent() throws Exception {
|
|
|
+ Client client = createClient();
|
|
|
+ try {
|
|
|
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
|
|
+ "timeline/flowrunapps/cluster2/flow_name/1002345678919");
|
|
|
+ ClientResponse resp = getResponse(client, uri);
|
|
|
+ Set<TimelineEntity> entities =
|
|
|
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
|
|
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
|
|
+ assertNotNull(entities);
|
|
|
+ assertEquals(0, entities.size());
|
|
|
+ } finally {
|
|
|
+ client.destroy();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testGetFlowAppsNotPresent() throws Exception {
|
|
|
+ Client client = createClient();
|
|
|
+ try {
|
|
|
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
|
|
|
+ "timeline/flowapps/cluster2/flow_name55");
|
|
|
+ ClientResponse resp = getResponse(client, uri);
|
|
|
+ Set<TimelineEntity> entities =
|
|
|
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
|
|
|
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
|
|
|
+ assertNotNull(entities);
|
|
|
+ assertEquals(0, entities.size());
|
|
|
+ } finally {
|
|
|
+ client.destroy();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@After
|
|
|
public void stop() throws Exception {
|
|
|
if (server != null) {
|
|
@@ -401,5 +670,4 @@ public class TestTimelineReaderWebServicesFlowRun {
|
|
|
server = null;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
}
|