Procházet zdrojové kódy

YARN-5015. entire time series is returned for YARN container system metrics (CPU and memory) (Varun Saxena via sjlee)

Sangjin Lee před 9 roky
rodič
revize
c81a2e1d19
12 změnil soubory, kde provedl 711 přidání a 205 odebrání
  1. 182 29
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
  2. 230 85
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
  3. 13 13
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
  4. 30 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineDataToRetrieve.java
  5. 3 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java
  6. 214 60
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
  7. 14 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java
  8. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
  9. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
  10. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
  11. 3 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
  12. 6 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java

+ 182 - 29
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.timelineservice.reader;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
@@ -112,13 +113,14 @@ public class TestTimelineReaderWebServicesHBaseStorage {
     TimelineMetric m1 = new TimelineMetric();
     m1.setId("MAP_SLOT_MILLIS");
     Map<Long, Number> metricValues =
-        ImmutableMap.of(ts - 100000, (Number)2, ts - 80000, 40);
+        ImmutableMap.of(ts - 100000, (Number)2, ts - 90000, 7, ts - 80000, 40);
     m1.setType(Type.TIME_SERIES);
     m1.setValues(metricValues);
     metrics.add(m1);
     m1 = new TimelineMetric();
     m1.setId("MAP1_SLOT_MILLIS");
-    metricValues = ImmutableMap.of(ts - 100000, (Number)2, ts - 80000, 40);
+    metricValues =
+        ImmutableMap.of(ts - 100000, (Number)2, ts - 90000, 9, ts - 80000, 40);
     m1.setType(Type.TIME_SERIES);
     m1.setValues(metricValues);
     metrics.add(m1);
@@ -460,6 +462,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
     assertNotNull(resp);
     assertTrue("Response from server should have been " + status,
         resp.getClientResponseStatus().equals(status));
+    System.out.println("Response is: " + resp.getEntity(String.class));
   }
 
   @Test
@@ -615,12 +618,18 @@ public class TestTimelineReaderWebServicesHBaseStorage {
             (entity.getStartTime() == 1425016501034L) &&
             (entity.getMetrics().size() == 1)));
       }
+
+      // fields as CONFIGS will lead to a HTTP 400 as it makes no sense for
+      // flow runs.
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" +
+          "fields=CONFIGS");
+      verifyHttpResponse(client, uri, Status.BAD_REQUEST);
     } finally {
       client.destroy();
     }
   }
 
-
   @Test
   public void testGetFlowRunsMetricsToRetrieve() throws Exception {
     Client client = createClient();
@@ -1024,15 +1033,12 @@ public class TestTimelineReaderWebServicesHBaseStorage {
       assertNotNull(entity);
       assertEquals("application_1111111111_1111", entity.getId());
       assertEquals(3, entity.getMetrics().size());
-      TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES,
-          "HDFS_BYTES_READ", ts - 100000, 31L);
-      m1.addValue(ts - 80000, 57L);
-      TimelineMetric m2 = newMetric(TimelineMetric.Type.TIME_SERIES,
-          "MAP_SLOT_MILLIS", ts - 100000, 2L);
-      m2.addValue(ts - 80000, 40L);
-      TimelineMetric m3 = newMetric(TimelineMetric.Type.TIME_SERIES,
-          "MAP1_SLOT_MILLIS", ts - 100000, 2L);
-      m3.addValue(ts - 80000, 40L);
+      TimelineMetric m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+          "HDFS_BYTES_READ", ts - 80000, 57L);
+      TimelineMetric m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+          "MAP_SLOT_MILLIS", ts - 80000, 40L);
+      TimelineMetric m3 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+          "MAP1_SLOT_MILLIS", ts - 80000, 40L);
       for (TimelineMetric metric : entity.getMetrics()) {
         assertTrue(verifyMetrics(metric, m1, m2, m3));
       }
@@ -1045,9 +1051,8 @@ public class TestTimelineReaderWebServicesHBaseStorage {
       assertNotNull(entity);
       assertEquals("application_1111111111_2222", entity.getId());
       assertEquals(1, entity.getMetrics().size());
-      TimelineMetric m4 = newMetric(TimelineMetric.Type.TIME_SERIES,
-         "MAP_SLOT_MILLIS", ts - 100000, 5L);
-      m4.addValue(ts - 80000, 101L);
+      TimelineMetric m4 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+         "MAP_SLOT_MILLIS", ts - 80000, 101L);
       for (TimelineMetric metric : entity.getMetrics()) {
         assertTrue(verifyMetrics(metric, m4));
       }
@@ -1067,15 +1072,35 @@ public class TestTimelineReaderWebServicesHBaseStorage {
       TimelineEntity entity = resp.getEntity(TimelineEntity.class);
       assertNotNull(entity);
       assertEquals("application_1111111111_1111", entity.getId());
+      assertEquals(1, entity.getConfigs().size());
+      assertEquals(3, entity.getMetrics().size());
+      TimelineMetric m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+          "HDFS_BYTES_READ", ts - 80000, 57L);
+      TimelineMetric m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+          "MAP_SLOT_MILLIS", ts - 80000, 40L);
+      TimelineMetric m3 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+          "MAP1_SLOT_MILLIS", ts - 80000, 40L);
+      for (TimelineMetric metric : entity.getMetrics()) {
+        assertTrue(verifyMetrics(metric, m1, m2, m3));
+      }
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/clusters/cluster1/apps/application_1111111111_1111?" +
+          "fields=ALL&metricslimit=10");
+      resp = getResponse(client, uri);
+      entity = resp.getEntity(TimelineEntity.class);
+      assertNotNull(entity);
+      assertEquals("application_1111111111_1111", entity.getId());
+      assertEquals(1, entity.getConfigs().size());
       assertEquals(3, entity.getMetrics().size());
-      TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES,
-          "HDFS_BYTES_READ", ts - 100000, 31L);
+      m1 = newMetric(TimelineMetric.Type.TIME_SERIES, "HDFS_BYTES_READ",
+          ts - 100000, 31L);
       m1.addValue(ts - 80000, 57L);
-      TimelineMetric m2 = newMetric(TimelineMetric.Type.TIME_SERIES,
-          "MAP_SLOT_MILLIS", ts - 100000, 2L);
+      m2 = newMetric(TimelineMetric.Type.TIME_SERIES, "MAP_SLOT_MILLIS",
+          ts - 100000, 2L);
       m2.addValue(ts - 80000, 40L);
-      TimelineMetric m3 = newMetric(TimelineMetric.Type.TIME_SERIES,
-          "MAP1_SLOT_MILLIS", ts - 100000, 2L);
+      m3 = newMetric(TimelineMetric.Type.TIME_SERIES, "MAP1_SLOT_MILLIS",
+          ts - 100000, 2L);
       m3.addValue(ts - 80000, 40L);
       for (TimelineMetric metric : entity.getMetrics()) {
         assertTrue(verifyMetrics(metric, m1, m2, m3));
@@ -1229,11 +1254,6 @@ public class TestTimelineReaderWebServicesHBaseStorage {
         }
       }
       assertEquals(2, metricCnt);
-
-      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
-          "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
-          "entities/type1?metricstoretrieve=!(MAP1_,HDFS_");
-      verifyHttpResponse(client, uri, Status.BAD_REQUEST);
     } finally {
       client.destroy();
     }
@@ -1550,6 +1570,35 @@ public class TestTimelineReaderWebServicesHBaseStorage {
         assertTrue(entity.getId().equals("entity2"));
         for (TimelineMetric metric : entity.getMetrics()) {
           assertTrue(metric.getId().startsWith("MAP1"));
+          assertEquals(TimelineMetric.Type.SINGLE_VALUE, metric.getType());
+        }
+      }
+      assertEquals(2, metricCnt);
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+          "entities/type1?metricfilters=(HDFS_BYTES_READ%20lt%2060%20AND%20" +
+          "MAP_SLOT_MILLIS%20gt%2040)%20OR%20(MAP1_SLOT_MILLIS%20ge" +
+          "%20140%20AND%20MAP11_SLOT_MILLIS%20le%20122)&metricstoretrieve=" +
+          "!(HDFS)&metricslimit=10");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      metricCnt = 0;
+      for (TimelineEntity entity : entities) {
+        metricCnt += entity.getMetrics().size();
+        assertTrue(entity.getId().equals("entity2"));
+        for (TimelineMetric metric : entity.getMetrics()) {
+          assertTrue(metric.getId().startsWith("MAP1"));
+          if (metric.getId().equals("MAP1_SLOT_MILLIS")) {
+            assertEquals(2, metric.getValues().size());
+            assertEquals(TimelineMetric.Type.TIME_SERIES, metric.getType());
+          } else if (metric.getId().equals("MAP11_SLOT_MILLIS")) {
+            assertEquals(TimelineMetric.Type.SINGLE_VALUE, metric.getType());
+          } else {
+            fail("Unexpected metric id");
+          }
         }
       }
       assertEquals(2, metricCnt);
@@ -1794,6 +1843,23 @@ public class TestTimelineReaderWebServicesHBaseStorage {
       assertEquals(1, entity.getMetrics().size());
       for (TimelineMetric  metric : entity.getMetrics()) {
         assertTrue(metric.getId().startsWith("MAP11_"));
+        assertEquals(TimelineMetric.Type.SINGLE_VALUE, metric.getType());
+        assertEquals(1, metric.getValues().size());
+      }
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+          "entities/type1/entity2?metricstoretrieve=!(MAP1_,HDFS_)&" +
+          "metricslimit=5");
+      resp = getResponse(client, uri);
+      entity = resp.getEntity(TimelineEntity.class);
+      assertNotNull(entity);
+      assertEquals("entity2", entity.getId());
+      assertEquals("type1", entity.getType());
+      assertEquals(1, entity.getMetrics().size());
+      for (TimelineMetric  metric : entity.getMetrics()) {
+        assertTrue(metric.getId().startsWith("MAP11_"));
+        assertEquals(TimelineMetric.Type.SINGLE_VALUE, metric.getType());
       }
     } finally {
       client.destroy();
@@ -1818,6 +1884,29 @@ public class TestTimelineReaderWebServicesHBaseStorage {
             entity.getMetrics().size() == 3) ||
             (entity.getId().equals("application_1111111111_2222") &&
             entity.getMetrics().size() == 1));
+        for (TimelineMetric metric : entity.getMetrics()) {
+          assertEquals(TimelineMetric.Type.SINGLE_VALUE, metric.getType());
+          assertEquals(1, metric.getValues().size());
+        }
+      }
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+              "timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
+              "1002345678919/apps?fields=ALL&metricslimit=2");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+      for (TimelineEntity entity : entities) {
+        assertTrue("Unexpected app in result",
+            (entity.getId().equals("application_1111111111_1111") &&
+            entity.getMetrics().size() == 3) ||
+            (entity.getId().equals("application_1111111111_2222") &&
+            entity.getMetrics().size() == 1));
+        for (TimelineMetric metric : entity.getMetrics()) {
+          assertTrue(metric.getValues().size() <= 2);
+          assertEquals(TimelineMetric.Type.TIME_SERIES, metric.getType());
+        }
       }
 
       // Query without specifying cluster ID.
@@ -1855,11 +1944,75 @@ public class TestTimelineReaderWebServicesHBaseStorage {
       for (TimelineEntity entity : entities) {
         assertTrue("Unexpected app in result",
             (entity.getId().equals("application_1111111111_1111") &&
-            entity.getMetrics().size() == 3) ||
+            entity.getConfigs().size() == 1 &&
+            entity.getConfigs().equals(ImmutableMap.of("cfg2", "value1"))) ||
             (entity.getId().equals("application_1111111111_2222") &&
-            entity.getMetrics().size() == 1) ||
+            entity.getConfigs().size() == 1 &&
+            entity.getConfigs().equals(ImmutableMap.of("cfg1", "value1"))) ||
             (entity.getId().equals("application_1111111111_2224") &&
-            entity.getMetrics().size() == 1));
+            entity.getConfigs().size() == 0));
+        for (TimelineMetric metric : entity.getMetrics()) {
+          if (entity.getId().equals("application_1111111111_1111")) {
+            TimelineMetric m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+                "HDFS_BYTES_READ", ts - 80000, 57L);
+            TimelineMetric m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+                "MAP_SLOT_MILLIS", ts - 80000, 40L);
+            TimelineMetric m3 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+                "MAP1_SLOT_MILLIS", ts - 80000, 40L);
+            assertTrue(verifyMetrics(metric, m1, m2, m3));
+          } else if (entity.getId().equals("application_1111111111_2222")) {
+            TimelineMetric m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+                "MAP_SLOT_MILLIS", ts - 80000, 101L);
+            assertTrue(verifyMetrics(metric, m1));
+          } else if (entity.getId().equals("application_1111111111_2224")) {
+            TimelineMetric m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+                "MAP_SLOT_MILLIS", ts - 80000, 101L);
+            assertTrue(verifyMetrics(metric, m1));
+          }
+        }
+      }
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" +
+          "fields=ALL&metricslimit=6");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(3, entities.size());
+      for (TimelineEntity entity : entities) {
+        assertTrue("Unexpected app in result",
+            (entity.getId().equals("application_1111111111_1111") &&
+            entity.getConfigs().size() == 1 &&
+            entity.getConfigs().equals(ImmutableMap.of("cfg2", "value1"))) ||
+            (entity.getId().equals("application_1111111111_2222") &&
+            entity.getConfigs().size() == 1 &&
+            entity.getConfigs().equals(ImmutableMap.of("cfg1", "value1"))) ||
+            (entity.getId().equals("application_1111111111_2224") &&
+            entity.getConfigs().size() == 0));
+        for (TimelineMetric metric : entity.getMetrics()) {
+          if (entity.getId().equals("application_1111111111_1111")) {
+            TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES,
+                "HDFS_BYTES_READ", ts - 80000, 57L);
+            m1.addValue(ts - 100000, 31L);
+            TimelineMetric m2 = newMetric(TimelineMetric.Type.TIME_SERIES,
+                "MAP_SLOT_MILLIS", ts - 80000, 40L);
+            m2.addValue(ts - 100000, 2L);
+            TimelineMetric m3 = newMetric(TimelineMetric.Type.TIME_SERIES,
+                "MAP1_SLOT_MILLIS", ts - 80000, 40L);
+            m3.addValue(ts - 100000, 2L);
+            assertTrue(verifyMetrics(metric, m1, m2, m3));
+          } else if (entity.getId().equals("application_1111111111_2222")) {
+            TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES,
+                "MAP_SLOT_MILLIS", ts - 80000, 101L);
+            m1.addValue(ts - 100000, 5L);
+            assertTrue(verifyMetrics(metric, m1));
+          } else if (entity.getId().equals("application_1111111111_2224")) {
+            TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES,
+                "MAP_SLOT_MILLIS", ts - 80000, 101L);
+            m1.addValue(ts - 100000, 5L);
+            assertTrue(verifyMetrics(metric, m1));
+          }
+        }
       }
 
       // Query without specifying cluster ID.

+ 230 - 85
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java

@@ -612,7 +612,7 @@ public class TestHBaseTimelineStorage {
     aggMetric.setId("MEM_USAGE");
     Map<Long, Number> aggMetricValues = new HashMap<Long, Number>();
     ts = System.currentTimeMillis();
-    aggMetricValues.put(ts - 120000, 102400000);
+    aggMetricValues.put(ts - 120000, 102400000L);
     aggMetric.setType(Type.SINGLE_VALUE);
     aggMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
     aggMetric.setValues(aggMetricValues);
@@ -721,12 +721,14 @@ public class TestHBaseTimelineStorage {
       NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
       matchMetrics(metricValues, metricMap);
 
-      // read the timeline entity using the reader this time
+      // read the timeline entity using the reader this time. In metrics limit
+      // specify Integer MAX_VALUE. A TIME_SERIES will be returned(if more than
+      // one value exists for a metric).
       TimelineEntity e1 = reader.getEntity(
           new TimelineReaderContext(cluster, user, flow, runid, appId,
           entity.getType(), entity.getId()),
-          new TimelineDataToRetrieve(
-          null, null, EnumSet.of(TimelineReader.Field.ALL)));
+          new TimelineDataToRetrieve(null, null,
+          EnumSet.of(TimelineReader.Field.ALL), Integer.MAX_VALUE));
       assertNotNull(e1);
 
       // verify attributes
@@ -753,12 +755,69 @@ public class TestHBaseTimelineStorage {
         assertTrue(metric2.getId().equals("MAP_SLOT_MILLIS") ||
             metric2.getId().equals("MEM_USAGE"));
         if (metric2.getId().equals("MAP_SLOT_MILLIS")) {
+          assertEquals(6, metricValues2.size());
           matchMetrics(metricValues, metricValues2);
         }
         if (metric2.getId().equals("MEM_USAGE")) {
+          assertEquals(1, metricValues2.size());
           matchMetrics(aggMetricValues, metricValues2);
         }
       }
+
+      // In metrics limit specify a value of 3. No more than 3 values for a
+      // metric will be returned.
+      e1 = reader.getEntity(new TimelineReaderContext(cluster, user, flow,
+          runid, appId, entity.getType(), entity.getId()),
+          new TimelineDataToRetrieve(null, null,
+          EnumSet.of(TimelineReader.Field.ALL), 3));
+      assertNotNull(e1);
+      assertEquals(appId, e1.getId());
+      assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
+          e1.getType());
+      assertEquals(conf, e1.getConfigs());
+      metrics2 = e1.getMetrics();
+      assertEquals(2, metrics2.size());
+      for (TimelineMetric metric2 : metrics2) {
+        Map<Long, Number> metricValues2 = metric2.getValues();
+        assertTrue(metricValues2.size() <= 3);
+        assertTrue(metric2.getId().equals("MAP_SLOT_MILLIS") ||
+            metric2.getId().equals("MEM_USAGE"));
+      }
+
+      // Check if single value(latest value) instead of time series is returned
+      // if metricslimit is not set(null), irrespective of number of metric
+      // values.
+      e1 = reader.getEntity(
+          new TimelineReaderContext(cluster, user, flow, runid, appId,
+         entity.getType(), entity.getId()), new TimelineDataToRetrieve(
+         null, null, EnumSet.of(TimelineReader.Field.ALL), null));
+      assertNotNull(e1);
+      assertEquals(appId, e1.getId());
+      assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
+          e1.getType());
+      assertEquals(cTime, e1.getCreatedTime());
+      assertEquals(infoMap, e1.getInfo());
+      assertEquals(isRelatedTo, e1.getIsRelatedToEntities());
+      assertEquals(relatesTo, e1.getRelatesToEntities());
+      assertEquals(conf, e1.getConfigs());
+      assertEquals(2, e1.getMetrics().size());
+      for (TimelineMetric metric : e1.getMetrics()) {
+        assertEquals(1, metric.getValues().size());
+        assertEquals(TimelineMetric.Type.SINGLE_VALUE, metric.getType());
+        assertTrue(metric.getId().equals("MAP_SLOT_MILLIS") ||
+            metric.getId().equals("MEM_USAGE"));
+        assertEquals(1, metric.getValues().size());
+        if (metric.getId().equals("MAP_SLOT_MILLIS")) {
+          assertTrue(metric.getValues().containsKey(ts - 20000));
+          assertEquals(metricValues.get(ts - 20000),
+              metric.getValues().get(ts - 20000));
+        }
+        if (metric.getId().equals("MEM_USAGE")) {
+          assertTrue(metric.getValues().containsKey(ts - 120000));
+          assertEquals(aggMetricValues.get(ts - 120000),
+              metric.getValues().get(ts - 120000));
+        }
+      }
     } finally {
       if (hbi != null) {
         hbi.stop();
@@ -839,8 +898,8 @@ public class TestHBaseTimelineStorage {
       String flow = "some_flow_name";
       String flowVersion = "AB7822C10F1111";
       long runid = 1002345678919L;
-      String appName =
-          ApplicationId.newInstance(System.currentTimeMillis(), 1).toString();
+      String appName = ApplicationId.newInstance(System.currentTimeMillis() +
+          9000000L, 1).toString();
       hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
       hbi.stop();
 
@@ -931,12 +990,14 @@ public class TestHBaseTimelineStorage {
       TimelineEntity e1 = reader.getEntity(
           new TimelineReaderContext(cluster, user, flow, runid, appName,
           entity.getType(), entity.getId()),
-          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL),
+          Integer.MAX_VALUE));
       Set<TimelineEntity> es1 = reader.getEntities(
           new TimelineReaderContext(cluster, user, flow, runid, appName,
           entity.getType(), null),
           new TimelineEntityFilters(),
-          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL),
+          Integer.MAX_VALUE));
       assertNotNull(e1);
       assertEquals(1, es1.size());
 
@@ -962,6 +1023,25 @@ public class TestHBaseTimelineStorage {
         Map<Long, Number> metricValues2 = metric2.getValues();
         matchMetrics(metricValues, metricValues2);
       }
+
+      e1 = reader.getEntity(new TimelineReaderContext(cluster, user, flow,
+          runid, appName, entity.getType(), entity.getId()),
+          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
+      assertNotNull(e1);
+      assertEquals(id, e1.getId());
+      assertEquals(type, e1.getType());
+      assertEquals(cTime, e1.getCreatedTime());
+      assertEquals(infoMap, e1.getInfo());
+      assertEquals(isRelatedTo, e1.getIsRelatedToEntities());
+      assertEquals(relatesTo, e1.getRelatesToEntities());
+      assertEquals(conf, e1.getConfigs());
+      for (TimelineMetric metric : e1.getMetrics()) {
+        assertEquals(TimelineMetric.Type.SINGLE_VALUE, metric.getType());
+        assertEquals(1, metric.getValues().size());
+        assertTrue(metric.getValues().containsKey(ts - 20000));
+        assertEquals(metricValues.get(ts - 20000),
+            metric.getValues().get(ts - 20000));
+      }
     } finally {
       if (hbi != null) {
         hbi.stop();
@@ -1067,11 +1147,11 @@ public class TestHBaseTimelineStorage {
       TimelineEntity e1 = reader.getEntity(
           new TimelineReaderContext(cluster, user, flow, runid, appName,
           entity.getType(), entity.getId()),
-          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
       TimelineEntity e2 = reader.getEntity(
           new TimelineReaderContext(cluster, user, null, null, appName,
           entity.getType(), entity.getId()),
-          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
       assertNotNull(e1);
       assertNotNull(e2);
       assertEquals(e1, e2);
@@ -1125,8 +1205,8 @@ public class TestHBaseTimelineStorage {
       String flow = "other_flow_name";
       String flowVersion = "1111F01C2287BA";
       long runid = 1009876543218L;
-      String appName =
-          ApplicationId.newInstance(System.currentTimeMillis(), 1).toString();
+      String appName = ApplicationId.newInstance(System.currentTimeMillis() +
+          9000000L, 1).toString();
       byte[] startRow =
           EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
       hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
@@ -1173,12 +1253,12 @@ public class TestHBaseTimelineStorage {
       TimelineEntity e1 = reader.getEntity(
           new TimelineReaderContext(cluster, user, flow, runid, appName,
           entity.getType(), entity.getId()),
-          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
       Set<TimelineEntity> es1 = reader.getEntities(
           new TimelineReaderContext(cluster, user, flow, runid, appName,
           entity.getType(), null),
           new TimelineEntityFilters(),
-          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
       assertNotNull(e1);
       assertEquals(1, es1.size());
 
@@ -1235,7 +1315,7 @@ public class TestHBaseTimelineStorage {
       TimelineEntity e1 = reader.getEntity(
           new TimelineReaderContext(cluster, user, flow, runid, appName,
           entity.getType(), entity.getId()),
-          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
       assertNotNull(e1);
       // check the events
       NavigableSet<TimelineEvent> events = e1.getEvents();
@@ -1325,7 +1405,7 @@ public class TestHBaseTimelineStorage {
     TimelineEntity entity = reader.getEntity(
         new TimelineReaderContext("cluster1", "user1", "some_flow_name",
         1002345678919L, "application_1231111111_1111","world", "hello"),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
     assertNotNull(entity);
     assertEquals(3, entity.getConfigs().size());
     assertEquals(1, entity.getIsRelatedToEntities().size());
@@ -1333,7 +1413,7 @@ public class TestHBaseTimelineStorage {
         new TimelineReaderContext("cluster1", "user1", "some_flow_name",
         1002345678919L, "application_1231111111_1111","world",
         null), new TimelineEntityFilters(),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
     assertEquals(3, entities.size());
     int cfgCnt = 0;
     int metricCnt = 0;
@@ -1457,7 +1537,7 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         null, ef),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
     assertEquals(1, entities.size());
     int eventCnt = 0;
     for (TimelineEntity timelineEntity : entities) {
@@ -1583,7 +1663,7 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, irt, null, null, null,
         null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
     assertEquals(2, entities.size());
     int isRelatedToCnt = 0;
     for (TimelineEntity timelineEntity : entities) {
@@ -1732,7 +1812,7 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, rt, null, null, null, null,
         null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
     assertEquals(2, entities.size());
     int relatesToCnt = 0;
     for (TimelineEntity timelineEntity : entities) {
@@ -1932,7 +2012,7 @@ public class TestHBaseTimelineStorage {
         new TimelineReaderContext("cluster1", "user1", "some_flow_name",
         1002345678919L, "application_1231111111_1111","world", "hello"),
         new TimelineDataToRetrieve(
-        null, null, EnumSet.of(Field.INFO, Field.CONFIGS)));
+        null, null, EnumSet.of(Field.INFO, Field.CONFIGS), null));
     assertNotNull(e1);
     assertEquals(3, e1.getConfigs().size());
     assertEquals(0, e1.getIsRelatedToEntities().size());
@@ -1941,7 +2021,7 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(),
         new TimelineDataToRetrieve(
-        null, null, EnumSet.of(Field.IS_RELATED_TO, Field.METRICS)));
+        null, null, EnumSet.of(Field.IS_RELATED_TO, Field.METRICS), null));
     assertEquals(3, es1.size());
     int metricsCnt = 0;
     int isRelatedToCnt = 0;
@@ -1964,14 +2044,14 @@ public class TestHBaseTimelineStorage {
     TimelineEntity e1 = reader.getEntity(
         new TimelineReaderContext("cluster1", "user1", "some_flow_name",
         1002345678919L, "application_1231111111_1111","world", "hello"),
-        new TimelineDataToRetrieve(list, null, null));
+        new TimelineDataToRetrieve(list, null, null, null));
     assertNotNull(e1);
     assertEquals(1, e1.getConfigs().size());
     Set<TimelineEntity> es1 = reader.getEntities(
         new TimelineReaderContext("cluster1", "user1", "some_flow_name",
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(),
-        new TimelineDataToRetrieve(list, null, null));
+        new TimelineDataToRetrieve(list, null, null, null));
     int cfgCnt = 0;
     for (TimelineEntity entity : es1) {
       cfgCnt += entity.getConfigs().size();
@@ -2002,7 +2082,8 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, null,
         confFilterList, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
+        null));
     assertEquals(2, entities.size());
     int cfgCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -2015,7 +2096,7 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, null,
         confFilterList, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
     assertEquals(2, entities.size());
     cfgCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -2031,7 +2112,8 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, null,
         confFilterList1, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
+        null));
     assertEquals(1, entities.size());
     cfgCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -2049,7 +2131,8 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, null,
         confFilterList2, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
+        null));
     assertEquals(0, entities.size());
 
     TimelineFilterList confFilterList3 = new TimelineFilterList(
@@ -2060,7 +2143,8 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, null,
         confFilterList3, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
+        null));
     assertEquals(0, entities.size());
 
     TimelineFilterList confFilterList4 = new TimelineFilterList(
@@ -2071,7 +2155,8 @@ public class TestHBaseTimelineStorage {
             1002345678919L, "application_1231111111_1111","world", null),
             new TimelineEntityFilters(null, null, null, null, null, null,
             confFilterList4, null, null),
-            new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS)));
+            new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
+            null));
     assertEquals(0, entities.size());
 
     TimelineFilterList confFilterList5 = new TimelineFilterList(
@@ -2082,7 +2167,8 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, null,
         confFilterList5, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
+        null));
     assertEquals(3, entities.size());
   }
 
@@ -2099,7 +2185,7 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, null,
         confFilterList, null, null),
-        new TimelineDataToRetrieve(list, null, null));
+        new TimelineDataToRetrieve(list, null, null, null));
     assertEquals(1, entities.size());
     int cfgCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -2130,7 +2216,7 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, null,
         confFilterList1, null, null),
-        new TimelineDataToRetrieve(confsToRetrieve, null, null));
+        new TimelineDataToRetrieve(confsToRetrieve, null, null, null));
     assertEquals(2, entities.size());
     cfgCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -2151,14 +2237,14 @@ public class TestHBaseTimelineStorage {
     TimelineEntity e1 = reader.getEntity(
         new TimelineReaderContext("cluster1", "user1", "some_flow_name",
         1002345678919L, "application_1231111111_1111","world", "hello"),
-        new TimelineDataToRetrieve(null, list, null));
+        new TimelineDataToRetrieve(null, list, null, null));
     assertNotNull(e1);
     assertEquals(1, e1.getMetrics().size());
     Set<TimelineEntity> es1 = reader.getEntities(
         new TimelineReaderContext("cluster1", "user1", "some_flow_name",
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(),
-        new TimelineDataToRetrieve(null, list, null));
+        new TimelineDataToRetrieve(null, list, null, null));
     int metricCnt = 0;
     for (TimelineEntity entity : es1) {
       metricCnt += entity.getMetrics().size();
@@ -2187,7 +2273,8 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         metricFilterList, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
+        null));
     assertEquals(2, entities.size());
     int metricCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -2200,7 +2287,7 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         metricFilterList, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
     assertEquals(2, entities.size());
     metricCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -2218,7 +2305,8 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         metricFilterList1, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
+        null));
     assertEquals(1, entities.size());
     metricCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -2236,7 +2324,8 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         metricFilterList2, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
+        null));
     assertEquals(0, entities.size());
 
     TimelineFilterList metricFilterList3 = new TimelineFilterList(
@@ -2247,7 +2336,8 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         metricFilterList3, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
+        null));
     assertEquals(0, entities.size());
 
     TimelineFilterList metricFilterList4 = new TimelineFilterList(
@@ -2258,7 +2348,8 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         metricFilterList4, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
+        null));
     assertEquals(0, entities.size());
 
     TimelineFilterList metricFilterList5 = new TimelineFilterList(
@@ -2269,7 +2360,8 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         metricFilterList5, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
+        null));
     assertEquals(3, entities.size());
   }
 
@@ -2286,7 +2378,7 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         metricFilterList, null),
-        new TimelineDataToRetrieve(null, list, null));
+        new TimelineDataToRetrieve(null, list, null, null));
     assertEquals(1, entities.size());
     int metricCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -2316,17 +2408,38 @@ public class TestHBaseTimelineStorage {
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         metricFilterList1, null),
         new TimelineDataToRetrieve(
-        null, metricsToRetrieve, EnumSet.of(Field.METRICS)));
+        null, metricsToRetrieve, EnumSet.of(Field.METRICS), null));
+    assertEquals(2, entities.size());
+    metricCnt = 0;
+    for (TimelineEntity entity : entities) {
+      metricCnt += entity.getMetrics().size();
+      for (TimelineMetric metric : entity.getMetrics()) {
+        assertEquals(TimelineMetric.Type.SINGLE_VALUE, metric.getType());
+        assertEquals(1, metric.getValues().size());
+        assertTrue("Metric Id returned should start with MAP1_",
+            metric.getId().startsWith("MAP1_"));
+      }
+    }
+    assertEquals(2, metricCnt);
+
+    entities = reader.getEntities(new TimelineReaderContext("cluster1", "user1",
+        "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+        null), new TimelineEntityFilters(null, null, null, null, null, null,
+        null, metricFilterList1, null), new TimelineDataToRetrieve(null,
+        metricsToRetrieve, EnumSet.of(Field.METRICS), Integer.MAX_VALUE));
     assertEquals(2, entities.size());
     metricCnt = 0;
+    int metricValCnt = 0;
     for (TimelineEntity entity : entities) {
       metricCnt += entity.getMetrics().size();
       for (TimelineMetric metric : entity.getMetrics()) {
+        metricValCnt += metric.getValues().size();
         assertTrue("Metric Id returned should start with MAP1_",
             metric.getId().startsWith("MAP1_"));
       }
     }
     assertEquals(2, metricCnt);
+    assertEquals(7, metricValCnt);
   }
 
   @Test
@@ -2348,7 +2461,7 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, infoFilterList,
         null, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null));
     assertEquals(2, entities.size());
     int infoCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -2364,7 +2477,7 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, infoFilterList1,
         null, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null));
     assertEquals(1, entities.size());
     infoCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -2382,7 +2495,7 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, infoFilterList2,
         null, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null));
     assertEquals(0, entities.size());
 
     TimelineFilterList infoFilterList3 = new TimelineFilterList(
@@ -2393,7 +2506,7 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, infoFilterList3,
         null, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null));
     assertEquals(0, entities.size());
 
     TimelineFilterList infoFilterList4 = new TimelineFilterList(
@@ -2404,7 +2517,7 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, infoFilterList4,
         null, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null));
     assertEquals(0, entities.size());
 
     TimelineFilterList infoFilterList5 = new TimelineFilterList(
@@ -2415,7 +2528,7 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1231111111_1111","world", null),
         new TimelineEntityFilters(null, null, null, null, null, infoFilterList5,
         null, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null));
     assertEquals(3, entities.size());
   }
 
@@ -2425,7 +2538,7 @@ public class TestHBaseTimelineStorage {
         new TimelineReaderContext("cluster1", "user1", "some_flow_name",
         1002345678919L, "application_1111111111_2222",
         TimelineEntityType.YARN_APPLICATION.toString(), null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
     assertNotNull(entity);
     assertEquals(3, entity.getConfigs().size());
     assertEquals(1, entity.getIsRelatedToEntities().size());
@@ -2434,7 +2547,7 @@ public class TestHBaseTimelineStorage {
         1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
         null),
         new TimelineEntityFilters(),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
     assertEquals(3, entities.size());
     int cfgCnt = 0;
     int metricCnt = 0;
@@ -2546,7 +2659,7 @@ public class TestHBaseTimelineStorage {
         1002345678919L, "application_1111111111_2222",
         TimelineEntityType.YARN_APPLICATION.toString(), null),
         new TimelineDataToRetrieve(
-        null, null, EnumSet.of(Field.INFO, Field.CONFIGS)));
+        null, null, EnumSet.of(Field.INFO, Field.CONFIGS), null));
     assertNotNull(e1);
     assertEquals(3, e1.getConfigs().size());
     assertEquals(0, e1.getIsRelatedToEntities().size());
@@ -2556,7 +2669,7 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(),
         new TimelineDataToRetrieve(
-        null, null, EnumSet.of(Field.IS_RELATED_TO, Field.METRICS)));
+        null, null, EnumSet.of(Field.IS_RELATED_TO, Field.METRICS), null));
     assertEquals(3, es1.size());
     int metricsCnt = 0;
     int isRelatedToCnt = 0;
@@ -2586,7 +2699,7 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, irt, null, null, null,
         null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
     assertEquals(2, entities.size());
     int isRelatedToCnt = 0;
     for (TimelineEntity timelineEntity : entities) {
@@ -2745,7 +2858,7 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, rt, null, null, null, null,
         null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
     assertEquals(2, entities.size());
     int relatesToCnt = 0;
     for (TimelineEntity timelineEntity : entities) {
@@ -2989,7 +3102,8 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, null,
         confFilterList, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
+        null));
     assertEquals(2, entities.size());
     int cfgCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -3003,7 +3117,7 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, null,
         confFilterList, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
     assertEquals(2, entities.size());
     cfgCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -3020,7 +3134,8 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, null,
         confFilterList1, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
+        null));
     assertEquals(1, entities.size());
     cfgCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -3039,7 +3154,8 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, null,
         confFilterList2, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
+        null));
     assertEquals(0, entities.size());
 
     TimelineFilterList confFilterList3 = new TimelineFilterList(
@@ -3051,7 +3167,8 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, null,
         confFilterList3, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
+        null));
     assertEquals(0, entities.size());
 
     TimelineFilterList confFilterList4 = new TimelineFilterList(
@@ -3063,7 +3180,8 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, null,
         confFilterList4, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
+        null));
     assertEquals(0, entities.size());
 
     TimelineFilterList confFilterList5 = new TimelineFilterList(
@@ -3075,7 +3193,8 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, null,
         confFilterList5, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.CONFIGS),
+        null));
     assertEquals(3, entities.size());
   }
 
@@ -3092,7 +3211,7 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         null, ef),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
     assertEquals(1, entities.size());
     int eventCnt = 0;
     for (TimelineEntity timelineEntity : entities) {
@@ -3218,7 +3337,7 @@ public class TestHBaseTimelineStorage {
         new TimelineReaderContext("cluster1", "user1", "some_flow_name",
         1002345678919L, "application_1111111111_2222",
         TimelineEntityType.YARN_APPLICATION.toString(), null),
-        new TimelineDataToRetrieve(list, null, null));
+        new TimelineDataToRetrieve(list, null, null, null));
     assertNotNull(e1);
     assertEquals(1, e1.getConfigs().size());
     Set<TimelineEntity> es1 = reader.getEntities(
@@ -3226,7 +3345,7 @@ public class TestHBaseTimelineStorage {
         1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
         null) ,
         new TimelineEntityFilters(),
-        new TimelineDataToRetrieve(list, null, null));
+        new TimelineDataToRetrieve(list, null, null, null));
     int cfgCnt = 0;
     for (TimelineEntity entity : es1) {
       cfgCnt += entity.getConfigs().size();
@@ -3252,7 +3371,7 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, null,
         confFilterList, null, null),
-        new TimelineDataToRetrieve(list, null, null));
+        new TimelineDataToRetrieve(list, null, null, null));
     assertEquals(1, entities.size());
     int cfgCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -3285,7 +3404,7 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, null,
         confFilterList1, null, null),
-        new TimelineDataToRetrieve(confsToRetrieve, null, null));
+        new TimelineDataToRetrieve(confsToRetrieve, null, null, null));
     assertEquals(2, entities.size());
     cfgCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -3316,7 +3435,8 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         metricFilterList, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
+        null));
     assertEquals(2, entities.size());
     int metricCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -3330,7 +3450,7 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         metricFilterList, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
     assertEquals(2, entities.size());
     metricCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -3349,7 +3469,8 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         metricFilterList1, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
+        null));
     assertEquals(1, entities.size());
     metricCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -3368,7 +3489,8 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         metricFilterList2, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
+        null));
     assertEquals(0, entities.size());
 
     TimelineFilterList metricFilterList3 = new TimelineFilterList(
@@ -3380,7 +3502,8 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         metricFilterList3, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
+        null));
     assertEquals(0, entities.size());
 
     TimelineFilterList metricFilterList4 = new TimelineFilterList(
@@ -3392,7 +3515,8 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         metricFilterList4, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
+        null));
     assertEquals(0, entities.size());
 
     TimelineFilterList metricFilterList5 = new TimelineFilterList(
@@ -3404,7 +3528,8 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         metricFilterList5, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS),
+        null));
     assertEquals(3, entities.size());
   }
 
@@ -3417,7 +3542,7 @@ public class TestHBaseTimelineStorage {
         new TimelineReaderContext("cluster1", "user1", "some_flow_name",
         1002345678919L, "application_1111111111_2222",
         TimelineEntityType.YARN_APPLICATION.toString(), null),
-        new TimelineDataToRetrieve(null, list, null));
+        new TimelineDataToRetrieve(null, list, null, null));
     assertNotNull(e1);
     assertEquals(1, e1.getMetrics().size());
     Set<TimelineEntity> es1 = reader.getEntities(
@@ -3425,7 +3550,7 @@ public class TestHBaseTimelineStorage {
         1002345678919L, null, TimelineEntityType.YARN_APPLICATION.toString(),
         null),
         new TimelineEntityFilters(),
-        new TimelineDataToRetrieve(null, list, null));
+        new TimelineDataToRetrieve(null, list, null, null));
     int metricCnt = 0;
     for (TimelineEntity entity : es1) {
       metricCnt += entity.getMetrics().size();
@@ -3451,7 +3576,7 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         metricFilterList, null),
-        new TimelineDataToRetrieve(null, list, null));
+        new TimelineDataToRetrieve(null, list, null, null));
     int metricCnt = 0;
     assertEquals(1, entities.size());
     for (TimelineEntity entity : entities) {
@@ -3477,17 +3602,37 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, null, null,
         metricFilterList1, null),
-        new TimelineDataToRetrieve(null, metricsToRetrieve, null));
+        new TimelineDataToRetrieve(null, metricsToRetrieve, null, null));
+    metricCnt = 0;
+    assertEquals(2, entities.size());
+    for (TimelineEntity entity : entities) {
+      metricCnt += entity.getMetrics().size();
+      for (TimelineMetric metric : entity.getMetrics()) {
+        assertTrue("Metric Id returned should start with MAP1_",
+            metric.getId().startsWith("MAP1_"));
+      }
+    }
+    assertEquals(2, metricCnt);
+
+    entities = reader.getEntities(new TimelineReaderContext("cluster1", "user1",
+        "some_flow_name", 1002345678919L, null,
+        TimelineEntityType.YARN_APPLICATION.toString(), null),
+        new TimelineEntityFilters(null, null, null, null, null, null, null,
+        metricFilterList1, null), new TimelineDataToRetrieve(null,
+        metricsToRetrieve, EnumSet.of(Field.METRICS), Integer.MAX_VALUE));
     metricCnt = 0;
+    int metricValCnt = 0;
     assertEquals(2, entities.size());
     for (TimelineEntity entity : entities) {
       metricCnt += entity.getMetrics().size();
       for (TimelineMetric metric : entity.getMetrics()) {
+        metricValCnt += metric.getValues().size();
         assertTrue("Metric Id returned should start with MAP1_",
             metric.getId().startsWith("MAP1_"));
       }
     }
     assertEquals(2, metricCnt);
+    assertEquals(7, metricValCnt);
   }
 
   @Test
@@ -3510,7 +3655,7 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, infoFilterList,
         null, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null));
     assertEquals(2, entities.size());
     int infoCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -3527,7 +3672,7 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, infoFilterList1,
         null, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null));
     assertEquals(1, entities.size());
     infoCnt = 0;
     for (TimelineEntity entity : entities) {
@@ -3546,7 +3691,7 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, infoFilterList2,
         null, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null));
     assertEquals(0, entities.size());
 
     TimelineFilterList infoFilterList3 = new TimelineFilterList(
@@ -3558,7 +3703,7 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, infoFilterList3,
         null, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null));
     assertEquals(0, entities.size());
 
     TimelineFilterList infoFilterList4 = new TimelineFilterList(
@@ -3570,7 +3715,7 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, infoFilterList4,
         null, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null));
     assertEquals(0, entities.size());
 
     TimelineFilterList infoFilterList5 = new TimelineFilterList(
@@ -3582,7 +3727,7 @@ public class TestHBaseTimelineStorage {
         null),
         new TimelineEntityFilters(null, null, null, null, null, infoFilterList5,
         null, null, null),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.INFO), null));
     assertEquals(3, entities.size());
   }
 

+ 13 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java

@@ -422,7 +422,7 @@ public class TestHBaseStorageFlowRun {
       TimelineEntity entity = hbr.getEntity(
           new TimelineReaderContext(cluster, user, flow, 1002345678919L, null,
           TimelineEntityType.YARN_FLOW_RUN.toString(), null),
-          new TimelineDataToRetrieve(null, metricsToRetrieve, null));
+          new TimelineDataToRetrieve(null, metricsToRetrieve, null, null));
       assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
       Set<TimelineMetric> metrics = entity.getMetrics();
       assertEquals(1, metrics.size());
@@ -447,7 +447,7 @@ public class TestHBaseStorageFlowRun {
           new TimelineReaderContext(cluster, user, flow, null, null,
           TimelineEntityType.YARN_FLOW_RUN.toString(), null),
           new TimelineEntityFilters(),
-          new TimelineDataToRetrieve(null, metricsToRetrieve, null));
+          new TimelineDataToRetrieve(null, metricsToRetrieve, null, null));
       assertEquals(2, entities.size());
       int metricCnt = 0;
       for (TimelineEntity timelineEntity : entities) {
@@ -513,8 +513,8 @@ public class TestHBaseStorageFlowRun {
       entities = hbr.getEntities(
           new TimelineReaderContext(cluster, user, flow, runid, null,
           TimelineEntityType.YARN_FLOW_RUN.toString(), null),
-          new TimelineEntityFilters(),
-          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+          new TimelineEntityFilters(), new TimelineDataToRetrieve(null, null,
+          EnumSet.of(Field.METRICS), null));
       assertEquals(1, entities.size());
       for (TimelineEntity timelineEntity : entities) {
         Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics();
@@ -766,8 +766,8 @@ public class TestHBaseStorageFlowRun {
           new TimelineReaderContext(cluster, user, flow, null,
           null, TimelineEntityType.YARN_FLOW_RUN.toString(), null),
           new TimelineEntityFilters(null, null, null, null, null, null, null,
-          metricFilterList, null),
-          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+          metricFilterList, null), new TimelineDataToRetrieve(null, null,
+          EnumSet.of(Field.METRICS), null));
       assertEquals(2, entities.size());
       int metricCnt = 0;
       for (TimelineEntity entity : entities) {
@@ -783,8 +783,8 @@ public class TestHBaseStorageFlowRun {
           new TimelineReaderContext(cluster, user, flow, null, null,
           TimelineEntityType.YARN_FLOW_RUN.toString(), null),
           new TimelineEntityFilters(null, null, null, null, null, null, null,
-          metricFilterList1, null),
-          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+          metricFilterList1, null), new TimelineDataToRetrieve(null, null,
+          EnumSet.of(Field.METRICS), null));
       assertEquals(1, entities.size());
       metricCnt = 0;
       for (TimelineEntity entity : entities) {
@@ -799,8 +799,8 @@ public class TestHBaseStorageFlowRun {
           new TimelineReaderContext(cluster, user, flow, null, null,
           TimelineEntityType.YARN_FLOW_RUN.toString(), null),
           new TimelineEntityFilters(null, null, null, null, null, null, null,
-          metricFilterList2, null),
-          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+          metricFilterList2, null), new TimelineDataToRetrieve(null, null,
+          EnumSet.of(Field.METRICS), null));
       assertEquals(0, entities.size());
 
       TimelineFilterList metricFilterList3 = new TimelineFilterList(
@@ -809,8 +809,8 @@ public class TestHBaseStorageFlowRun {
           new TimelineReaderContext(cluster, user, flow, null, null,
           TimelineEntityType.YARN_FLOW_RUN.toString(), null),
           new TimelineEntityFilters(null, null, null, null, null, null, null,
-          metricFilterList3, null),
-          new TimelineDataToRetrieve(null, null, EnumSet.of(Field.METRICS)));
+          metricFilterList3, null), new TimelineDataToRetrieve(null, null,
+          EnumSet.of(Field.METRICS), null));
       assertEquals(0, entities.size());
 
       TimelineFilterList list3 = new TimelineFilterList();
@@ -832,7 +832,7 @@ public class TestHBaseStorageFlowRun {
           new TimelineEntityFilters(null, null, null, null, null, null, null,
           metricFilterList4, null),
           new TimelineDataToRetrieve(null, metricsToRetrieve,
-          EnumSet.of(Field.ALL)));
+          EnumSet.of(Field.ALL), null));
       assertEquals(2, entities.size());
       metricCnt = 0;
       for (TimelineEntity entity : entities) {

+ 30 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineDataToRetrieve.java

@@ -53,6 +53,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Fiel
  * object to retrieve, see {@link Field}. If null, retrieves 3 fields,
  * namely entity id, entity type and entity created time. All fields will
  * be returned if {@link Field#ALL} is specified.</li>
+ * <li><b>metricsLimit</b> - If fieldsToRetrieve contains METRICS/ALL or
+ * metricsToRetrieve is specified, this limit defines an upper limit to the
+ * number of metrics to return. This parameter is ignored if METRICS are not to
+ * be fetched.</li>
  * </ul>
  */
 @Private
@@ -61,16 +65,28 @@ public class TimelineDataToRetrieve {
   private TimelineFilterList confsToRetrieve;
   private TimelineFilterList metricsToRetrieve;
   private EnumSet<Field> fieldsToRetrieve;
+  private Integer metricsLimit;
+
+  /**
+   * Default limit of number of metrics to return.
+   */
+  public static final Integer DEFAULT_METRICS_LIMIT = 1;
 
   public TimelineDataToRetrieve() {
-    this(null, null, null);
+    this(null, null, null, null);
   }
 
   public TimelineDataToRetrieve(TimelineFilterList confs,
-      TimelineFilterList metrics, EnumSet<Field> fields) {
+      TimelineFilterList metrics, EnumSet<Field> fields,
+      Integer limitForMetrics) {
     this.confsToRetrieve = confs;
     this.metricsToRetrieve = metrics;
     this.fieldsToRetrieve = fields;
+    if (limitForMetrics == null || limitForMetrics < 1) {
+      this.metricsLimit = DEFAULT_METRICS_LIMIT;
+    } else {
+      this.metricsLimit = limitForMetrics;
+    }
 
     if (this.fieldsToRetrieve == null) {
       this.fieldsToRetrieve = EnumSet.noneOf(Field.class);
@@ -116,4 +132,16 @@ public class TimelineDataToRetrieve {
       fieldsToRetrieve.add(Field.METRICS);
     }
   }
+
+  public Integer getMetricsLimit() {
+    return metricsLimit;
+  }
+
+  public void setMetricsLimit(Integer limit) {
+    if (limit == null || limit < 1) {
+      this.metricsLimit = DEFAULT_METRICS_LIMIT;
+    } else {
+      this.metricsLimit = limit;
+    }
+  }
 }

+ 3 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java

@@ -32,8 +32,9 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyVa
  * filters restrict the number of entities to return.<br>
  * Filters contain the following :<br>
  * <ul>
- * <li><b>limit</b> - A limit on the number of entities to return. If null
- * or {@literal <=0}, defaults to {@link #DEFAULT_LIMIT}.</li>
+ * <li><b>limit</b> - A limit on the number of entities to return. If null or
+ * {@literal < 0}, defaults to {@link #DEFAULT_LIMIT}. The maximum possible
+ * value for limit can be {@link Long#MAX_VALUE}.</li>
  * <li><b>createdTimeBegin</b> - Matched entities should not be created
  * before this timestamp. If null or {@literal <=0}, defaults to 0.</li>
  * <li><b>createdTimeEnd</b> - Matched entities should not be created after

+ 214 - 60
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java

@@ -217,7 +217,10 @@ public class TimelineReaderWebServices {
    *     flowrun id and app id which are extracted from UID and then used to
    *     query backend(Mandatory path param).
    * @param entityType Type of entities(Mandatory path param).
-   * @param limit Number of entities to return(Optional query param).
+   * @param limit If specified, defines the number of entities to return. The
+   *     maximum possible value for limit can be {@link Long#MAX_VALUE}. If it
+   *     is not specified or has a value less than 0, then limit will be
+   *     considered as 100. (Optional query param).
    * @param createdTimeStart If specified, matched entities should not be
    *     created before this timestamp(Optional query param).
    * @param createdTimeEnd If specified, matched entities should not be created
@@ -254,6 +257,13 @@ public class TimelineReaderWebServices {
    *     {@link Field}. All fields will be retrieved if fields=ALL. If not
    *     specified, 3 fields i.e. entity type, id and created time is returned
    *     (Optional query param).
+   * @param metricsLimit If specified, defines the number of metrics to return.
+   *     Considered only if fields contains METRICS/ALL or metricsToRetrieve is
+   *     specified. Ignored otherwise. The maximum possible value for
+   *     metricsLimit can be {@link Integer#MAX_VALUE}. If it is not specified
+   *     or has a value less than 1, and metrics have to be retrieved, then
+   *     metricsLimit will be considered as 1 i.e. latest single value of
+   *     metric(s) will be returned. (Optional query param).
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing
    *     a set of <cite>TimelineEntity</cite> instances of the given entity type
@@ -283,7 +293,8 @@ public class TimelineReaderWebServices {
       @QueryParam("eventfilters") String eventfilters,
       @QueryParam("confstoretrieve") String confsToRetrieve,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
-      @QueryParam("fields") String fields) {
+      @QueryParam("fields") String fields,
+      @QueryParam("metricslimit") String metricsLimit) {
     String url = req.getRequestURI() +
         (req.getQueryString() == null ? "" :
             QUERY_STRING_SEP + req.getQueryString());
@@ -308,7 +319,7 @@ public class TimelineReaderWebServices {
           limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
           infofilters, conffilters, metricfilters, eventfilters),
           TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
-          confsToRetrieve, metricsToRetrieve, fields));
+          confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
     } catch (Exception e) {
       handleException(e, url, startTime,
           "createdTime start/end or limit or flowrunid");
@@ -342,7 +353,10 @@ public class TimelineReaderWebServices {
    *     query param).
    * @param flowRunId Run id which should match for the entities(Optional query
    *     param).
-   * @param limit Number of entities to return(Optional query param).
+   * @param limit If specified, defines the number of entities to return. The
+   *     maximum possible value for limit can be {@link Long#MAX_VALUE}. If it
+   *     is not specified or has a value less than 0, then limit will be
+   *     considered as 100. (Optional query param).
    * @param createdTimeStart If specified, matched entities should not be
    *     created before this timestamp(Optional query param).
    * @param createdTimeEnd If specified, matched entities should not be created
@@ -379,6 +393,13 @@ public class TimelineReaderWebServices {
    *     {@link Field}. All fields will be retrieved if fields=ALL. If not
    *     specified, 3 fields i.e. entity type, id, created time is returned
    *     (Optional query param).
+   * @param metricsLimit If specified, defines the number of metrics to return.
+   *     Considered only if fields contains METRICS/ALL or metricsToRetrieve is
+   *     specified. Ignored otherwise. The maximum possible value for
+   *     metricsLimit can be {@link Integer#MAX_VALUE}. If it is not specified
+   *     or has a value less than 1, and metrics have to be retrieved, then
+   *     metricsLimit will be considered as 1 i.e. latest single value of
+   *     metric(s) will be returned. (Optional query param).
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing
    *     a set of <cite>TimelineEntity</cite> instances of the given entity type
@@ -413,11 +434,12 @@ public class TimelineReaderWebServices {
       @QueryParam("eventfilters") String eventfilters,
       @QueryParam("confstoretrieve") String confsToRetrieve,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
-      @QueryParam("fields") String fields) {
+      @QueryParam("fields") String fields,
+      @QueryParam("metricslimit") String metricsLimit) {
     return getEntities(req, res, null, appId, entityType, userId, flowName,
         flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
         isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
-        confsToRetrieve, metricsToRetrieve, fields);
+        confsToRetrieve, metricsToRetrieve, fields, metricsLimit);
   }
 
   /**
@@ -441,7 +463,10 @@ public class TimelineReaderWebServices {
    *     query param).
    * @param flowRunId Run id which should match for the entities(Optional query
    *     param).
-   * @param limit Number of entities to return(Optional query param).
+   * @param limit If specified, defines the number of entities to return. The
+   *     maximum possible value for limit can be {@link Long#MAX_VALUE}. If it
+   *     is not specified or has a value less than 0, then limit will be
+   *     considered as 100. (Optional query param).
    * @param createdTimeStart If specified, matched entities should not be
    *     created before this timestamp(Optional query param).
    * @param createdTimeEnd If specified, matched entities should not be created
@@ -478,6 +503,13 @@ public class TimelineReaderWebServices {
    *     {@link Field}. All fields will be retrieved if fields=ALL. If not
    *     specified, 3 fields i.e. entity type, id, created time is returned
    *     (Optional query param).
+   * @param metricsLimit If specified, defines the number of metrics to return.
+   *     Considered only if fields contains METRICS/ALL or metricsToRetrieve is
+   *     specified. Ignored otherwise. The maximum possible value for
+   *     metricsLimit can be {@link Integer#MAX_VALUE}. If it is not specified
+   *     or has a value less than 1, and metrics have to be retrieved, then
+   *     metricsLimit will be considered as 1 i.e. latest single value of
+   *     metric(s) will be returned. (Optional query param).
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing
    *     a set of <cite>TimelineEntity</cite> instances of the given entity type
@@ -513,7 +545,8 @@ public class TimelineReaderWebServices {
       @QueryParam("eventfilters") String eventfilters,
       @QueryParam("confstoretrieve") String confsToRetrieve,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
-      @QueryParam("fields") String fields) {
+      @QueryParam("fields") String fields,
+      @QueryParam("metricslimit") String metricsLimit) {
     String url = req.getRequestURI() +
         (req.getQueryString() == null ? "" :
             QUERY_STRING_SEP + req.getQueryString());
@@ -533,7 +566,7 @@ public class TimelineReaderWebServices {
           limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
           infofilters, conffilters, metricfilters, eventfilters),
           TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
-          confsToRetrieve, metricsToRetrieve, fields));
+          confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
     } catch (Exception e) {
       handleException(e, url, startTime,
           "createdTime start/end or limit or flowrunid");
@@ -568,6 +601,13 @@ public class TimelineReaderWebServices {
    *     {@link Field}. All fields will be retrieved if fields=ALL. If not
    *     specified, 3 fields i.e. entity type, id, created time is returned
    *     (Optional query param).
+   * @param metricsLimit If specified, defines the number of metrics to return.
+   *     Considered only if fields contains METRICS/ALL or metricsToRetrieve is
+   *     specified. Ignored otherwise. The maximum possible value for
+   *     metricsLimit can be {@link Integer#MAX_VALUE}. If it is not specified
+   *     or has a value less than 1, and metrics have to be retrieved, then
+   *     metricsLimit will be considered as 1 i.e. latest single value of
+   *     metric(s) will be returned. (Optional query param).
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing a
    *     <cite>TimelineEntity</cite> instance is returned.<br>
@@ -588,7 +628,8 @@ public class TimelineReaderWebServices {
       @PathParam("uid") String uId,
       @QueryParam("confstoretrieve") String confsToRetrieve,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
-      @QueryParam("fields") String fields) {
+      @QueryParam("fields") String fields,
+      @QueryParam("metricslimit") String metricsLimit) {
     String url = req.getRequestURI() +
         (req.getQueryString() == null ? "" :
             QUERY_STRING_SEP + req.getQueryString());
@@ -608,7 +649,7 @@ public class TimelineReaderWebServices {
       }
       entity = timelineReaderManager.getEntity(context,
           TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
-          confsToRetrieve, metricsToRetrieve, fields));
+          confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
     } catch (Exception e) {
       handleException(e, url, startTime, "flowrunid");
     }
@@ -655,6 +696,13 @@ public class TimelineReaderWebServices {
    *     {@link Field}. All fields will be retrieved if fields=ALL. If not
    *     specified, 3 fields i.e. entity type, id, created time is returned
    *     (Optional query param).
+   * @param metricsLimit If specified, defines the number of metrics to return.
+   *     Considered only if fields contains METRICS/ALL or metricsToRetrieve is
+   *     specified. Ignored otherwise. The maximum possible value for
+   *     metricsLimit can be {@link Integer#MAX_VALUE}. If it is not specified
+   *     or has a value less than 1, and metrics have to be retrieved, then
+   *     metricsLimit will be considered as 1 i.e. latest single value of
+   *     metric(s) will be returned. (Optional query param).
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing a
    *     <cite>TimelineEntity</cite> instance is returned.<br>
@@ -680,9 +728,11 @@ public class TimelineReaderWebServices {
       @QueryParam("flowrunid") String flowRunId,
       @QueryParam("confstoretrieve") String confsToRetrieve,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
-      @QueryParam("fields") String fields) {
+      @QueryParam("fields") String fields,
+      @QueryParam("metricslimit") String metricsLimit) {
     return getEntity(req, res, null, appId, entityType, entityId, userId,
-        flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields);
+        flowName, flowRunId, confsToRetrieve, metricsToRetrieve, fields,
+        metricsLimit);
   }
 
   /**
@@ -717,6 +767,13 @@ public class TimelineReaderWebServices {
    *     {@link Field}. All fields will be retrieved if fields=ALL. If not
    *     specified, 3 fields i.e. entity type, id and created time is returned
    *     (Optional query param).
+   * @param metricsLimit If specified, defines the number of metrics to return.
+   *     Considered only if fields contains METRICS/ALL or metricsToRetrieve is
+   *     specified. Ignored otherwise. The maximum possible value for
+   *     metricsLimit can be {@link Integer#MAX_VALUE}. If it is not specified
+   *     or has a value less than 1, and metrics have to be retrieved, then
+   *     metricsLimit will be considered as 1 i.e. latest single value of
+   *     metric(s) will be returned. (Optional query param).
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing a
    *     <cite>TimelineEntity</cite> instance is returned.<br>
@@ -743,7 +800,8 @@ public class TimelineReaderWebServices {
       @QueryParam("flowrunid") String flowRunId,
       @QueryParam("confstoretrieve") String confsToRetrieve,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
-      @QueryParam("fields") String fields) {
+      @QueryParam("fields") String fields,
+      @QueryParam("metricslimit") String metricsLimit) {
     String url = req.getRequestURI() +
         (req.getQueryString() == null ? "" :
             QUERY_STRING_SEP + req.getQueryString());
@@ -760,7 +818,7 @@ public class TimelineReaderWebServices {
           TimelineReaderWebServicesUtils.createTimelineReaderContext(
           clusterId, userId, flowName, flowRunId, appId, entityType, entityId),
           TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
-          confsToRetrieve, metricsToRetrieve, fields));
+          confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
     } catch (Exception e) {
       handleException(e, url, startTime, "flowrunid");
     }
@@ -827,7 +885,7 @@ public class TimelineReaderWebServices {
       context.setEntityType(TimelineEntityType.YARN_FLOW_RUN.toString());
       entity = timelineReaderManager.getEntity(context,
           TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
-          null, metricsToRetrieve, null));
+          null, metricsToRetrieve, null, null));
     } catch (Exception e) {
       handleException(e, url, startTime, "flowrunid");
     }
@@ -936,7 +994,7 @@ public class TimelineReaderWebServices {
           clusterId, userId, flowName, flowRunId, null,
           TimelineEntityType.YARN_FLOW_RUN.toString(), null),
           TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
-          null, metricsToRetrieve, null));
+          null, metricsToRetrieve, null, null));
     } catch (Exception e) {
       handleException(e, url, startTime, "flowrunid");
     }
@@ -963,7 +1021,10 @@ public class TimelineReaderWebServices {
    * @param uId a delimited string containing clusterid, userid, and flow name
    *     which are extracted from UID and then used to query backend(Mandatory
    *     path param).
-   * @param limit Number of flow runs to return(Optional query param).
+   * @param limit If specified, defines the number of flow runs to return. The
+   *     maximum possible value for limit can be {@link Long#MAX_VALUE}. If it
+   *     is not specified or has a value less than 0, then limit will be
+   *     considered as 100. (Optional query param).
    * @param createdTimeStart If specified, matched flow runs should not be
    *     created before this timestamp(Optional query param).
    * @param createdTimeEnd If specified, matched flow runs should not be created
@@ -972,10 +1033,11 @@ public class TimelineReaderWebServices {
    *     and send back in response. These metrics will be retrieved
    *     irrespective of whether metrics are specified in fields to retrieve or
    *     not.
-   * @param fields Specifies which fields to retrieve, see {@link Field}.
-   *     All fields will be retrieved if fields=ALL. Fields other than METRICS
-   *     have no meaning for this REST endpoint. If not specified, all fields
-   *     other than metrics are returned(Optional query param).
+   * @param fields Specifies which fields to retrieve, see {@link Field}. All
+   *     fields will be retrieved if fields=ALL. Amongst all the fields, only
+   *     METRICS makes sense for flow runs hence only ALL or METRICS are
+   *     supported as fields for fetching flow runs. Other fields will lead to
+   *     HTTP 400 (Bad Request) response. (Optional query param).
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing a
    *     set of <cite>FlowRunEntity</cite> instances for the given flow are
@@ -1021,7 +1083,7 @@ public class TimelineReaderWebServices {
           limit, createdTimeStart, createdTimeEnd, null, null, null,
           null, null, null),
           TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
-          null, metricsToRetrieve, fields));
+          null, metricsToRetrieve, fields, null));
     } catch (Exception e) {
       handleException(e, url, startTime, "createdTime start/end or limit");
     }
@@ -1044,7 +1106,10 @@ public class TimelineReaderWebServices {
    *     Mandatory path param)
    * @param flowName Flow name to which the flow runs to be queried belongs to(
    *     Mandatory path param).
-   * @param limit Number of flow runs to return(Optional query param).
+   * @param limit If specified, defines the number of flow runs to return. The
+   *     maximum possible value for limit can be {@link Long#MAX_VALUE}. If it
+   *     is not specified or has a value less than 0, then limit will be
+   *     considered as 100. (Optional query param).
    * @param createdTimeStart If specified, matched flow runs should not be
    *     created before this timestamp(Optional query param).
    * @param createdTimeEnd If specified, matched flow runs should not be created
@@ -1053,10 +1118,11 @@ public class TimelineReaderWebServices {
    *     and send back in response. These metrics will be retrieved
    *     irrespective of whether metrics are specified in fields to retrieve or
    *     not.
-   * @param fields Specifies which fields to retrieve, see {@link Field}.
-   *     All fields will be retrieved if fields=ALL. Fields other than METRICS
-   *     have no meaning for this REST endpoint. If not specified, all fields
-   *     other than metrics are returned(Optional query param).
+   * @param fields Specifies which fields to retrieve, see {@link Field}. All
+   *     fields will be retrieved if fields=ALL. Amongst all the fields, only
+   *     METRICS makes sense for flow runs hence only ALL or METRICS are
+   *     supported as fields for fetching flow runs. Other fields will lead to
+   *     HTTP 400 (Bad Request) response. (Optional query param).
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing a
    *     set of <cite>FlowRunEntity</cite> instances for the given flow are
@@ -1095,7 +1161,10 @@ public class TimelineReaderWebServices {
    *     Mandatory path param)
    * @param flowName Flow name to which the flow runs to be queried belongs to(
    *     Mandatory path param).
-   * @param limit Number of flow runs to return(Optional query param).
+   * @param limit If specified, defines the number of flow runs to return. The
+   *     maximum possible value for limit can be {@link Long#MAX_VALUE}. If it
+   *     is not specified or has a value less than 0, then limit will be
+   *     considered as 100. (Optional query param).
    * @param createdTimeStart If specified, matched flow runs should not be
    *     created before this timestamp(Optional query param).
    * @param createdTimeEnd If specified, matched flow runs should not be created
@@ -1104,10 +1173,11 @@ public class TimelineReaderWebServices {
    *     and send back in response. These metrics will be retrieved
    *     irrespective of whether metrics are specified in fields to retrieve or
    *     not.
-   * @param fields Specifies which fields to retrieve, see {@link Field}.
-   *     All fields will be retrieved if fields=ALL. Fields other than METRICS
-   *     have no meaning for this REST endpoint. If not specified, all fields
-   *     other than metrics are returned(Optional query param).
+   * @param fields Specifies which fields to retrieve, see {@link Field}. All
+   *     fields will be retrieved if fields=ALL. Amongst all the fields, only
+   *     METRICS makes sense for flow runs hence only ALL or METRICS are
+   *     supported as fields for fetching flow runs. Other fields will lead to
+   *     HTTP 400 (Bad Request) response. (Optional query param).
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing a
    *     set of <cite>FlowRunEntity</cite> instances for the given flow are
@@ -1152,7 +1222,7 @@ public class TimelineReaderWebServices {
           limit, createdTimeStart, createdTimeEnd, null, null, null,
           null, null, null),
           TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
-          null, metricsToRetrieve, fields));
+          null, metricsToRetrieve, fields, null));
     } catch (Exception e) {
       handleException(e, url, startTime, "createdTime start/end or limit");
     }
@@ -1171,7 +1241,10 @@ public class TimelineReaderWebServices {
    *
    * @param req Servlet request.
    * @param res Servlet response.
-   * @param limit Number of flows to return(Optional query param).
+   * @param limit If specified, defines the number of flows to return. The
+   *     maximum possible value for limit can be {@link Long#MAX_VALUE}. If it
+   *     is not specified or has a value less than 0, then limit will be
+   *     considered as 100. (Optional query param).
    * @param dateRange If specified is given as "[startdate]-[enddate]"(i.e.
    *     start and end date separated by "-") or single date. Dates are
    *     interpreted in yyyyMMdd format and are assumed to be in GMT(Optional
@@ -1214,7 +1287,10 @@ public class TimelineReaderWebServices {
    * @param res Servlet response.
    * @param clusterId Cluster id to which the flows to be queried belong to(
    *     Mandatory path param).
-   * @param limit Number of flows to return(Optional query param).
+   * @param limit If specified, defines the number of flows to return. The
+   *     maximum possible value for limit can be {@link Long#MAX_VALUE}. If it
+   *     is not specified or has a value less than 0, then limit will be
+   *     considered as 100. (Optional query param).
    * @param dateRange If specified is given as "[startdate]-[enddate]"(i.e.
    *     start and end date separated by "-") or single date. Dates are
    *     interpreted in yyyyMMdd format and are assumed to be in GMT(Optional
@@ -1271,7 +1347,7 @@ public class TimelineReaderWebServices {
           clusterId, null, null, null, null,
           TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null),
           entityFilters, TimelineReaderWebServicesUtils.
-          createTimelineDataToRetrieve(null, null, null));
+          createTimelineDataToRetrieve(null, null, null, null));
     } catch (Exception e) {
       handleException(e, url, startTime, "limit");
     }
@@ -1305,6 +1381,13 @@ public class TimelineReaderWebServices {
    *     see {@link Field}. All fields will be retrieved if fields=ALL. If not
    *     specified, 3 fields i.e. entity type(equivalent to YARN_APPLICATION),
    *     app id and app created time is returned(Optional query param).
+   * @param metricsLimit If specified, defines the number of metrics to return.
+   *     Considered only if fields contains METRICS/ALL or metricsToRetrieve is
+   *     specified. Ignored otherwise. The maximum possible value for
+   *     metricsLimit can be {@link Integer#MAX_VALUE}. If it is not specified
+   *     or has a value less than 1, and metrics have to be retrieved, then
+   *     metricsLimit will be considered as 1 i.e. latest single value of
+   *     metric(s) will be returned. (Optional query param).
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing a
    *     <cite>TimelineEntity</cite> instance is returned.<br>
@@ -1325,7 +1408,8 @@ public class TimelineReaderWebServices {
       @PathParam("uid") String uId,
       @QueryParam("confstoretrieve") String confsToRetrieve,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
-      @QueryParam("fields") String fields) {
+      @QueryParam("fields") String fields,
+      @QueryParam("metricslimit") String metricsLimit) {
     String url = req.getRequestURI() +
         (req.getQueryString() == null ? "" :
             QUERY_STRING_SEP + req.getQueryString());
@@ -1346,7 +1430,7 @@ public class TimelineReaderWebServices {
       context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
       entity = timelineReaderManager.getEntity(context,
           TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
-          confsToRetrieve, metricsToRetrieve, fields));
+          confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
     } catch (Exception e) {
       handleException(e, url, startTime, "flowrunid");
     }
@@ -1388,6 +1472,13 @@ public class TimelineReaderWebServices {
    *     see {@link Field}. All fields will be retrieved if fields=ALL. If not
    *     specified, 3 fields i.e. entity type(equivalent to YARN_APPLICATION),
    *     app id and app created time is returned(Optional query param).
+   * @param metricsLimit If specified, defines the number of metrics to return.
+   *     Considered only if fields contains METRICS/ALL or metricsToRetrieve is
+   *     specified. Ignored otherwise. The maximum possible value for
+   *     metricsLimit can be {@link Integer#MAX_VALUE}. If it is not specified
+   *     or has a value less than 1, and metrics have to be retrieved, then
+   *     metricsLimit will be considered as 1 i.e. latest single value of
+   *     metric(s) will be returned. (Optional query param).
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing a
    *     <cite>TimelineEntity</cite> instance is returned.<br>
@@ -1411,9 +1502,10 @@ public class TimelineReaderWebServices {
       @QueryParam("userid") String userId,
       @QueryParam("confstoretrieve") String confsToRetrieve,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
-      @QueryParam("fields") String fields) {
+      @QueryParam("fields") String fields,
+      @QueryParam("metricslimit") String metricsLimit) {
     return getApp(req, res, null, appId, flowName, flowRunId, userId,
-        confsToRetrieve, metricsToRetrieve, fields);
+        confsToRetrieve, metricsToRetrieve, fields, metricsLimit);
   }
 
   /**
@@ -1444,6 +1536,13 @@ public class TimelineReaderWebServices {
    *     see {@link Field}. All fields will be retrieved if fields=ALL. If not
    *     specified, 3 fields i.e. entity type(equivalent to YARN_APPLICATION),
    *     app id and app created time is returned(Optional query param).
+   * @param metricsLimit If specified, defines the number of metrics to return.
+   *     Considered only if fields contains METRICS/ALL or metricsToRetrieve is
+   *     specified. Ignored otherwise. The maximum possible value for
+   *     metricsLimit can be {@link Integer#MAX_VALUE}. If it is not specified
+   *     or has a value less than 1, and metrics have to be retrieved, then
+   *     metricsLimit will be considered as 1 i.e. latest single value of
+   *     metric(s) will be returned. (Optional query param).
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing a
    *     <cite>TimelineEntity</cite> instance is returned.<br>
@@ -1468,7 +1567,8 @@ public class TimelineReaderWebServices {
       @QueryParam("userid") String userId,
       @QueryParam("confstoretrieve") String confsToRetrieve,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
-      @QueryParam("fields") String fields) {
+      @QueryParam("fields") String fields,
+      @QueryParam("metricslimit") String metricsLimit) {
     String url = req.getRequestURI() +
         (req.getQueryString() == null ? "" :
             QUERY_STRING_SEP + req.getQueryString());
@@ -1486,7 +1586,7 @@ public class TimelineReaderWebServices {
           clusterId, userId, flowName, flowRunId, appId,
           TimelineEntityType.YARN_APPLICATION.toString(), null),
           TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
-          confsToRetrieve, metricsToRetrieve, fields));
+          confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
     } catch (Exception e) {
       handleException(e, url, startTime, "flowrunid");
     }
@@ -1512,7 +1612,10 @@ public class TimelineReaderWebServices {
    * @param uId a delimited string containing clusterid, userid, flow name and
    *     flowrun id which are extracted from UID and then used to query backend.
    *     (Mandatory path param).
-   * @param limit Number of apps to return(Optional query param).
+   * @param limit If specified, defines the number of apps to return. The
+   *     maximum possible value for limit can be {@link Long#MAX_VALUE}. If it
+   *     is not specified or has a value less than 0, then limit will be
+   *     considered as 100. (Optional query param).
    * @param createdTimeStart If specified, matched apps should not be created
    *     before this timestamp(Optional query param).
    * @param createdTimeEnd If specified, matched apps should not be created
@@ -1549,6 +1652,13 @@ public class TimelineReaderWebServices {
    *     see {@link Field}. All fields will be retrieved if fields=ALL. If not
    *     specified, 3 fields i.e. entity type(equivalent to YARN_APPLICATION),
    *     app id and app created time is returned(Optional query param).
+   * @param metricsLimit If specified, defines the number of metrics to return.
+   *     Considered only if fields contains METRICS/ALL or metricsToRetrieve is
+   *     specified. Ignored otherwise. The maximum possible value for
+   *     metricsLimit can be {@link Integer#MAX_VALUE}. If it is not specified
+   *     or has a value less than 1, and metrics have to be retrieved, then
+   *     metricsLimit will be considered as 1 i.e. latest single value of
+   *     metric(s) will be returned. (Optional query param).
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing
    *     a set of <cite>TimelineEntity</cite> instances representing apps is
@@ -1577,7 +1687,8 @@ public class TimelineReaderWebServices {
       @QueryParam("eventfilters") String eventfilters,
       @QueryParam("confstoretrieve") String confsToRetrieve,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
-      @QueryParam("fields") String fields) {
+      @QueryParam("fields") String fields,
+      @QueryParam("metricslimit") String metricsLimit) {
     String url = req.getRequestURI() +
         (req.getQueryString() == null ? "" :
             QUERY_STRING_SEP + req.getQueryString());
@@ -1601,7 +1712,7 @@ public class TimelineReaderWebServices {
           limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
           infofilters, conffilters, metricfilters, eventfilters),
           TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
-          confsToRetrieve, metricsToRetrieve, fields));
+          confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
     } catch (Exception e) {
       handleException(e, url, startTime,
           "createdTime start/end or limit or flowrunid");
@@ -1628,7 +1739,10 @@ public class TimelineReaderWebServices {
    *     param).
    * @param flowRunId Run id which should match for the apps(Mandatory path
    *     param).
-   * @param limit Number of apps to return(Optional query param).
+   * @param limit If specified, defines the number of apps to return. The
+   *     maximum possible value for limit can be {@link Long#MAX_VALUE}. If it
+   *     is not specified or has a value less than 0, then limit will be
+   *     considered as 100. (Optional query param).
    * @param createdTimeStart If specified, matched apps should not be created
    *     before this timestamp(Optional query param).
    * @param createdTimeEnd If specified, matched apps should not be created
@@ -1665,6 +1779,13 @@ public class TimelineReaderWebServices {
    *     see {@link Field}. All fields will be retrieved if fields=ALL. If not
    *     specified, 3 fields i.e. entity type(equivalent to YARN_APPLICATION),
    *     app id and app created time is returned(Optional query param).
+   * @param metricsLimit If specified, defines the number of metrics to return.
+   *     Considered only if fields contains METRICS/ALL or metricsToRetrieve is
+   *     specified. Ignored otherwise. The maximum possible value for
+   *     metricsLimit can be {@link Integer#MAX_VALUE}. If it is not specified
+   *     or has a value less than 1, and metrics have to be retrieved, then
+   *     metricsLimit will be considered as 1 i.e. latest single value of
+   *     metric(s) will be returned. (Optional query param).
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing
    *     a set of <cite>TimelineEntity</cite> instances representing apps is
@@ -1695,12 +1816,13 @@ public class TimelineReaderWebServices {
       @QueryParam("eventfilters") String eventfilters,
       @QueryParam("confstoretrieve") String confsToRetrieve,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
-      @QueryParam("fields") String fields) {
+      @QueryParam("fields") String fields,
+      @QueryParam("metricslimit") String metricsLimit) {
     return getEntities(req, res, null, null,
         TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
         flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
         isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
-        confsToRetrieve, metricsToRetrieve, fields);
+        confsToRetrieve, metricsToRetrieve, fields, metricsLimit);
   }
 
   /**
@@ -1717,7 +1839,10 @@ public class TimelineReaderWebServices {
    *     param).
    * @param flowRunId Run id which should match for the apps(Mandatory path
    *     param).
-   * @param limit Number of apps to return(Optional query param).
+   * @param limit If specified, defines the number of apps to return. The
+   *     maximum possible value for limit can be {@link Long#MAX_VALUE}. If it
+   *     is not specified or has a value less than 0, then limit will be
+   *     considered as 100. (Optional query param).
    * @param createdTimeStart If specified, matched apps should not be created
    *     before this timestamp(Optional query param).
    * @param createdTimeEnd If specified, matched apps should not be created
@@ -1754,6 +1879,13 @@ public class TimelineReaderWebServices {
    *     see {@link Field}. All fields will be retrieved if fields=ALL. If not
    *     specified, 3 fields i.e. entity type(equivalent to YARN_APPLICATION),
    *     app id and app created time is returned(Optional query param).
+   * @param metricsLimit If specified, defines the number of metrics to return.
+   *     Considered only if fields contains METRICS/ALL or metricsToRetrieve is
+   *     specified. Ignored otherwise. The maximum possible value for
+   *     metricsLimit can be {@link Integer#MAX_VALUE}. If it is not specified
+   *     or has a value less than 1, and metrics have to be retrieved, then
+   *     metricsLimit will be considered as 1 i.e. latest single value of
+   *     metric(s) will be returned. (Optional query param).
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing
    *     a set of <cite>TimelineEntity</cite> instances representing apps is
@@ -1786,12 +1918,13 @@ public class TimelineReaderWebServices {
       @QueryParam("eventfilters") String eventfilters,
       @QueryParam("confstoretrieve") String confsToRetrieve,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
-      @QueryParam("fields") String fields) {
+      @QueryParam("fields") String fields,
+      @QueryParam("metricslimit") String metricsLimit) {
     return getEntities(req, res, clusterId, null,
         TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
         flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
         isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
-        confsToRetrieve, metricsToRetrieve, fields);
+        confsToRetrieve, metricsToRetrieve, fields, metricsLimit);
   }
 
   /**
@@ -1805,7 +1938,10 @@ public class TimelineReaderWebServices {
    * @param userId User id which should match for the apps(Mandatory path param)
    * @param flowName Flow name which should match for the apps(Mandatory path
    *     param).
-   * @param limit Number of apps to return(Optional query param).
+   * @param limit If specified, defines the number of apps to return. The
+   *     maximum possible value for limit can be {@link Long#MAX_VALUE}. If it
+   *     is not specified or has a value less than 0, then limit will be
+   *     considered as 100. (Optional query param).
    * @param createdTimeStart If specified, matched apps should not be created
    *     before this timestamp(Optional query param).
    * @param createdTimeEnd If specified, matched apps should not be created
@@ -1842,6 +1978,13 @@ public class TimelineReaderWebServices {
    *     see {@link Field}. All fields will be retrieved if fields=ALL. If not
    *     specified, 3 fields i.e. entity type(equivalent to YARN_APPLICATION),
    *     app id and app created time is returned(Optional query param).
+   * @param metricsLimit If specified, defines the number of metrics to return.
+   *     Considered only if fields contains METRICS/ALL or metricsToRetrieve is
+   *     specified. Ignored otherwise. The maximum possible value for
+   *     metricsLimit can be {@link Integer#MAX_VALUE}. If it is not specified
+   *     or has a value less than 1, and metrics have to be retrieved, then
+   *     metricsLimit will be considered as 1 i.e. latest single value of
+   *     metric(s) will be returned. (Optional query param).
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing
    *     a set of <cite>TimelineEntity</cite> instances representing apps is
@@ -1871,19 +2014,19 @@ public class TimelineReaderWebServices {
       @QueryParam("eventfilters") String eventfilters,
       @QueryParam("confstoretrieve") String confsToRetrieve,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
-      @QueryParam("fields") String fields) {
+      @QueryParam("fields") String fields,
+      @QueryParam("metricslimit") String metricsLimit) {
     return getEntities(req, res, null, null,
         TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
         null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
         infofilters, conffilters, metricfilters, eventfilters,
-        confsToRetrieve, metricsToRetrieve, fields);
+        confsToRetrieve, metricsToRetrieve, fields, metricsLimit);
   }
 
   /**
    * Return a list of apps for a given user, cluster id and flow name. If number
    * of matching apps are more than the limit, most recent apps till the limit
-   * is reached, will be returned. If number of matching apps are more than the
-   * limit, most recent apps till the limit is reached, will be returned.
+   * is reached, will be returned.
    *
    * @param req Servlet request.
    * @param res Servlet response.
@@ -1892,7 +2035,10 @@ public class TimelineReaderWebServices {
    * @param userId User id which should match for the apps(Mandatory path param)
    * @param flowName Flow name which should match for the apps(Mandatory path
    *     param).
-   * @param limit Number of apps to return(Optional query param).
+   * @param limit If specified, defines the number of apps to return. The
+   *     maximum possible value for limit can be {@link Long#MAX_VALUE}. If it
+   *     is not specified or has a value less than 0, then limit will be
+   *     considered as 100. (Optional query param).
    * @param createdTimeStart If specified, matched apps should not be created
    *     before this timestamp(Optional query param).
    * @param createdTimeEnd If specified, matched apps should not be created
@@ -1929,6 +2075,13 @@ public class TimelineReaderWebServices {
    *     see {@link Field}. All fields will be retrieved if fields=ALL. If not
    *     specified, 3 fields i.e. entity type(equivalent to YARN_APPLICATION),
    *     app id and app created time is returned(Optional query param).
+   * @param metricsLimit If specified, defines the number of metrics to return.
+   *     Considered only if fields contains METRICS/ALL or metricsToRetrieve is
+   *     specified. Ignored otherwise. The maximum possible value for
+   *     metricsLimit can be {@link Integer#MAX_VALUE}. If it is not specified
+   *     or has a value less than 1, and metrics have to be retrieved, then
+   *     metricsLimit will be considered as 1 i.e. latest single value of
+   *     metric(s) will be returned. (Optional query param).
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing
    *     a set of <cite>TimelineEntity</cite> instances representing apps is
@@ -1959,11 +2112,12 @@ public class TimelineReaderWebServices {
       @QueryParam("eventfilters") String eventfilters,
       @QueryParam("confstoretrieve") String confsToRetrieve,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
-      @QueryParam("fields") String fields) {
+      @QueryParam("fields") String fields,
+      @QueryParam("metricslimit") String metricsLimit) {
     return getEntities(req, res, clusterId, null,
         TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
         null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
         infofilters, conffilters, metricfilters, eventfilters,
-        confsToRetrieve, metricsToRetrieve, fields);
+        confsToRetrieve, metricsToRetrieve, fields, metricsLimit);
   }
 }

+ 14 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServicesUtils.java

@@ -87,14 +87,16 @@ final class TimelineReaderWebServicesUtils {
    * @param confs confs to retrieve.
    * @param metrics metrics to retrieve.
    * @param fields fields to retrieve.
+   * @param metricsLimit upper limit on number of metrics to return.
    * @return a {@link TimelineDataToRetrieve} object.
    * @throws TimelineParseException if any problem occurs during parsing.
    */
   static TimelineDataToRetrieve createTimelineDataToRetrieve(String confs,
-      String metrics, String fields) throws TimelineParseException {
+      String metrics, String fields, String metricsLimit)
+      throws TimelineParseException {
     return new TimelineDataToRetrieve(parseDataToRetrieve(confs),
-        parseDataToRetrieve(metrics), parseFieldsStr(
-            fields, TimelineParseConstants.COMMA_DELIMITER));
+        parseDataToRetrieve(metrics), parseFieldsStr(fields,
+        TimelineParseConstants.COMMA_DELIMITER), parseIntStr(metricsLimit));
   }
 
   /**
@@ -190,6 +192,15 @@ final class TimelineReaderWebServicesUtils {
     return str == null ? null : Long.parseLong(str.trim());
   }
 
+  /**
+   * Interpret passed string as a integer.
+   * @param str Passed string.
+   * @return integer representation if string is not null, null otherwise.
+   */
+  static Integer parseIntStr(String str) {
+    return str == null ? null : Integer.parseInt(str.trim());
+  }
+
   /**
    * Trims the passed string if its not null.
    * @param str Passed string.

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java

@@ -312,7 +312,7 @@ class ApplicationEntityReader extends GenericEntityReader {
         ApplicationRowKey.getRowKey(context.getClusterId(), context.getUserId(),
             context.getFlowName(), context.getFlowRunId(), context.getAppId());
     Get get = new Get(rowKey);
-    get.setMaxVersions(Integer.MAX_VALUE);
+    get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
     if (filterList != null && !filterList.getFilters().isEmpty()) {
       get.setFilter(filterList);
     }
@@ -382,6 +382,7 @@ class ApplicationEntityReader extends GenericEntityReader {
       newList.addFilter(filterList);
     }
     scan.setFilter(newList);
+    scan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
     return getTable().getResultScanner(hbaseConf, conn, scan);
   }
 

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
 
 import java.io.IOException;
+import java.util.EnumSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
@@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnF
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+import org.apache.hadoop.yarn.webapp.BadRequestException;
 
 import com.google.common.base.Preconditions;
 
@@ -91,6 +93,15 @@ class FlowRunEntityReader extends TimelineEntityReader {
       Preconditions.checkNotNull(getContext().getFlowRunId(),
           "flowRunId shouldn't be null");
     }
+    EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
+    if (!isSingleEntityRead() && fieldsToRetrieve != null) {
+      for (Field field : fieldsToRetrieve) {
+        if (field != Field.ALL && field != Field.METRICS) {
+          throw new BadRequestException("Invalid field " + field +
+              " specified while querying flow runs.");
+        }
+      }
+    }
   }
 
   @Override
@@ -209,6 +220,7 @@ class FlowRunEntityReader extends TimelineEntityReader {
       newList.addFilter(filterList);
     }
     scan.setFilter(newList);
+    scan.setMaxVersions(Integer.MAX_VALUE);
     return getTable().getResultScanner(hbaseConf, conn, scan);
   }
 

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java

@@ -489,7 +489,7 @@ class GenericEntityReader extends TimelineEntityReader {
             context.getFlowName(), context.getFlowRunId(), context.getAppId(),
             context.getEntityType(), context.getEntityId());
     Get get = new Get(rowKey);
-    get.setMaxVersions(Integer.MAX_VALUE);
+    get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
     if (filterList != null && !filterList.getFilters().isEmpty()) {
       get.setFilter(filterList);
     }
@@ -506,7 +506,7 @@ class GenericEntityReader extends TimelineEntityReader {
     scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
         context.getClusterId(), context.getUserId(), context.getFlowName(),
         context.getFlowRunId(), context.getAppId(), context.getEntityType()));
-    scan.setMaxVersions(Integer.MAX_VALUE);
+    scan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
     if (filterList != null && !filterList.getFilters().isEmpty()) {
       scan.setFilter(filterList);
     }

+ 3 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java

@@ -338,8 +338,9 @@ public abstract class TimelineEntityReader {
       metric.setId(metricResult.getKey());
       // Simply assume that if the value set contains more than 1 elements, the
       // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric
-      metric.setType(metricResult.getValue().size() > 1 ?
-          TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
+      TimelineMetric.Type metricType = metricResult.getValue().size() > 1 ?
+          TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE;
+      metric.setType(metricType);
       metric.addValues(metricResult.getValue());
       entity.addMetric(metric);
     }

+ 6 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java

@@ -274,7 +274,7 @@ public class TestFileSystemTimelineReaderImpl {
     TimelineEntity result = reader.getEntity(
         new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
         "app", "id_1"),
-        new TimelineDataToRetrieve(null, null, null));
+        new TimelineDataToRetrieve(null, null, null, null));
     Assert.assertEquals(
         (new TimelineEntity.Identifier("app", "id_1")).toString(),
         result.getIdentifier().toString());
@@ -289,7 +289,7 @@ public class TestFileSystemTimelineReaderImpl {
     TimelineEntity result = reader.getEntity(
         new TimelineReaderContext("cluster1", null, null, null, "app1", "app",
         "id_1"),
-        new TimelineDataToRetrieve(null, null, null));
+        new TimelineDataToRetrieve(null, null, null, null));
     Assert.assertEquals(
         (new TimelineEntity.Identifier("app", "id_1")).toString(),
         result.getIdentifier().toString());
@@ -306,7 +306,7 @@ public class TestFileSystemTimelineReaderImpl {
     TimelineEntity result = reader.getEntity(
         new TimelineReaderContext("cluster1", null, null, null, "app2",
         "app", "id_5"),
-        new TimelineDataToRetrieve(null, null, null));
+        new TimelineDataToRetrieve(null, null, null, null));
     Assert.assertEquals(
         (new TimelineEntity.Identifier("app", "id_5")).toString(),
         result.getIdentifier().toString());
@@ -320,7 +320,7 @@ public class TestFileSystemTimelineReaderImpl {
         new TimelineReaderContext("cluster1","user1", "flow1", 1L, "app1",
         "app", "id_1"),
         new TimelineDataToRetrieve(null, null,
-        EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS)));
+        EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS), null));
     Assert.assertEquals(
         (new TimelineEntity.Identifier("app", "id_1")).toString(),
         result.getIdentifier().toString());
@@ -338,7 +338,7 @@ public class TestFileSystemTimelineReaderImpl {
     TimelineEntity result = reader.getEntity(
         new TimelineReaderContext("cluster1","user1", "flow1", 1L, "app1",
         "app", "id_1"),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
     Assert.assertEquals(
         (new TimelineEntity.Identifier("app", "id_1")).toString(),
         result.getIdentifier().toString());
@@ -354,7 +354,7 @@ public class TestFileSystemTimelineReaderImpl {
     Set<TimelineEntity> result = reader.getEntities(
         new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
         "app", null), new TimelineEntityFilters(),
-        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL)));
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
     // All 4 entities will be returned
     Assert.assertEquals(4, result.size());
   }