소스 검색

YARN-4075 [reader REST API] implement support for querying for flows and flow runs (Varun Saxena via vrushali)

Vrushali Channapattan 9 년 전
부모
커밋
4552598da6
10개의 변경된 파일689개의 추가작업 그리고 55개의 파일을 삭제
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java
  3. 22 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java
  4. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
  5. 204 39
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
  6. 23 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/package-info.java
  7. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
  8. 39 13
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
  9. 365 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesFlowRun.java
  10. 19 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/resources/log4j.properties

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -109,6 +109,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-4074. [timeline reader] implement support for querying for flows
     YARN-4074. [timeline reader] implement support for querying for flows
     and flow runs (sjlee via vrushali)
     and flow runs (sjlee via vrushali)
 
 
+    YARN-4075. [reader REST API] implement support for querying for flows
+    and flow runs (Varun Saxena via vrushali)
+
   IMPROVEMENTS
   IMPROVEMENTS
 
 
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java

@@ -154,4 +154,14 @@ public class TimelineMetric {
     }
     }
     return true;
     return true;
   }
   }
+
+  @Override
+  public String toString() {
+    String str = "{id:" + id + ", type:" + type;
+    if (!values.isEmpty()) {
+      str += ", values:" + values;
+    }
+    str += "}";
+    return str;
+  }
 }
 }

+ 22 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java

@@ -25,9 +25,10 @@ import java.util.Set;
 
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 
 
@@ -42,6 +43,22 @@ public class TimelineReaderManager extends AbstractService {
     this.reader = timelineReader;
     this.reader = timelineReader;
   }
   }
 
 
+  /**
+   * Gets cluster ID from config yarn.resourcemanager.cluster-id
+   * if not supplied by client.
+   * @param clusterId
+   * @param conf
+   * @return clusterId
+   */
+  private static String getClusterID(String clusterId, Configuration conf) {
+    if (clusterId == null || clusterId.isEmpty()) {
+      return conf.get(
+          YarnConfiguration.RM_CLUSTER_ID,
+              YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
+    }
+    return clusterId;
+  }
+
   /**
   /**
    * Get a set of entities matching given predicates. The meaning of each
    * Get a set of entities matching given predicates. The meaning of each
    * argument has been documented with {@link TimelineReader#getEntities}.
    * argument has been documented with {@link TimelineReader#getEntities}.
@@ -56,7 +73,8 @@ public class TimelineReaderManager extends AbstractService {
       Map<String, Object> infoFilters, Map<String, String> configFilters,
       Map<String, Object> infoFilters, Map<String, String> configFilters,
       Set<String>  metricFilters, Set<String> eventFilters,
       Set<String>  metricFilters, Set<String> eventFilters,
       EnumSet<Field> fieldsToRetrieve) throws IOException {
       EnumSet<Field> fieldsToRetrieve) throws IOException {
-    return reader.getEntities(userId, clusterId, flowId, flowRunId, appId,
+    String cluster = getClusterID(clusterId, getConfig());
+    return reader.getEntities(userId, cluster, flowId, flowRunId, appId,
         entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
         entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
         modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
         modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
         metricFilters, eventFilters, fieldsToRetrieve);
         metricFilters, eventFilters, fieldsToRetrieve);
@@ -71,7 +89,8 @@ public class TimelineReaderManager extends AbstractService {
   public TimelineEntity getEntity(String userId, String clusterId,
   public TimelineEntity getEntity(String userId, String clusterId,
       String flowId, Long flowRunId, String appId, String entityType,
       String flowId, Long flowRunId, String appId, String entityType,
       String entityId, EnumSet<Field> fields) throws IOException {
       String entityId, EnumSet<Field> fields) throws IOException {
-    return reader.getEntity(userId, clusterId, flowId, flowRunId, appId,
+    String cluster = getClusterID(clusterId, getConfig());
+    return reader.getEntity(userId, cluster, flowId, flowRunId, appId,
         entityType, entityId, fields);
         entityType, entityId, fields);
   }
   }
 }
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java

@@ -77,6 +77,7 @@ public class TimelineReaderServer extends CompositeService {
     TimelineReader readerStore = ReflectionUtils.newInstance(conf.getClass(
     TimelineReader readerStore = ReflectionUtils.newInstance(conf.getClass(
         YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
         YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
         FileSystemTimelineReaderImpl.class, TimelineReader.class), conf);
         FileSystemTimelineReaderImpl.class, TimelineReader.class), conf);
+    LOG.info("Using store " + readerStore.getClass().getName());
     readerStore.init(conf);
     readerStore.init(conf);
     return readerStore;
     return readerStore;
   }
   }

+ 204 - 39
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.timelineservice.reader;
 package org.apache.hadoop.yarn.server.timelineservice.reader;
 
 
 import java.io.IOException;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.HashSet;
@@ -45,6 +46,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@@ -169,6 +171,11 @@ public class TimelineReaderWebServices {
     return str == null ? null : str.trim();
     return str == null ? null : str.trim();
   }
   }
 
 
+  private static String parseUser(UserGroupInformation callerUGI, String user) {
+    return (callerUGI != null && (user == null || user.isEmpty()) ?
+        callerUGI.getUserName().trim() : parseStr(user));
+  }
+
   private static UserGroupInformation getUser(HttpServletRequest req) {
   private static UserGroupInformation getUser(HttpServletRequest req) {
     String remoteUser = req.getRemoteUser();
     String remoteUser = req.getRemoteUser();
     UserGroupInformation callerUGI = null;
     UserGroupInformation callerUGI = null;
@@ -183,6 +190,17 @@ public class TimelineReaderWebServices {
         ctxt.getAttribute(TimelineReaderServer.TIMELINE_READER_MANAGER_ATTR);
         ctxt.getAttribute(TimelineReaderServer.TIMELINE_READER_MANAGER_ATTR);
   }
   }
 
 
+  private static void handleException(Exception e) throws BadRequestException,
+      WebApplicationException {
+    if (e instanceof IllegalArgumentException) {
+      throw new BadRequestException("Requested Invalid Field.");
+    } else {
+      LOG.error("Error while processing REST request", e);
+      throw new WebApplicationException(e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
   /**
   /**
    * Return the description of the timeline reader web services.
    * Return the description of the timeline reader web services.
    */
    */
@@ -195,26 +213,59 @@ public class TimelineReaderWebServices {
     return TimelineUtils.createTimelineAbout("Timeline Reader API");
     return TimelineUtils.createTimelineAbout("Timeline Reader API");
   }
   }
 
 
+  /**
+   * Return a set of entities that match the given parameters. Cluster ID is not
+   * provided by client so default cluster ID has to be taken.
+   */
+  @GET
+  @Path("/entities/{appid}/{entitytype}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Set<TimelineEntity> getEntities(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("appid") String appId,
+      @PathParam("entitytype") String entityType,
+      @QueryParam("userid") String userId,
+      @QueryParam("flowid") String flowId,
+      @QueryParam("flowrunid") String flowRunId,
+      @QueryParam("limit") String limit,
+      @QueryParam("createdtimestart") String createdTimeStart,
+      @QueryParam("createdtimeend") String createdTimeEnd,
+      @QueryParam("modifiedtimestart") String modifiedTimeStart,
+      @QueryParam("modifiedtimeend") String modifiedTimeEnd,
+      @QueryParam("relatesto") String relatesTo,
+      @QueryParam("isrelatedto") String isRelatedTo,
+      @QueryParam("infofilters") String infofilters,
+      @QueryParam("conffilters") String conffilters,
+      @QueryParam("metricfilters") String metricfilters,
+      @QueryParam("eventfilters") String eventfilters,
+       @QueryParam("fields") String fields) {
+    return getEntities(req, res, null, appId, entityType, userId, flowId,
+        flowRunId, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
+        modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
+        metricfilters, eventfilters, fields);
+  }
+
   /**
   /**
    * Return a set of entities that match the given parameters.
    * Return a set of entities that match the given parameters.
    */
    */
   @GET
   @GET
-  @Path("/entities/{clusterId}/{appId}/{entityType}")
+  @Path("/entities/{clusterid}/{appid}/{entitytype}/")
   @Produces(MediaType.APPLICATION_JSON)
   @Produces(MediaType.APPLICATION_JSON)
   public Set<TimelineEntity> getEntities(
   public Set<TimelineEntity> getEntities(
       @Context HttpServletRequest req,
       @Context HttpServletRequest req,
       @Context HttpServletResponse res,
       @Context HttpServletResponse res,
-      @PathParam("clusterId") String clusterId,
-      @PathParam("appId") String appId,
-      @PathParam("entityType") String entityType,
-      @QueryParam("userId") String userId,
-      @QueryParam("flowId") String flowId,
-      @QueryParam("flowRunId") String flowRunId,
+      @PathParam("clusterid") String clusterId,
+      @PathParam("appid") String appId,
+      @PathParam("entitytype") String entityType,
+      @QueryParam("userid") String userId,
+      @QueryParam("flowid") String flowId,
+      @QueryParam("flowrunid") String flowRunId,
       @QueryParam("limit") String limit,
       @QueryParam("limit") String limit,
-      @QueryParam("createdTimeStart") String createdTimeStart,
-      @QueryParam("createdTimeEnd") String createdTimeEnd,
-      @QueryParam("modifiedTimeStart") String modifiedTimeStart,
-      @QueryParam("modifiedTimeEnd") String modifiedTimeEnd,
+      @QueryParam("createdtimestart") String createdTimeStart,
+      @QueryParam("createdtimeend") String createdTimeEnd,
+      @QueryParam("modifiedtimestart") String modifiedTimeStart,
+      @QueryParam("modifiedtimeend") String modifiedTimeEnd,
       @QueryParam("relatesto") String relatesTo,
       @QueryParam("relatesto") String relatesTo,
       @QueryParam("isrelatedto") String isRelatedTo,
       @QueryParam("isrelatedto") String isRelatedTo,
       @QueryParam("infofilters") String infofilters,
       @QueryParam("infofilters") String infofilters,
@@ -225,11 +276,10 @@ public class TimelineReaderWebServices {
     init(res);
     init(res);
     TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
     TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
     UserGroupInformation callerUGI = getUser(req);
     UserGroupInformation callerUGI = getUser(req);
+    Set<TimelineEntity> entities = null;
     try {
     try {
-      return timelineReaderManager.getEntities(
-          callerUGI != null && (userId == null || userId.isEmpty()) ?
-          callerUGI.getUserName().trim() : parseStr(userId),
-          parseStr(clusterId), parseStr(flowId),
+      entities = timelineReaderManager.getEntities(
+          parseUser(callerUGI, userId), parseStr(clusterId), parseStr(flowId),
           parseLongStr(flowRunId), parseStr(appId), parseStr(entityType),
           parseLongStr(flowRunId), parseStr(appId), parseStr(entityType),
           parseLongStr(limit), parseLongStr(createdTimeStart),
           parseLongStr(limit), parseLongStr(createdTimeStart),
           parseLongStr(createdTimeEnd), parseLongStr(modifiedTimeStart),
           parseLongStr(createdTimeEnd), parseLongStr(modifiedTimeStart),
@@ -245,31 +295,52 @@ public class TimelineReaderWebServices {
       throw new BadRequestException(
       throw new BadRequestException(
           "createdTime or modifiedTime start/end or limit or flowId is not" +
           "createdTime or modifiedTime start/end or limit or flowId is not" +
           " a numeric value.");
           " a numeric value.");
-    } catch (IllegalArgumentException e) {
-      throw new BadRequestException("Requested Invalid Field.");
     } catch (Exception e) {
     } catch (Exception e) {
-      LOG.error("Error getting entities", e);
-      throw new WebApplicationException(e,
-          Response.Status.INTERNAL_SERVER_ERROR);
+      handleException(e);
+    }
+    if (entities == null) {
+      entities = Collections.emptySet();
     }
     }
+    return entities;
+  }
+
+  /**
+   * Return a single entity of the given entity type and Id. Cluster ID is not
+   * provided by client so default cluster ID has to be taken.
+   */
+  @GET
+  @Path("/entity/{appid}/{entitytype}/{entityid}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public TimelineEntity getEntity(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("appid") String appId,
+      @PathParam("entitytype") String entityType,
+      @PathParam("entityid") String entityId,
+      @QueryParam("userid") String userId,
+      @QueryParam("flowid") String flowId,
+      @QueryParam("flowrunid") String flowRunId,
+      @QueryParam("fields") String fields) {
+    return getEntity(req, res, null, appId, entityType, entityId, userId,
+        flowId, flowRunId, fields);
   }
   }
 
 
   /**
   /**
    * Return a single entity of the given entity type and Id.
    * Return a single entity of the given entity type and Id.
    */
    */
   @GET
   @GET
-  @Path("/entity/{clusterId}/{appId}/{entityType}/{entityId}/")
+  @Path("/entity/{clusterid}/{appid}/{entitytype}/{entityid}/")
   @Produces(MediaType.APPLICATION_JSON)
   @Produces(MediaType.APPLICATION_JSON)
   public TimelineEntity getEntity(
   public TimelineEntity getEntity(
       @Context HttpServletRequest req,
       @Context HttpServletRequest req,
       @Context HttpServletResponse res,
       @Context HttpServletResponse res,
-      @PathParam("clusterId") String clusterId,
-      @PathParam("appId") String appId,
-      @PathParam("entityType") String entityType,
-      @PathParam("entityId") String entityId,
-      @QueryParam("userId") String userId,
-      @QueryParam("flowId") String flowId,
-      @QueryParam("flowRunId") String flowRunId,
+      @PathParam("clusterid") String clusterId,
+      @PathParam("appid") String appId,
+      @PathParam("entitytype") String entityType,
+      @PathParam("entityid") String entityId,
+      @QueryParam("userid") String userId,
+      @QueryParam("flowid") String flowId,
+      @QueryParam("flowrunid") String flowRunId,
       @QueryParam("fields") String fields) {
       @QueryParam("fields") String fields) {
     init(res);
     init(res);
     TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
     TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
@@ -277,19 +348,13 @@ public class TimelineReaderWebServices {
     TimelineEntity entity = null;
     TimelineEntity entity = null;
     try {
     try {
       entity = timelineReaderManager.getEntity(
       entity = timelineReaderManager.getEntity(
-          callerUGI != null && (userId == null || userId.isEmpty()) ?
-          callerUGI.getUserName().trim() : parseStr(userId),
-          parseStr(clusterId), parseStr(flowId), parseLongStr(flowRunId),
-          parseStr(appId), parseStr(entityType), parseStr(entityId),
-          parseFieldsStr(fields, COMMA_DELIMITER));
+          parseUser(callerUGI, userId), parseStr(clusterId), parseStr(flowId),
+          parseLongStr(flowRunId), parseStr(appId), parseStr(entityType),
+          parseStr(entityId), parseFieldsStr(fields, COMMA_DELIMITER));
     } catch (NumberFormatException e) {
     } catch (NumberFormatException e) {
-      throw new BadRequestException("flowRunId is not a numeric value.");
-    } catch (IllegalArgumentException e) {
-      throw new BadRequestException("Requested Invalid Field.");
+      throw new BadRequestException("flowrunid is not a numeric value.");
     } catch (Exception e) {
     } catch (Exception e) {
-      LOG.error("Error getting entity", e);
-      throw new WebApplicationException(e,
-          Response.Status.INTERNAL_SERVER_ERROR);
+      handleException(e);
     }
     }
     if (entity == null) {
     if (entity == null) {
       throw new NotFoundException("Timeline entity {id: " + parseStr(entityId) +
       throw new NotFoundException("Timeline entity {id: " + parseStr(entityId) +
@@ -297,4 +362,104 @@ public class TimelineReaderWebServices {
     }
     }
     return entity;
     return entity;
   }
   }
+
+  /**
+   * Return a single flow run for the given cluster, flow id and run id.
+   * Cluster ID is not provided by client so default cluster ID has to be taken.
+   */
+  @GET
+  @Path("/flowrun/{flowid}/{flowrunid}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public TimelineEntity getFlowRun(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("flowid") String flowId,
+      @PathParam("flowrunid") String flowRunId,
+      @QueryParam("userid") String userId,
+      @QueryParam("fields") String fields) {
+    return getFlowRun(req, res, null, flowId, flowRunId, userId, fields);
+  }
+
+  /**
+   * Return a single flow run for the given cluster, flow id and run id.
+   */
+  @GET
+  @Path("/flowrun/{clusterid}/{flowid}/{flowrunid}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public TimelineEntity getFlowRun(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("clusterid") String clusterId,
+      @PathParam("flowid") String flowId,
+      @PathParam("flowrunid") String flowRunId,
+      @QueryParam("userid") String userId,
+      @QueryParam("fields") String fields) {
+    init(res);
+    TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
+    UserGroupInformation callerUGI = getUser(req);
+    TimelineEntity entity = null;
+    try {
+      entity = timelineReaderManager.getEntity(
+          parseUser(callerUGI, userId), parseStr(clusterId),
+          parseStr(flowId), parseLongStr(flowRunId), null,
+          TimelineEntityType.YARN_FLOW_RUN.toString(), null,
+          parseFieldsStr(fields, COMMA_DELIMITER));
+    } catch (NumberFormatException e) {
+      throw new BadRequestException("flowRunId is not a numeric value.");
+    } catch (Exception e) {
+      handleException(e);
+    }
+    if (entity == null) {
+      throw new NotFoundException("Flow run {flow id: " + parseStr(flowId) +
+          ", run id: " + parseLongStr(flowRunId) + " } is not found");
+    }
+    return entity;
+  }
+
+  /**
+   * Return a list of flows for a given cluster id. Cluster ID is not
+   * provided by client so default cluster ID has to be taken.
+   */
+  @GET
+  @Path("/flows/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Set<TimelineEntity> getFlows(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @QueryParam("limit") String limit,
+      @QueryParam("fields") String fields) {
+    return getFlows(req, res, null, limit, fields);
+  }
+
+  /**
+   * Return a list of flows for a given cluster id.
+   */
+  @GET
+  @Path("/flows/{clusterid}/")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Set<TimelineEntity> getFlows(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("clusterid") String clusterId,
+      @QueryParam("limit") String limit,
+      @QueryParam("fields") String fields) {
+    init(res);
+    TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
+    Set<TimelineEntity> entities = null;
+    try {
+      entities = timelineReaderManager.getEntities(
+          null, parseStr(clusterId), null, null, null,
+          TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), parseLongStr(limit),
+          null, null, null, null, null, null, null, null, null, null,
+          parseFieldsStr(fields, COMMA_DELIMITER));
+    } catch (NumberFormatException e) {
+      throw new BadRequestException("limit is not a numeric value.");
+    } catch (Exception e) {
+      handleException(e);
+    }
+    if (entities == null) {
+      entities = Collections.emptySet();
+    }
+    return entities;
+  }
 }
 }

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/package-info.java

@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java

@@ -45,6 +45,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
 
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
 /**
  * This creates the schema for a hbase based backend for storing application
  * This creates the schema for a hbase based backend for storing application
  * timeline information.
  * timeline information.
@@ -201,6 +203,7 @@ public class TimelineSchemaCreator {
     return commandLine;
     return commandLine;
   }
   }
 
 
+  @VisibleForTesting
   public static void createAllTables(Configuration hbaseConf,
   public static void createAllTables(Configuration hbaseConf,
       boolean skipExisting) throws IOException {
       boolean skipExisting) throws IOException {
 
 

+ 39 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java

@@ -187,8 +187,8 @@ public class TestTimelineReaderWebServices {
     Client client = createClient();
     Client client = createClient();
     try {
     try {
       URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
       URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
-          "timeline/entity/cluster1/app1/app/id_1?userId=user1&" +
-          "flowId=flow1&flowRunId=1");
+          "timeline/entity/cluster1/app1/app/id_1?userid=user1&" +
+          "flowid=flow1&flowrunid=1");
       ClientResponse resp = getResponse(client, uri);
       ClientResponse resp = getResponse(client, uri);
       TimelineEntity entity = resp.getEntity(TimelineEntity.class);
       TimelineEntity entity = resp.getEntity(TimelineEntity.class);
       assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
       assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@@ -258,6 +258,32 @@ public class TestTimelineReaderWebServices {
     }
     }
   }
   }
 
 
+  @Test
+  public void testQueryWithoutCluster() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entity/app1/app/id_1");
+      ClientResponse resp = getResponse(client, uri);
+      TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entity);
+      assertEquals("id_1", entity.getId());
+      assertEquals("app", entity.getType());
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/entities/app1/app");
+      resp = getResponse(client, uri);
+      Set<TimelineEntity> entities =
+          resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(4, entities.size());
+    } finally {
+      client.destroy();
+    }
+  }
+
   @Test
   @Test
   public void testGetEntities() throws Exception {
   public void testGetEntities() throws Exception {
     Client client = createClient();
     Client client = createClient();
@@ -318,8 +344,8 @@ public class TestTimelineReaderWebServices {
     Client client = createClient();
     Client client = createClient();
     try {
     try {
       URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
       URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
-          "timeline/entities/cluster1/app1/app?createdTimeStart=1425016502030&"
-          + "createdTimeEnd=1425016502060");
+          "timeline/entities/cluster1/app1/app?createdtimestart=1425016502030&"
+          + "createdtimeend=1425016502060");
       ClientResponse resp = getResponse(client, uri);
       ClientResponse resp = getResponse(client, uri);
       Set<TimelineEntity> entities =
       Set<TimelineEntity> entities =
           resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
           resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
@@ -330,7 +356,7 @@ public class TestTimelineReaderWebServices {
           entities.contains(newEntity("app", "id_4")));
           entities.contains(newEntity("app", "id_4")));
 
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
-          "entities/cluster1/app1/app?createdTimeEnd=1425016502010");
+          "entities/cluster1/app1/app?createdtimeend=1425016502010");
       resp = getResponse(client, uri);
       resp = getResponse(client, uri);
       entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
       entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
       assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
       assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@@ -340,7 +366,7 @@ public class TestTimelineReaderWebServices {
           entities.contains(newEntity("app", "id_4")));
           entities.contains(newEntity("app", "id_4")));
 
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
-          "entities/cluster1/app1/app?createdTimeStart=1425016502010");
+          "entities/cluster1/app1/app?createdtimestart=1425016502010");
       resp = getResponse(client, uri);
       resp = getResponse(client, uri);
       entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
       entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
       assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
       assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@@ -358,8 +384,8 @@ public class TestTimelineReaderWebServices {
     Client client = createClient();
     Client client = createClient();
     try {
     try {
       URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
       URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
-          "timeline/entities/cluster1/app1/app?modifiedTimeStart=1425016502090"
-          + "&modifiedTimeEnd=1425016503020");
+          "timeline/entities/cluster1/app1/app?modifiedtimestart=1425016502090"
+          + "&modifiedtimeend=1425016503020");
       ClientResponse resp = getResponse(client, uri);
       ClientResponse resp = getResponse(client, uri);
       Set<TimelineEntity> entities =
       Set<TimelineEntity> entities =
           resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
           resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
@@ -372,7 +398,7 @@ public class TestTimelineReaderWebServices {
           entities.contains(newEntity("app", "id_4")));
           entities.contains(newEntity("app", "id_4")));
 
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
-          "entities/cluster1/app1/app?modifiedTimeEnd=1425016502090");
+          "entities/cluster1/app1/app?modifiedtimeend=1425016502090");
       resp = getResponse(client, uri);
       resp = getResponse(client, uri);
       entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
       entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
       assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
       assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@@ -384,7 +410,7 @@ public class TestTimelineReaderWebServices {
           entities.contains(newEntity("app", "id_3")));
           entities.contains(newEntity("app", "id_3")));
 
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
-          "entities/cluster1/app1/app?modifiedTimeStart=1425016503005");
+          "entities/cluster1/app1/app?modifiedtimestart=1425016503005");
       resp = getResponse(client, uri);
       resp = getResponse(client, uri);
       entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
       entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
       assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
       assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
@@ -527,7 +553,7 @@ public class TestTimelineReaderWebServices {
           "timeline/entities/cluster1/app1/app?metricfilters=metric7&" +
           "timeline/entities/cluster1/app1/app?metricfilters=metric7&" +
           "isrelatedto=type1:tid1_1;tid1_2,type2:tid2_1%60&relatesto=" +
           "isrelatedto=type1:tid1_1;tid1_2,type2:tid2_1%60&relatesto=" +
           "flow:flow1&eventfilters=event_2,event_4&infofilters=info2:3.5" +
           "flow:flow1&eventfilters=event_2,event_4&infofilters=info2:3.5" +
-          "&createdTimeStart=1425016502030&createdTimeEnd=1425016502060");
+          "&createdtimestart=1425016502030&createdtimeend=1425016502060");
       ClientResponse resp = getResponse(client, uri);
       ClientResponse resp = getResponse(client, uri);
       Set<TimelineEntity> entities =
       Set<TimelineEntity> entities =
           resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
           resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
@@ -544,11 +570,11 @@ public class TestTimelineReaderWebServices {
     Client client = createClient();
     Client client = createClient();
     try {
     try {
       URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
       URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
-          "timeline/entities/cluster1/app1/app?flowRunId=a23b");
+          "timeline/entities/cluster1/app1/app?flowrunid=a23b");
       verifyHttpResponse(client, uri, Status.BAD_REQUEST);
       verifyHttpResponse(client, uri, Status.BAD_REQUEST);
 
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
-          "entity/cluster1/app1/app/id_1?flowRunId=2ab15");
+          "entity/cluster1/app1/app/id_1?flowrunid=2ab15");
       verifyHttpResponse(client, uri, Status.BAD_REQUEST);
       verifyHttpResponse(client, uri, Status.BAD_REQUEST);
 
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +

+ 365 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesFlowRun.java

@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.GenericType;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+
+public class TestTimelineReaderWebServicesFlowRun {
+  private int serverPort;
+  private TimelineReaderServer server;
+  private static HBaseTestingUtility util;
+  private static long ts = System.currentTimeMillis();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    util = new HBaseTestingUtility();
+    Configuration conf = util.getConfiguration();
+    conf.setInt("hfile.format.version", 3);
+    util.startMiniCluster();
+    TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
+    loadData();
+  }
+
+  private static void loadData() throws Exception {
+    String cluster = "cluster1";
+    String user = "user1";
+    String flow = "flow_name";
+    String flowVersion = "CF7022C10F1354";
+    Long runid = 1002345678919L;
+    Long runid1 = 1002345678920L;
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunMetrics_test";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 1425016501000L;
+    entity.setCreatedTime(cTime);
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId("MAP_SLOT_MILLIS");
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    metricValues.put(ts - 100000, 2);
+    metricValues.put(ts - 80000, 40);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+
+    m1 = new TimelineMetric();
+    m1.setId("HDFS_BYTES_READ");
+    metricValues = new HashMap<Long, Number>();
+    metricValues.put(ts - 100000, 31);
+    metricValues.put(ts - 80000, 57);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+    entity.addMetrics(metrics);
+
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    Long expTs = 1436512802000L;
+    event.setTimestamp(expTs);
+    String expKey = "foo_event";
+    Object expVal = "test";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+    te.addEntity(entity);
+
+    // write another application with same metric to this flow
+    TimelineEntities te1 = new TimelineEntities();
+    TimelineEntity entity1 = new TimelineEntity();
+    id = "flowRunMetrics_test";
+    type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity1.setId(id);
+    entity1.setType(type);
+    cTime = 1425016501000L;
+    entity1.setCreatedTime(cTime);
+    // add metrics
+    metrics.clear();
+    TimelineMetric m2 = new TimelineMetric();
+    m2.setId("MAP_SLOT_MILLIS");
+    metricValues = new HashMap<Long, Number>();
+    metricValues.put(ts - 100000, 5L);
+    metricValues.put(ts - 80000, 101L);
+    m2.setType(Type.TIME_SERIES);
+    m2.setValues(metricValues);
+    metrics.add(m2);
+    entity1.addMetrics(metrics);
+    te1.addEntity(entity1);
+
+    String flow2 = "flow_name2";
+    String flowVersion2 = "CF7022C10F1454";
+    Long runid2 = 2102356789046L;
+    TimelineEntities te3 = new TimelineEntities();
+    TimelineEntity entity3 = new TimelineEntity();
+    id = "flowRunMetrics_test1";
+    entity3.setId(id);
+    entity3.setType(type);
+    cTime = 1425016501030L;
+    entity3.setCreatedTime(cTime);
+    TimelineEvent event2 = new TimelineEvent();
+    event2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    event2.setTimestamp(1436512802030L);
+    event2.addInfo("foo_event", "test");
+    entity3.addEvent(event2);
+    te3.addEntity(entity3);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String appName = "application_11111111111111_1111";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      appName = "application_11111111111111_2222";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+      hbi.write(cluster, user, flow, flowVersion, runid1, appName, te);
+      appName = "application_11111111111111_2223";
+      hbi.write(cluster, user, flow2, flowVersion2, runid2, appName, te3);
+      hbi.flush();
+    } finally {
+      hbi.close();
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  @Before
+  public void init() throws Exception {
+    try {
+      Configuration config = util.getConfiguration();
+      config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+          "localhost:0");
+      config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
+      config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
+          "org.apache.hadoop.yarn.server.timelineservice.storage." +
+              "HBaseTimelineReaderImpl");
+      config.setInt("hfile.format.version", 3);
+      server = new TimelineReaderServer();
+      server.init(config);
+      server.start();
+      serverPort = server.getWebServerPort();
+    } catch (Exception e) {
+      Assert.fail("Web server failed to start");
+    }
+  }
+
+  private static Client createClient() {
+    ClientConfig cfg = new DefaultClientConfig();
+    cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
+    return new Client(new URLConnectionClientHandler(
+        new DummyURLConnectionFactory()), cfg);
+  }
+
+  private static ClientResponse getResponse(Client client, URI uri)
+      throws Exception {
+    ClientResponse resp =
+        client.resource(uri).accept(MediaType.APPLICATION_JSON)
+        .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    if (resp == null ||
+        resp.getClientResponseStatus() != ClientResponse.Status.OK) {
+      String msg = new String();
+      if (resp != null) {
+        msg = resp.getClientResponseStatus().toString();
+      }
+      throw new IOException("Incorrect response from timeline reader. " +
+          "Status=" + msg);
+    }
+    return resp;
+  }
+
+  private static class DummyURLConnectionFactory
+      implements HttpURLConnectionFactory {
+
+    @Override
+    public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
+      try {
+        return (HttpURLConnection)url.openConnection();
+      } catch (UndeclaredThrowableException e) {
+        throw new IOException(e.getCause());
+      }
+    }
+  }
+
+  private static TimelineMetric newMetric(String id, long ts, Number value) {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setId(id);
+    metric.addValue(ts, value);
+    return metric;
+  }
+
+  private static boolean verifyMetricValues(Map<Long, Number> m1,
+      Map<Long, Number> m2) {
+    for (Map.Entry<Long, Number> entry : m1.entrySet()) {
+      if (!m2.containsKey(entry.getKey())) {
+        return false;
+      }
+      if (m2.get(entry.getKey()).equals(entry.getValue())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static boolean verifyMetrics(
+      TimelineMetric m, TimelineMetric... metrics) {
+    for (TimelineMetric metric : metrics) {
+      if (!metric.equals(m)) {
+        continue;
+      }
+      if (!verifyMetricValues(metric.getValues(), m.getValues())) {
+        continue;
+      }
+      return true;
+    }
+    return false;
+  }
+
+  @Test
+  public void testGetFlowRun() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowrun/cluster1/flow_name/1002345678919?userid=user1");
+      ClientResponse resp = getResponse(client, uri);
+      FlowRunEntity entity = resp.getEntity(FlowRunEntity.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entity);
+      assertEquals("user1@flow_name/1002345678919", entity.getId());
+      assertEquals(2, entity.getMetrics().size());
+      TimelineMetric m1 = newMetric("HDFS_BYTES_READ", ts - 80000, 57L);
+      TimelineMetric m2 = newMetric("MAP_SLOT_MILLIS", ts - 80000, 141L);
+      for (TimelineMetric metric : entity.getMetrics()) {
+        assertTrue(verifyMetrics(metric, m1, m2));
+      }
+
+      // Query without specifying cluster ID.
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowrun/flow_name/1002345678919?userid=user1");
+      resp = getResponse(client, uri);
+      entity = resp.getEntity(FlowRunEntity.class);
+      assertNotNull(entity);
+      assertEquals("user1@flow_name/1002345678919", entity.getId());
+      assertEquals(2, entity.getMetrics().size());
+      m1 = newMetric("HDFS_BYTES_READ", ts - 80000, 57L);
+      m2 = newMetric("MAP_SLOT_MILLIS", ts - 80000, 141L);
+      for (TimelineMetric metric : entity.getMetrics()) {
+        assertTrue(verifyMetrics(metric, m1, m2));
+      }
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetFlows() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flows/cluster1");
+      ClientResponse resp = getResponse(client, uri);
+      Set<FlowActivityEntity> entities =
+          resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+      for (FlowActivityEntity entity : entities) {
+        assertTrue((entity.getId().endsWith("@flow_name") &&
+            entity.getFlowRuns().size() == 2) ||
+            (entity.getId().endsWith("@flow_name2") &&
+            entity.getFlowRuns().size() == 1));
+      }
+
+      // Query without specifying cluster ID.
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flows/");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(2, entities.size());
+
+      uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+              "timeline/flows/cluster1?limit=1");
+      resp = getResponse(client, uri);
+      entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+      assertNotNull(entities);
+      assertEquals(1, entities.size());
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @After
+  public void stop() throws Exception {
+    if (server != null) {
+      server.stop();
+      server = null;
+    }
+  }
+
+}

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/resources/log4j.properties

@@ -0,0 +1,19 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n