Browse Source

YARN-6064. Support fromId for flowRuns and flow/flowRun apps REST API's (Rohith Sharma K S via Varun Saxena)

(cherry picked from commit 4b1ba4ea314147f8a06cc4f446c1d9336de89fc1)
Varun Saxena 8 years ago
parent
commit
aa09001a33

+ 211 - 45
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java

@@ -40,6 +40,8 @@ import javax.ws.rs.core.MediaType;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
@@ -53,6 +55,7 @@ import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -352,6 +355,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
           flowVersion2, runid2, entity3.getId(), te3);
       hbi.write(cluster, user, flow, flowVersion, runid,
           "application_1111111111_1111", userEntities);
+      writeApplicationEntities(hbi);
       hbi.flush();
     } finally {
       if (hbi != null) {
@@ -360,6 +364,35 @@ public class TestTimelineReaderWebServicesHBaseStorage {
     }
   }
 
+  static void writeApplicationEntities(HBaseTimelineWriterImpl hbi)
+      throws IOException {
+    long currentTimeMillis = System.currentTimeMillis();
+    int count = 1;
+    for (long i = 1; i <= 3; i++) {
+      for (int j = 1; j <= 5; j++) {
+        TimelineEntities te = new TimelineEntities();
+        ApplicationId appId =
+            BuilderUtils.newApplicationId(currentTimeMillis, count++);
+        ApplicationEntity appEntity = new ApplicationEntity();
+        appEntity.setId(appId.toString());
+        appEntity.setCreatedTime(currentTimeMillis);
+
+        TimelineEvent created = new TimelineEvent();
+        created.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+        created.setTimestamp(currentTimeMillis);
+        appEntity.addEvent(created);
+        TimelineEvent finished = new TimelineEvent();
+        finished.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+        finished.setTimestamp(currentTimeMillis + i * j);
+
+        appEntity.addEvent(finished);
+        te.addEntity(appEntity);
+        hbi.write("cluster1", "user1", "flow1", "CF7022C10F1354", i,
+            appEntity.getId(), te);
+      }
+    }
+  }
+
   @AfterClass
   public static void tearDown() throws Exception {
     util.shutdownMiniCluster();
@@ -697,7 +730,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
       Set<FlowActivityEntity> flowEntities =
           resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
       assertNotNull(flowEntities);
-      assertEquals(2, flowEntities.size());
+      assertEquals(3, flowEntities.size());
       List<String> listFlowUIDs = new ArrayList<String>();
       for (FlowActivityEntity entity : flowEntities) {
         String flowUID =
@@ -709,7 +742,9 @@ public class TestTimelineReaderWebServicesHBaseStorage {
         assertTrue((entity.getId().endsWith("@flow_name") &&
             entity.getFlowRuns().size() == 2) ||
             (entity.getId().endsWith("@flow_name2") &&
-            entity.getFlowRuns().size() == 1));
+                entity.getFlowRuns().size() == 1)
+            || (entity.getId().endsWith("@flow1")
+                && entity.getFlowRuns().size() == 3));
       }
 
       // Query flowruns based on UID returned in query above.
@@ -731,7 +766,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
               flowRunUID);
         }
       }
-      assertEquals(3, listFlowRunUIDs.size());
+      assertEquals(6, listFlowRunUIDs.size());
 
       // Query single flowrun based on UIDs' returned in query to get flowruns.
       for (String flowRunUID : listFlowRunUIDs) {
@@ -763,7 +798,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
               context.getFlowRunId(), entity.getId(), null, null)), appUID);
         }
       }
-      assertEquals(4, listAppUIDs.size());
+      assertEquals(19, listAppUIDs.size());
 
       // Query single app based on UIDs' returned in query to get apps.
       for (String appUID : listAppUIDs) {
@@ -944,32 +979,20 @@ public class TestTimelineReaderWebServicesHBaseStorage {
     try {
       URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
           "timeline/clusters/cluster1/flows");
-      ClientResponse resp = getResponse(client, uri);
-      Set<FlowActivityEntity> entities =
-          resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
-      assertNotNull(entities);
-      assertEquals(2, entities.size());
-      for (FlowActivityEntity entity : entities) {
-        assertTrue((entity.getId().endsWith("@flow_name") &&
-            entity.getFlowRuns().size() == 2) ||
-            (entity.getId().endsWith("@flow_name2") &&
-            entity.getFlowRuns().size() == 1));
-      }
+
+      verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1},
+          new String[] {"flow1", "flow_name", "flow_name2"});
 
       // Query without specifying cluster ID.
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
           "timeline/flows/");
-      resp = getResponse(client, uri);
-      entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
-      assertNotNull(entities);
-      assertEquals(2, entities.size());
+      verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1},
+          new String[] {"flow1", "flow_name", "flow_name2"});
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
               "timeline/clusters/cluster1/flows?limit=1");
-      resp = getResponse(client, uri);
-      entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
-      assertNotNull(entities);
-      assertEquals(1, entities.size());
+      verifyFlowEntites(client, uri, 1, new int[] {3},
+          new String[] {"flow1"});
 
       long firstFlowActivity =
           HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L);
@@ -979,40 +1002,25 @@ public class TestTimelineReaderWebServicesHBaseStorage {
           "timeline/clusters/cluster1/flows?daterange="
           + fmt.format(firstFlowActivity) + "-"
           + fmt.format(dayTs));
-      resp = getResponse(client, uri);
-      entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
-      assertNotNull(entities);
-      assertEquals(2, entities.size());
-      for (FlowActivityEntity entity : entities) {
-        assertTrue((entity.getId().endsWith("@flow_name") &&
-            entity.getFlowRuns().size() == 2) ||
-            (entity.getId().endsWith("@flow_name2") &&
-            entity.getFlowRuns().size() == 1));
-      }
+      verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1},
+          new String[] {"flow1", "flow_name", "flow_name2"});
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
           "timeline/clusters/cluster1/flows?daterange=" +
           fmt.format(dayTs + (4*86400000L)));
-      resp = getResponse(client, uri);
-      entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
-      assertNotNull(entities);
-      assertEquals(0, entities.size());
+      verifyFlowEntites(client, uri, 0, new int[] {}, new String[] {});
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
           "timeline/clusters/cluster1/flows?daterange=-" +
           fmt.format(dayTs));
-      resp = getResponse(client, uri);
-      entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
-      assertNotNull(entities);
-      assertEquals(2, entities.size());
+      verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1},
+          new String[] {"flow1", "flow_name", "flow_name2"});
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
           "timeline/clusters/cluster1/flows?daterange=" +
            fmt.format(firstFlowActivity) + "-");
-      resp = getResponse(client, uri);
-      entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
-      assertNotNull(entities);
-      assertEquals(2, entities.size());
+      verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1},
+          new String[] {"flow1", "flow_name", "flow_name2"});
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
           "timeline/clusters/cluster1/flows?daterange=20150711:20150714");
@@ -2242,4 +2250,162 @@ public class TestTimelineReaderWebServicesHBaseStorage {
     }
     return entity;
   }
+
+  private void verifyFlowEntites(Client client, URI uri, int noOfEntities,
+      int[] a, String[] flowsInSequence) throws Exception {
+    ClientResponse resp = getResponse(client, uri);
+    List<FlowActivityEntity> entities =
+        resp.getEntity(new GenericType<List<FlowActivityEntity>>() {
+        });
+    assertNotNull(entities);
+    assertEquals(noOfEntities, entities.size());
+    assertEquals(noOfEntities, flowsInSequence.length);
+    assertEquals(noOfEntities, a.length);
+    int count = 0;
+    for (FlowActivityEntity timelineEntity : entities) {
+      assertEquals(flowsInSequence[count],
+          timelineEntity.getInfo().get("SYSTEM_INFO_FLOW_NAME"));
+      assertEquals(a[count++], timelineEntity.getFlowRuns().size());
+    }
+  }
+
+  @Test
+  public void testForFlowAppsPagination() throws Exception {
+    Client client = createClient();
+    try {
+      // app entities stored is 15 during initialization.
+      int totalAppEntities = 15;
+      String resourceUri = "http://localhost:" + serverPort + "/ws/v2/"
+          + "timeline/clusters/cluster1/users/user1/flows/flow1/apps";
+      URI uri = URI.create(resourceUri);
+      ClientResponse resp = getResponse(client, uri);
+      List<TimelineEntity> entities =
+          resp.getEntity(new GenericType<List<TimelineEntity>>() {
+          });
+      assertNotNull(entities);
+      assertEquals(totalAppEntities, entities.size());
+      TimelineEntity entity1 = entities.get(0);
+      TimelineEntity entity15 = entities.get(totalAppEntities - 1);
+
+      int limit = 10;
+      String queryParam = "?limit=" + limit;
+      uri = URI.create(resourceUri + queryParam);
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+      });
+      assertNotNull(entities);
+      assertEquals(limit, entities.size());
+      assertEquals(entity1, entities.get(0));
+      TimelineEntity entity10 = entities.get(limit - 1);
+
+      uri =
+          URI.create(resourceUri + queryParam + "&fromid=" + entity10.getId());
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+      });
+      assertNotNull(entities);
+      assertEquals(6, entities.size());
+      assertEquals(entity10, entities.get(0));
+      assertEquals(entity15, entities.get(5));
+
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testForFlowRunAppsPagination() throws Exception {
+    Client client = createClient();
+    try {
+      // app entities stored is 15 during initialization.
+      int totalAppEntities = 5;
+      String resourceUri = "http://localhost:" + serverPort + "/ws/v2/"
+          + "timeline/clusters/cluster1/users/user1/flows/flow1/runs/1/apps";
+      URI uri = URI.create(resourceUri);
+      ClientResponse resp = getResponse(client, uri);
+      List<TimelineEntity> entities =
+          resp.getEntity(new GenericType<List<TimelineEntity>>() {
+          });
+      assertNotNull(entities);
+      assertEquals(totalAppEntities, entities.size());
+      TimelineEntity entity1 = entities.get(0);
+      TimelineEntity entity5 = entities.get(totalAppEntities - 1);
+
+      int limit = 3;
+      String queryParam = "?limit=" + limit;
+      uri = URI.create(resourceUri + queryParam);
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+      });
+      assertNotNull(entities);
+      assertEquals(limit, entities.size());
+      assertEquals(entity1, entities.get(0));
+      TimelineEntity entity3 = entities.get(limit - 1);
+
+      uri =
+          URI.create(resourceUri + queryParam + "&fromid=" + entity3.getId());
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+      });
+      assertNotNull(entities);
+      assertEquals(3, entities.size());
+      assertEquals(entity3, entities.get(0));
+      assertEquals(entity5, entities.get(2));
+
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testForFlowRunsPagination() throws Exception {
+    Client client = createClient();
+    try {
+      // app entities stored is 15 during initialization.
+      int totalRuns = 3;
+      String resourceUri = "http://localhost:" + serverPort + "/ws/v2/"
+          + "timeline/clusters/cluster1/users/user1/flows/flow1/runs";
+      URI uri = URI.create(resourceUri);
+      ClientResponse resp = getResponse(client, uri);
+      List<TimelineEntity> entities =
+          resp.getEntity(new GenericType<List<TimelineEntity>>() {
+          });
+      assertNotNull(entities);
+      assertEquals(totalRuns, entities.size());
+      TimelineEntity entity1 = entities.get(0);
+      TimelineEntity entity3 = entities.get(totalRuns - 1);
+
+      int limit = 2;
+      String queryParam = "?limit=" + limit;
+      uri = URI.create(resourceUri + queryParam);
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+      });
+      assertNotNull(entities);
+      assertEquals(limit, entities.size());
+      assertEquals(entity1, entities.get(0));
+      TimelineEntity entity2 = entities.get(limit - 1);
+
+      uri = URI.create(resourceUri + queryParam + "&fromid="
+          + entity2.getInfo().get("SYSTEM_INFO_FLOW_RUN_ID"));
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+      });
+      assertNotNull(entities);
+      assertEquals(limit, entities.size());
+      assertEquals(entity2, entities.get(0));
+      assertEquals(entity3, entities.get(1));
+
+      uri = URI.create(resourceUri + queryParam + "&fromid="
+          + entity3.getInfo().get("SYSTEM_INFO_FLOW_RUN_ID"));
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+      });
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+      assertEquals(entity3, entities.get(0));
+    } finally {
+      client.destroy();
+    }
+  }
 }

+ 53 - 19
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java

@@ -1097,6 +1097,9 @@ public class TimelineReaderWebServices {
    *     METRICS makes sense for flow runs hence only ALL or METRICS are
    *     supported as fields for fetching flow runs. Other fields will lead to
    *     HTTP 400 (Bad Request) response. (Optional query param).
+   * @param fromId Defines the flow run id. If specified, retrieve the next
+   *     set of flow runs from the given id. The set of flow runs retrieved
+   *     is inclusive of specified fromId.
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing a
    *     set of <cite>FlowRunEntity</cite> instances for the given flow are
@@ -1118,7 +1121,8 @@ public class TimelineReaderWebServices {
       @QueryParam("createdtimestart") String createdTimeStart,
       @QueryParam("createdtimeend") String createdTimeEnd,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
-      @QueryParam("fields") String fields) {
+      @QueryParam("fields") String fields,
+      @QueryParam("fromid") String fromId) {
     String url = req.getRequestURI() +
         (req.getQueryString() == null ? "" :
             QUERY_STRING_SEP + req.getQueryString());
@@ -1140,11 +1144,12 @@ public class TimelineReaderWebServices {
       entities = timelineReaderManager.getEntities(context,
           TimelineReaderWebServicesUtils.createTimelineEntityFilters(
           limit, createdTimeStart, createdTimeEnd, null, null, null,
-              null, null, null, null, null),
+              null, null, null, null, fromId),
           TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
           null, metricsToRetrieve, fields, null));
     } catch (Exception e) {
-      handleException(e, url, startTime, "createdTime start/end or limit");
+      handleException(e, url, startTime,
+          "createdTime start/end or limit or fromId");
     }
     long endTime = Time.monotonicNow();
     if (entities == null) {
@@ -1182,6 +1187,9 @@ public class TimelineReaderWebServices {
    *     METRICS makes sense for flow runs hence only ALL or METRICS are
    *     supported as fields for fetching flow runs. Other fields will lead to
    *     HTTP 400 (Bad Request) response. (Optional query param).
+   * @param fromId Defines the flow run id. If specified, retrieve the next
+   *     set of flow runs from the given id. The set of flow runs retrieved
+   *     is inclusive of specified fromId.
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing a
    *     set of <cite>FlowRunEntity</cite> instances for the given flow are
@@ -1204,9 +1212,10 @@ public class TimelineReaderWebServices {
       @QueryParam("createdtimestart") String createdTimeStart,
       @QueryParam("createdtimeend") String createdTimeEnd,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
-      @QueryParam("fields") String fields) {
+      @QueryParam("fields") String fields,
+      @QueryParam("fromid") String fromId) {
     return getFlowRuns(req, res, null, userId, flowName, limit,
-        createdTimeStart, createdTimeEnd, metricsToRetrieve, fields);
+        createdTimeStart, createdTimeEnd, metricsToRetrieve, fields, fromId);
   }
 
   /**
@@ -1237,6 +1246,9 @@ public class TimelineReaderWebServices {
    *     METRICS makes sense for flow runs hence only ALL or METRICS are
    *     supported as fields for fetching flow runs. Other fields will lead to
    *     HTTP 400 (Bad Request) response. (Optional query param).
+   * @param fromId Defines the flow run id. If specified, retrieve the next
+   *     set of flow runs from the given id. The set of flow runs retrieved
+   *     is inclusive of specified fromId.
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing a
    *     set of <cite>FlowRunEntity</cite> instances for the given flow are
@@ -1260,7 +1272,8 @@ public class TimelineReaderWebServices {
       @QueryParam("createdtimestart") String createdTimeStart,
       @QueryParam("createdtimeend") String createdTimeEnd,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
-      @QueryParam("fields") String fields) {
+      @QueryParam("fields") String fields,
+      @QueryParam("fromid") String fromId) {
     String url = req.getRequestURI() +
         (req.getQueryString() == null ? "" :
             QUERY_STRING_SEP + req.getQueryString());
@@ -1279,11 +1292,12 @@ public class TimelineReaderWebServices {
               TimelineEntityType.YARN_FLOW_RUN.toString(), null, null),
           TimelineReaderWebServicesUtils.createTimelineEntityFilters(
           limit, createdTimeStart, createdTimeEnd, null, null, null,
-              null, null, null, null, null),
+              null, null, null, null, fromId),
           TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
           null, metricsToRetrieve, fields, null));
     } catch (Exception e) {
-      handleException(e, url, startTime, "createdTime start/end or limit");
+      handleException(e, url, startTime,
+          "createdTime start/end or limit or fromId");
     }
     long endTime = Time.monotonicNow();
     if (entities == null) {
@@ -1719,6 +1733,9 @@ public class TimelineReaderWebServices {
    *     or has a value less than 1, and metrics have to be retrieved, then
    *     metricsLimit will be considered as 1 i.e. latest single value of
    *     metric(s) will be returned. (Optional query param).
+   * @param fromId Defines the application id. If specified, retrieve the next
+   *     set of applications from the given id. The set of applications
+   *     retrieved is inclusive of specified fromId.
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing
    *     a set of <cite>TimelineEntity</cite> instances representing apps is
@@ -1748,7 +1765,8 @@ public class TimelineReaderWebServices {
       @QueryParam("confstoretrieve") String confsToRetrieve,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
       @QueryParam("fields") String fields,
-      @QueryParam("metricslimit") String metricsLimit) {
+      @QueryParam("metricslimit") String metricsLimit,
+      @QueryParam("fromid") String fromId) {
     String url = req.getRequestURI() +
         (req.getQueryString() == null ? "" :
             QUERY_STRING_SEP + req.getQueryString());
@@ -1771,7 +1789,7 @@ public class TimelineReaderWebServices {
           TimelineReaderWebServicesUtils.createTimelineEntityFilters(
           limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
               infofilters, conffilters, metricfilters, eventfilters, null,
-              null),
+              fromId),
           TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
           confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
     } catch (Exception e) {
@@ -1847,6 +1865,9 @@ public class TimelineReaderWebServices {
    *     or has a value less than 1, and metrics have to be retrieved, then
    *     metricsLimit will be considered as 1 i.e. latest single value of
    *     metric(s) will be returned. (Optional query param).
+   * @param fromId Defines the application id. If specified, retrieve the next
+   *     set of applications from the given id. The set of applications
+   *     retrieved is inclusive of specified fromId.
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing
    *     a set of <cite>TimelineEntity</cite> instances representing apps is
@@ -1878,12 +1899,13 @@ public class TimelineReaderWebServices {
       @QueryParam("confstoretrieve") String confsToRetrieve,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
       @QueryParam("fields") String fields,
-      @QueryParam("metricslimit") String metricsLimit) {
+      @QueryParam("metricslimit") String metricsLimit,
+      @QueryParam("fromid") String fromId) {
     return getEntities(req, res, null, null,
         TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
         flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
         isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
-        confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null);
+        confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId);
   }
 
   /**
@@ -1947,6 +1969,9 @@ public class TimelineReaderWebServices {
    *     or has a value less than 1, and metrics have to be retrieved, then
    *     metricsLimit will be considered as 1 i.e. latest single value of
    *     metric(s) will be returned. (Optional query param).
+   * @param fromId Defines the application id. If specified, retrieve the next
+   *     set of applications from the given id. The set of applications
+   *     retrieved is inclusive of specified fromId.
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing
    *     a set of <cite>TimelineEntity</cite> instances representing apps is
@@ -1980,12 +2005,13 @@ public class TimelineReaderWebServices {
       @QueryParam("confstoretrieve") String confsToRetrieve,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
       @QueryParam("fields") String fields,
-      @QueryParam("metricslimit") String metricsLimit) {
+      @QueryParam("metricslimit") String metricsLimit,
+      @QueryParam("fromid") String fromId) {
     return getEntities(req, res, clusterId, null,
         TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
         flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
         isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
-        confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null);
+        confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId);
   }
 
   /**
@@ -2046,6 +2072,9 @@ public class TimelineReaderWebServices {
    *     or has a value less than 1, and metrics have to be retrieved, then
    *     metricsLimit will be considered as 1 i.e. latest single value of
    *     metric(s) will be returned. (Optional query param).
+   * @param fromId Defines the application id. If specified, retrieve the next
+   *     set of applications from the given id. The set of applications
+   *     retrieved is inclusive of specified fromId.
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing
    *     a set of <cite>TimelineEntity</cite> instances representing apps is
@@ -2076,12 +2105,13 @@ public class TimelineReaderWebServices {
       @QueryParam("confstoretrieve") String confsToRetrieve,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
       @QueryParam("fields") String fields,
-      @QueryParam("metricslimit") String metricsLimit) {
+      @QueryParam("metricslimit") String metricsLimit,
+      @QueryParam("fromid") String fromId) {
     return getEntities(req, res, null, null,
         TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
         null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
         infofilters, conffilters, metricfilters, eventfilters,
-        confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null);
+        confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId);
   }
 
   /**
@@ -2143,6 +2173,9 @@ public class TimelineReaderWebServices {
    *     or has a value less than 1, and metrics have to be retrieved, then
    *     metricsLimit will be considered as 1 i.e. latest single value of
    *     metric(s) will be returned. (Optional query param).
+   * @param fromId Defines the application id. If specified, retrieve the next
+   *     set of applications from the given id. The set of applications
+   *     retrieved is inclusive of specified fromId.
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing
    *     a set of <cite>TimelineEntity</cite> instances representing apps is
@@ -2174,12 +2207,13 @@ public class TimelineReaderWebServices {
       @QueryParam("confstoretrieve") String confsToRetrieve,
       @QueryParam("metricstoretrieve") String metricsToRetrieve,
       @QueryParam("fields") String fields,
-      @QueryParam("metricslimit") String metricsLimit) {
+      @QueryParam("metricslimit") String metricsLimit,
+      @QueryParam("fromid") String fromId) {
     return getEntities(req, res, clusterId, null,
         TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
         null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
         infofilters, conffilters, metricfilters, eventfilters,
-        confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null);
+        confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId);
   }
 
   /**
@@ -3107,4 +3141,4 @@ public class TimelineReaderWebServices {
         " (Took " + (endTime - startTime) + " ms.)");
     return results;
   }
-}
+}

+ 38 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java

@@ -48,7 +48,9 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
@@ -359,13 +361,44 @@ class ApplicationEntityReader extends GenericEntityReader {
       Connection conn, FilterList filterList) throws IOException {
     Scan scan = new Scan();
     TimelineReaderContext context = getContext();
+    RowKeyPrefix<ApplicationRowKey> applicationRowKeyPrefix = null;
+
     // Whether or not flowRunID is null doesn't matter, the
     // ApplicationRowKeyPrefix will do the right thing.
-    RowKeyPrefix<ApplicationRowKey> applicationRowKeyPrefix =
-        new ApplicationRowKeyPrefix(context.getClusterId(),
-            context.getUserId(), context.getFlowName(),
-            context.getFlowRunId());
-    scan.setRowPrefixFilter(applicationRowKeyPrefix.getRowKeyPrefix());
+    // default mode, will always scans from beginning of entity type.
+    if (getFilters().getFromId() == null) {
+      applicationRowKeyPrefix = new ApplicationRowKeyPrefix(
+          context.getClusterId(), context.getUserId(), context.getFlowName(),
+          context.getFlowRunId());
+      scan.setRowPrefixFilter(applicationRowKeyPrefix.getRowKeyPrefix());
+    } else {
+      Long flowRunId = context.getFlowRunId();
+      if (flowRunId == null) {
+        AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(
+            context.getClusterId(), getFilters().getFromId());
+        FlowContext flowContext =
+            lookupFlowContext(appToFlowRowKey, hbaseConf, conn);
+        flowRunId = flowContext.getFlowRunId();
+      }
+
+      ApplicationRowKey applicationRowKey =
+          new ApplicationRowKey(context.getClusterId(), context.getUserId(),
+              context.getFlowName(), flowRunId, getFilters().getFromId());
+
+      // set start row
+      scan.setStartRow(applicationRowKey.getRowKey());
+
+      // get the bytes for stop row
+      applicationRowKeyPrefix = new ApplicationRowKeyPrefix(
+          context.getClusterId(), context.getUserId(), context.getFlowName(),
+          context.getFlowRunId());
+
+      // set stop row
+      scan.setStopRow(
+          HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(
+              applicationRowKeyPrefix.getRowKeyPrefix()));
+    }
+
     FilterList newList = new FilterList();
     newList.addFilter(new PageFilter(getFilters().getLimit()));
     if (filterList != null && !filterList.getFilters().isEmpty()) {

+ 25 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
@@ -210,10 +211,30 @@ class FlowRunEntityReader extends TimelineEntityReader {
       FilterList filterList) throws IOException {
     Scan scan = new Scan();
     TimelineReaderContext context = getContext();
-    RowKeyPrefix<FlowRunRowKey> flowRunRowKeyPrefix =
-        new FlowRunRowKeyPrefix(context.getClusterId(), context.getUserId(),
-            context.getFlowName());
-    scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix());
+    RowKeyPrefix<FlowRunRowKey> flowRunRowKeyPrefix = null;
+    if (getFilters().getFromId() == null) {
+      flowRunRowKeyPrefix = new FlowRunRowKeyPrefix(context.getClusterId(),
+          context.getUserId(), context.getFlowName());
+      scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix());
+    } else {
+
+      FlowRunRowKey flowRunRowKey =
+          new FlowRunRowKey(context.getClusterId(), context.getUserId(),
+              context.getFlowName(), Long.parseLong(getFilters().getFromId()));
+
+      // set start row
+      scan.setStartRow(flowRunRowKey.getRowKey());
+
+      // get the bytes for stop row
+      flowRunRowKeyPrefix = new FlowRunRowKeyPrefix(context.getClusterId(),
+          context.getUserId(), context.getFlowName());
+
+      // set stop row
+      scan.setStopRow(
+          HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(
+              flowRunRowKeyPrefix.getRowKeyPrefix()));
+    }
+
     FilterList newList = new FilterList();
     newList.addFilter(new PageFilter(getFilters().getLimit()));
     if (filterList != null && !filterList.getFilters().isEmpty()) {