Sfoglia il codice sorgente

YARN-3051. Created storage oriented reader interface for fetching raw entity data and made the filesystem based implementation. Contributed by Varun Saxena.

(cherry picked from commit 499ce52c7b645ec0b1cc8ac62dc9a3127b987a20)
Zhijie Shen 10 anni fa
parent
commit
2d59bc4458

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.records.timelineservice;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.yarn.util.TimelineServiceHelper;
+import org.codehaus.jackson.annotate.JsonSetter;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -335,6 +336,7 @@ public class TimelineEntity {
     }
   }
 
+  @JsonSetter("isrelatedto")
   public void setIsRelatedToEntities(
       Map<String, Set<String>> isRelatedToEntities) {
     if (real == null) {
@@ -423,6 +425,7 @@ public class TimelineEntity {
     }
   }
 
+  @JsonSetter("relatesto")
   public void setRelatesToEntities(Map<String, Set<String>> relatesToEntities) {
     if (real == null) {
       this.relatesToEntities =
@@ -441,6 +444,7 @@ public class TimelineEntity {
     }
   }
 
+  @JsonSetter("createdtime")
   public void setCreatedTime(long createdTime) {
     if (real == null) {
       this.createdTime = createdTime;
@@ -458,6 +462,7 @@ public class TimelineEntity {
     }
   }
 
+  @JsonSetter("modifiedtime")
   public void setModifiedTime(long modifiedTime) {
     if (real == null) {
       this.modifiedTime = modifiedTime;

+ 490 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java

@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ *  File System based implementation for TimelineReader.
+ */
+public class FileSystemTimelineReaderImpl extends AbstractService
+    implements TimelineReader {
+
+  private static final Log LOG =
+      LogFactory.getLog(FileSystemTimelineReaderImpl.class);
+
+  private String rootPath;
+  private static final String ENTITIES_DIR = "entities";
+
+  /** Default extension for output files. */
+  private static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
+
+  @VisibleForTesting
+  /** Default extension for output files. */
+  static final String APP_FLOW_MAPPING_FILE = "app_flow_mapping.csv";
+
+  @VisibleForTesting
+  /** Config param for timeline service file system storage root. */
+  static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
+
+  @VisibleForTesting
+  /** Default value for storage location on local disk. */
+  static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT =
+      "/tmp/timeline_service_data";
+
+  private final CSVFormat csvFormat =
+      CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
+
+  public FileSystemTimelineReaderImpl() {
+    super(FileSystemTimelineReaderImpl.class.getName());
+  }
+
+  @VisibleForTesting
+  String getRootPath() {
+    return rootPath;
+  }
+
+  private static ObjectMapper mapper;
+
+  static {
+    mapper = new ObjectMapper();
+    YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
+  }
+
+  /**
+   * Deserialize a POJO object from a JSON string.
+   * @param clazz
+   *      class to be desirialized
+   *
+   * @param jsonString
+   *    json string to deserialize
+   * @return TimelineEntity object
+   * @throws IOException
+   * @throws JsonMappingException
+   * @throws JsonGenerationException
+   */
+  public static <T> T getTimelineRecordFromJSON(
+      String jsonString, Class<T> clazz)
+      throws JsonGenerationException, JsonMappingException, IOException {
+    return mapper.readValue(jsonString, clazz);
+  }
+
+  private static void fillFields(TimelineEntity finalEntity,
+      TimelineEntity real, EnumSet<Field> fields) {
+    if (fields.contains(Field.ALL)) {
+      finalEntity.setConfigs(real.getConfigs());
+      finalEntity.setMetrics(real.getMetrics());
+      finalEntity.setInfo(real.getInfo());
+      finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
+      finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
+      finalEntity.setEvents(real.getEvents());
+      return;
+    }
+    for (Field field : fields) {
+      switch(field) {
+      case CONFIGS:
+        finalEntity.setConfigs(real.getConfigs());
+        break;
+      case METRICS:
+        finalEntity.setMetrics(real.getMetrics());
+        break;
+      case INFO:
+        finalEntity.setInfo(real.getInfo());
+        break;
+      case IS_RELATED_TO:
+        finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
+        break;
+      case RELATES_TO:
+        finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
+        break;
+      case EVENTS:
+        finalEntity.setEvents(real.getEvents());
+        break;
+      default:
+        continue;
+      }
+    }
+  }
+
+  private static boolean matchFilter(Object infoValue, Object filterValue) {
+    return infoValue.equals(filterValue);
+  }
+
+  private static boolean matchFilters(Map<String, ? extends Object> entityInfo,
+      Map<String, ? extends Object> filters) {
+    if (entityInfo == null || entityInfo.isEmpty()) {
+      return false;
+    }
+    for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
+      Object infoValue = entityInfo.get(filter.getKey());
+      if (infoValue == null) {
+        return false;
+      }
+      if (!matchFilter(infoValue, filter.getValue())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private String getFlowRunPath(String userId, String clusterId, String flowId,
+      Long flowRunId, String appId)
+      throws IOException {
+    if (userId != null && flowId != null && flowRunId != null) {
+      return userId + "/" + flowId + "/" + flowRunId;
+    }
+    if (clusterId == null || appId == null) {
+      throw new IOException("Unable to get flow info");
+    }
+    String appFlowMappingFile = rootPath + "/" +  ENTITIES_DIR + "/" +
+        clusterId + "/" + APP_FLOW_MAPPING_FILE;
+    try (BufferedReader reader =
+        new BufferedReader(new InputStreamReader(
+            new FileInputStream(
+                appFlowMappingFile), Charset.forName("UTF-8")));
+        CSVParser parser = new CSVParser(reader, csvFormat)) {
+      for (CSVRecord record : parser.getRecords()) {
+        if (record.size() < 4) {
+          continue;
+        }
+        String applicationId = record.get("APP");
+        if (applicationId != null && !applicationId.trim().isEmpty() &&
+            !applicationId.trim().equals(appId)) {
+          continue;
+        }
+        return record.get(1).trim() + "/" + record.get(2).trim() + "/" +
+            record.get(3).trim();
+      }
+      parser.close();
+    }
+    throw new IOException("Unable to get flow info");
+  }
+
+  private static boolean matchMetricFilters(Set<TimelineMetric> metrics,
+      Set<String> metricFilters) {
+    Set<String> tempMetrics = new HashSet<String>();
+    for (TimelineMetric metric : metrics) {
+      tempMetrics.add(metric.getId());
+    }
+
+    for (String metricFilter : metricFilters) {
+      if (!tempMetrics.contains(metricFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
+      Set<String> eventFilters) {
+    Set<String> tempEvents = new HashSet<String>();
+    for (TimelineEvent event : entityEvents) {
+      tempEvents.add(event.getId());
+    }
+
+    for (String eventFilter : eventFilters) {
+      if (!tempEvents.contains(eventFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static TimelineEntity createEntityToBeReturned(TimelineEntity entity,
+      EnumSet<Field> fieldsToRetrieve) {
+    TimelineEntity entityToBeReturned = new TimelineEntity();
+    entityToBeReturned.setIdentifier(entity.getIdentifier());
+    entityToBeReturned.setCreatedTime(entity.getCreatedTime());
+    entityToBeReturned.setModifiedTime(entity.getModifiedTime());
+    if (fieldsToRetrieve != null) {
+      fillFields(entityToBeReturned, entity, fieldsToRetrieve);
+    }
+    return entityToBeReturned;
+  }
+
+  private static boolean isTimeInRange(Long time, Long timeBegin,
+      Long timeEnd) {
+    return (time >= timeBegin) && (time <= timeEnd);
+  }
+
+  private static boolean matchRelations(
+      Map<String, Set<String>> entityRelations,
+      Map<String, Set<String>> relations) {
+    for (Map.Entry<String, Set<String>> relation : relations.entrySet()) {
+      Set<String> ids = entityRelations.get(relation.getKey());
+      if (ids == null) {
+        return false;
+      }
+      for (String id : relation.getValue()) {
+        if (!ids.contains(id)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  private static void mergeEntities(TimelineEntity entity1,
+      TimelineEntity entity2) {
+    // Ideally created time wont change except in the case of issue from client.
+    if (entity2.getCreatedTime() > 0) {
+      entity1.setCreatedTime(entity2.getCreatedTime());
+    }
+    if (entity2.getModifiedTime() > 0) {
+      entity1.setModifiedTime(entity2.getModifiedTime());
+    }
+    for (Entry<String, String> configEntry : entity2.getConfigs().entrySet()) {
+      entity1.addConfig(configEntry.getKey(), configEntry.getValue());
+    }
+    for (Entry<String, Object> infoEntry : entity2.getInfo().entrySet()) {
+      entity1.addInfo(infoEntry.getKey(), infoEntry.getValue());
+    }
+    for (Entry<String, Set<String>> isRelatedToEntry :
+        entity2.getIsRelatedToEntities().entrySet()) {
+      String type = isRelatedToEntry.getKey();
+      for (String entityId : isRelatedToEntry.getValue()) {
+        entity1.addIsRelatedToEntity(type, entityId);
+      }
+    }
+    for (Entry<String, Set<String>> relatesToEntry :
+        entity2.getRelatesToEntities().entrySet()) {
+      String type = relatesToEntry.getKey();
+      for (String entityId : relatesToEntry.getValue()) {
+        entity1.addRelatesToEntity(type, entityId);
+      }
+    }
+    for (TimelineEvent event : entity2.getEvents()) {
+      entity1.addEvent(event);
+    }
+    for (TimelineMetric metric2 : entity2.getMetrics()) {
+      boolean found = false;
+      for (TimelineMetric metric1 : entity1.getMetrics()) {
+        if (metric1.getId().equals(metric2.getId())) {
+          metric1.addValues(metric2.getValues());
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        entity1.addMetric(metric2);
+      }
+    }
+  }
+
+  private static TimelineEntity readEntityFromFile(BufferedReader reader)
+      throws IOException {
+    TimelineEntity entity =
+        getTimelineRecordFromJSON(reader.readLine(), TimelineEntity.class);
+    String entityStr = "";
+    while ((entityStr = reader.readLine()) != null) {
+      if (entityStr.trim().isEmpty()) {
+        continue;
+      }
+      TimelineEntity anotherEntity =
+          getTimelineRecordFromJSON(entityStr, TimelineEntity.class);
+      if (!entity.getId().equals(anotherEntity.getId()) ||
+          !entity.getType().equals(anotherEntity.getType())) {
+        continue;
+      }
+      mergeEntities(entity, anotherEntity);
+    }
+    return entity;
+  }
+
+  private Set<TimelineEntity> getEntities(File dir, String entityType,
+      Long limit, Long createdTimeBegin,
+      Long createdTimeEnd, Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) throws IOException {
+    if (limit == null || limit <= 0) {
+      limit = DEFAULT_LIMIT;
+    }
+    if (createdTimeBegin == null || createdTimeBegin <= 0) {
+      createdTimeBegin = 0L;
+    }
+    if (createdTimeEnd == null || createdTimeEnd <= 0) {
+      createdTimeEnd = Long.MAX_VALUE;
+    }
+    if (modifiedTimeBegin == null || modifiedTimeBegin <= 0) {
+      modifiedTimeBegin = 0L;
+    }
+    if (modifiedTimeEnd == null || modifiedTimeEnd <= 0) {
+      modifiedTimeEnd = Long.MAX_VALUE;
+    }
+
+    // First sort the selected entities based on created/start time.
+    Map<Long, Set<TimelineEntity>> sortedEntities =
+        new TreeMap<>(
+          new Comparator<Long>() {
+            @Override
+            public int compare(Long l1, Long l2) {
+              return l2.compareTo(l1);
+            }
+          }
+        );
+    for (File entityFile : dir.listFiles()) {
+      if (!entityFile.getName().contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) {
+        continue;
+      }
+      try (BufferedReader reader =
+          new BufferedReader(
+              new InputStreamReader(
+                  new FileInputStream(
+                      entityFile), Charset.forName("UTF-8")))) {
+        TimelineEntity entity = readEntityFromFile(reader);
+        if (!entity.getType().equals(entityType)) {
+          continue;
+        }
+        if (!isTimeInRange(entity.getCreatedTime(), createdTimeBegin,
+            createdTimeEnd)) {
+          continue;
+        }
+        if (!isTimeInRange(entity.getModifiedTime(), modifiedTimeBegin,
+            modifiedTimeEnd)) {
+          continue;
+        }
+        if (relatesTo != null && !relatesTo.isEmpty() &&
+            !matchRelations(entity.getRelatesToEntities(), relatesTo)) {
+          continue;
+        }
+        if (isRelatedTo != null && !isRelatedTo.isEmpty() &&
+            !matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) {
+          continue;
+        }
+        if (infoFilters != null && !infoFilters.isEmpty() &&
+            !matchFilters(entity.getInfo(), infoFilters)) {
+          continue;
+        }
+        if (configFilters != null && !configFilters.isEmpty() &&
+            !matchFilters(entity.getConfigs(), configFilters)) {
+          continue;
+        }
+        if (metricFilters != null && !metricFilters.isEmpty() &&
+            !matchMetricFilters(entity.getMetrics(), metricFilters)) {
+          continue;
+        }
+        if (eventFilters != null && !eventFilters.isEmpty() &&
+            !matchEventFilters(entity.getEvents(), eventFilters)) {
+          continue;
+        }
+        TimelineEntity entityToBeReturned =
+            createEntityToBeReturned(entity, fieldsToRetrieve);
+        Set<TimelineEntity> entitiesCreatedAtSameTime =
+            sortedEntities.get(entityToBeReturned.getCreatedTime());
+        if (entitiesCreatedAtSameTime == null) {
+          entitiesCreatedAtSameTime = new HashSet<TimelineEntity>();
+        }
+        entitiesCreatedAtSameTime.add(entityToBeReturned);
+        sortedEntities.put(
+            entityToBeReturned.getCreatedTime(), entitiesCreatedAtSameTime);
+      }
+    }
+
+    Set<TimelineEntity> entities = new HashSet<TimelineEntity>();
+    long entitiesAdded = 0;
+    for (Set<TimelineEntity> entitySet : sortedEntities.values()) {
+      for (TimelineEntity entity : entitySet) {
+        entities.add(entity);
+        ++entitiesAdded;
+        if (entitiesAdded >= limit) {
+          return entities;
+        }
+      }
+    }
+    return entities;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    rootPath = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+        DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public TimelineEntity getEntity(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve) throws IOException {
+    String flowRunPath = getFlowRunPath(userId, clusterId, flowId,
+        flowRunId, appId);
+    File dir = new File(new File(rootPath, ENTITIES_DIR),
+        clusterId + "/" + flowRunPath + "/" + appId + "/" + entityType);
+    File entityFile =
+        new File(dir, entityId + TIMELINE_SERVICE_STORAGE_EXTENSION);
+    try (BufferedReader reader =
+        new BufferedReader(new InputStreamReader(
+            new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
+      TimelineEntity entity = readEntityFromFile(reader);
+      return createEntityToBeReturned(entity, fieldsToRetrieve);
+    }
+  }
+
+  @Override
+  public Set<TimelineEntity> getEntities(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) throws IOException {
+    String flowRunPath =
+        getFlowRunPath(userId, clusterId, flowId, flowRunId, appId);
+    File dir =
+        new File(new File(rootPath, ENTITIES_DIR),
+            clusterId + "/" + flowRunPath + "/" + appId + "/" + entityType);
+    return getEntities(dir, entityType, limit,
+        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+        eventFilters, fieldsToRetrieve);
+  }
+}

+ 162 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java

@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+
+/** ATSv2 reader interface. */
+@Private
+@Unstable
+public interface TimelineReader extends Service {
+
+  /**
+   * Default limit for {@link #getEntities}.
+   */
+  long DEFAULT_LIMIT = 100;
+
+  /**
+   * Possible fields to retrieve for {@link #getEntities} and
+   * {@link #getEntity}.
+   */
+  public enum Field {
+    ALL,
+    EVENTS,
+    INFO,
+    METRICS,
+    CONFIGS,
+    RELATES_TO,
+    IS_RELATED_TO
+  }
+
+  /**
+   * <p>The API to fetch the single entity given the entity identifier in the
+   * scope of the given context.</p>
+   *
+   * @param userId
+   *    Context user Id(optional).
+   * @param clusterId
+   *    Context cluster Id(mandatory).
+   * @param flowId
+   *    Context flow Id (optional).
+   * @param flowRunId
+   *    Context flow run Id (optional).
+   * @param appId
+   *    Context app Id (mandatory)
+   * @param entityType
+   *    Entity type (mandatory)
+   * @param entityId
+   *    Entity Id (mandatory)
+   * @param fieldsToRetrieve
+   *    Specifies which fields of the entity object to retrieve(optional), see
+   *    {@link Field}. If null, retrieves 4 fields namely entity id,
+   *    entity type, entity created time and entity modified time. All
+   *    entities will be returned if {@link Field#ALL} is specified.
+   * @return a {@link TimelineEntity} instance or null. The entity will
+   *    contain the metadata plus the given fields to retrieve.
+   * @throws IOException
+   */
+  TimelineEntity getEntity(String userId, String clusterId, String flowId,
+      Long flowRunId, String appId, String entityType, String entityId,
+      EnumSet<Field> fieldsToRetrieve) throws IOException;
+
+  /**
+   * <p>The API to search for a set of entities of the given the entity type in
+   * the scope of the given context which matches the given predicates. The
+   * predicates include the created/modified time window, limit to number of
+   * entities to be returned, and the entities can be filtered by checking
+   * whether they contain the given info/configs entries in the form of
+   * key/value pairs, given metrics in the form of metricsIds and its relation
+   * with metric values given events in the form of the Ids, and whether they
+   * relate to/are related to other entities. For those parameters which have
+   * multiple entries, the qualified entity needs to meet all or them.</p>
+   *
+   * @param userId
+   *    Context user Id(optional).
+   * @param clusterId
+   *    Context cluster Id(mandatory).
+   * @param flowId
+   *    Context flow Id (optional).
+   * @param flowRunId
+   *    Context flow run Id (optional).
+   * @param appId
+   *    Context app Id (mandatory)
+   * @param entityType
+   *    Entity type (mandatory)
+   * @param limit
+   *    A limit on the number of entities to return (optional). If null or <=0,
+   *    defaults to {@link #DEFAULT_LIMIT}.
+   * @param createdTimeBegin
+   *    Matched entities should not be created before this timestamp (optional).
+   *    If null or <=0, defaults to 0.
+   * @param createdTimeEnd
+   *    Matched entities should not be created after this timestamp (optional).
+   *    If null or <=0, defaults to {@link Long#MAX_VALUE}.
+   * @param modifiedTimeBegin
+   *    Matched entities should not be modified before this timestamp
+   *    (optional). If null or <=0, defaults to 0.
+   * @param modifiedTimeEnd
+   *    Matched entities should not be modified after this timestamp (optional).
+   *    If null or <=0, defaults to {@link Long#MAX_VALUE}.
+   * @param relatesTo
+   *    Matched entities should relate to given entities (optional).
+   * @param isRelatedTo
+   *    Matched entities should be related to given entities (optional).
+   * @param infoFilters
+   *    Matched entities should have exact matches to the given info represented
+   *    as key-value pairs (optional). If null or empty, the filter is not
+   *    applied.
+   * @param configFilters
+   *    Matched entities should have exact matches to the given configs
+   *    represented as key-value pairs (optional). If null or empty, the filter
+   *    is not applied.
+   * @param metricFilters
+   *    Matched entities should contain the given metrics (optional). If null
+   *    or empty, the filter is not applied.
+   * @param eventFilters
+   *    Matched entities should contain the given events (optional). If null
+   *    or empty, the filter is not applied.
+   * @param fieldsToRetrieve
+   *    Specifies which fields of the entity object to retrieve(optional), see
+   *    {@link Field}. If null, retrieves 4 fields namely entity id,
+   *    entity type, entity created time and entity modified time. All
+   *    entities will be returned if {@link Field#ALL} is specified.
+   * @return A set of {@link TimelineEntity} instances of the given entity type
+   *    in the given context scope which matches the given predicates
+   *    ordered by created time, descending. Each entity will only contain the
+   *    metadata(id, type, created and modified times) plus the given fields to
+   *    retrieve.
+   * @throws IOException
+   */
+  Set<TimelineEntity> getEntities(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String>  metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) throws IOException;
+}

+ 556 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java

@@ -0,0 +1,556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestFileSystemTimelineReaderImpl {
+
+  private static final String rootDir =
+      FileSystemTimelineReaderImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
+  FileSystemTimelineReaderImpl reader;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    loadEntityData();
+    // Create app flow mapping file.
+    CSVFormat format =
+        CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
+    String appFlowMappingFile = rootDir + "/entities/cluster1/" +
+        FileSystemTimelineReaderImpl.APP_FLOW_MAPPING_FILE;
+    try (PrintWriter out =
+        new PrintWriter(new BufferedWriter(
+            new FileWriter(appFlowMappingFile, true)));
+        CSVPrinter printer = new CSVPrinter(out, format)){
+      printer.printRecord("app1", "user1", "flow1", 1);
+      printer.printRecord("app2","user1","flow1,flow",1);
+      printer.close();
+    }
+    (new File(rootDir)).deleteOnExit();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    FileUtils.deleteDirectory(new File(rootDir));
+  }
+
+  @Before
+  public void init() throws Exception {
+    reader = new FileSystemTimelineReaderImpl();
+    Configuration conf = new YarnConfiguration();
+    conf.set(FileSystemTimelineReaderImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+        rootDir);
+    reader.init(conf);
+  }
+
+  private static void writeEntityFile(TimelineEntity entity, File dir)
+      throws Exception {
+    if (!dir.exists()) {
+      if (!dir.mkdirs()) {
+        throw new IOException("Could not create directories for " + dir);
+      }
+    }
+    String fileName = dir.getAbsolutePath() + "/" + entity.getId() + ".thist";
+    try (PrintWriter out =
+        new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)))){
+      out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
+      out.write("\n");
+      out.close();
+    }
+  }
+
+  private static void loadEntityData() throws Exception {
+    File appDir = new File(rootDir +
+        "/entities/cluster1/user1/flow1/1/app1/app/");
+    TimelineEntity entity11 = new TimelineEntity();
+    entity11.setId("id_1");
+    entity11.setType("app");
+    entity11.setCreatedTime(1425016502000L);
+    entity11.setModifiedTime(1425016502050L);
+    Map<String, Object> info1 = new HashMap<String, Object>();
+    info1.put("info1", "val1");
+    entity11.addInfo(info1);
+    TimelineEvent event = new TimelineEvent();
+    event.setId("event_1");
+    event.setTimestamp(1425016502003L);
+    entity11.addEvent(event);
+    Set<TimelineMetric> metrics = new HashSet<TimelineMetric>();
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setId("metric1");
+    metric1.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric1.addValue(1425016502006L, 113.2F);
+    metrics.add(metric1);
+    TimelineMetric metric2 = new TimelineMetric();
+    metric2.setId("metric2");
+    metric2.setType(TimelineMetric.Type.TIME_SERIES);
+    metric2.addValue(1425016502016L, 34);
+    metrics.add(metric2);
+    entity11.setMetrics(metrics);
+    Map<String,String> configs = new HashMap<String, String>();
+    configs.put("config_1", "123");
+    entity11.setConfigs(configs);
+    entity11.addRelatesToEntity("flow", "flow1");
+    entity11.addIsRelatedToEntity("type1", "tid1_1");
+    writeEntityFile(entity11, appDir);
+    TimelineEntity entity12 = new TimelineEntity();
+    entity12.setId("id_1");
+    entity12.setType("app");
+    entity12.setModifiedTime(1425016503000L);
+    configs.clear();
+    configs.put("config_2", "23");
+    configs.put("config_3", "abc");
+    entity12.addConfigs(configs);
+    metrics.clear();
+    TimelineMetric metric12 = new TimelineMetric();
+    metric12.setId("metric2");
+    metric12.setType(TimelineMetric.Type.TIME_SERIES);
+    metric12.addValue(1425016502032L, 48);
+    metric12.addValue(1425016502054L, 51);
+    metrics.add(metric12);
+    TimelineMetric metric3 = new TimelineMetric();
+    metric3.setId("metric3");
+    metric3.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric3.addValue(1425016502060L, 23L);
+    metrics.add(metric3);
+    entity12.setMetrics(metrics);
+    entity12.addIsRelatedToEntity("type1", "tid1_2");
+    entity12.addIsRelatedToEntity("type2", "tid2_1`");
+    TimelineEvent event15 = new TimelineEvent();
+    event15.setId("event_5");
+    event15.setTimestamp(1425016502017L);
+    entity12.addEvent(event15);
+    writeEntityFile(entity12, appDir);
+
+    TimelineEntity entity2 = new TimelineEntity();
+    entity2.setId("id_2");
+    entity2.setType("app");
+    entity2.setCreatedTime(1425016501050L);
+    entity2.setModifiedTime(1425016502010L);
+    Map<String, Object> info2 = new HashMap<String, Object>();
+    info1.put("info2", 4);
+    entity2.addInfo(info2);
+    Map<String,String> configs2 = new HashMap<String, String>();
+    configs2.put("config_1", "123");
+    configs2.put("config_3", "def");
+    entity2.setConfigs(configs2);
+    TimelineEvent event2 = new TimelineEvent();
+    event2.setId("event_2");
+    event2.setTimestamp(1425016501003L);
+    entity2.addEvent(event2);
+    Set<TimelineMetric> metrics2 = new HashSet<TimelineMetric>();
+    TimelineMetric metric21 = new TimelineMetric();
+    metric21.setId("metric1");
+    metric21.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric21.addValue(1425016501006L, 123.2F);
+    metrics2.add(metric21);
+    TimelineMetric metric22 = new TimelineMetric();
+    metric22.setId("metric2");
+    metric22.setType(TimelineMetric.Type.TIME_SERIES);
+    metric22.addValue(1425016501056L, 31);
+    metric22.addValue(1425016501084L, 70);
+    metrics2.add(metric22);
+    TimelineMetric metric23 = new TimelineMetric();
+    metric23.setId("metric3");
+    metric23.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric23.addValue(1425016502060L, 23L);
+    metrics2.add(metric23);
+    entity2.setMetrics(metrics2);
+    entity2.addRelatesToEntity("flow", "flow2");
+    writeEntityFile(entity2, appDir);
+
+    TimelineEntity entity3 = new TimelineEntity();
+    entity3.setId("id_3");
+    entity3.setType("app");
+    entity3.setCreatedTime(1425016501050L);
+    entity3.setModifiedTime(1425016502010L);
+    Map<String, Object> info3 = new HashMap<String, Object>();
+    info3.put("info2", 3.5);
+    entity3.addInfo(info3);
+    Map<String,String> configs3 = new HashMap<String, String>();
+    configs3.put("config_1", "123");
+    configs3.put("config_3", "abc");
+    entity3.setConfigs(configs3);
+    TimelineEvent event3 = new TimelineEvent();
+    event3.setId("event_2");
+    event3.setTimestamp(1425016501003L);
+    entity3.addEvent(event3);
+    TimelineEvent event4 = new TimelineEvent();
+    event4.setId("event_4");
+    event4.setTimestamp(1425016502006L);
+    entity3.addEvent(event4);
+    Set<TimelineMetric> metrics3 = new HashSet<TimelineMetric>();
+    TimelineMetric metric31 = new TimelineMetric();
+    metric31.setId("metric1");
+    metric31.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric31.addValue(1425016501006L, 124.8F);
+    metrics3.add(metric31);
+    TimelineMetric metric32 = new TimelineMetric();
+    metric32.setId("metric2");
+    metric32.setType(TimelineMetric.Type.TIME_SERIES);
+    metric32.addValue(1425016501056L, 31);
+    metric32.addValue(1425016501084L, 74);
+    metrics3.add(metric32);
+    entity3.setMetrics(metrics3);
+    entity3.addIsRelatedToEntity("type1", "tid1_2");
+    writeEntityFile(entity3, appDir);
+
+    TimelineEntity entity4 = new TimelineEntity();
+    entity4.setId("id_4");
+    entity4.setType("app");
+    entity4.setCreatedTime(1425016502050L);
+    entity4.setModifiedTime(1425016503010L);
+    TimelineEvent event44 = new TimelineEvent();
+    event44.setId("event_4");
+    event44.setTimestamp(1425016502003L);
+    entity4.addEvent(event44);
+    writeEntityFile(entity4, appDir);
+
+    File appDir2 = new File(rootDir +
+            "/entities/cluster1/user1/flow1,flow/1/app2/app/");
+    TimelineEntity entity5 = new TimelineEntity();
+    entity5.setId("id_5");
+    entity5.setType("app");
+    entity5.setCreatedTime(1425016502050L);
+    entity5.setModifiedTime(1425016503010L);
+    writeEntityFile(entity5, appDir2);
+  }
+
+  public TimelineReader getTimelineReader() {
+    return reader;
+  }
+
+  @Test
+  public void testGetEntityDefaultView() throws Exception {
+    // If no fields are specified, entity is returned with default view i.e.
+    // only the id, created and modified time
+    TimelineEntity result =
+        reader.getEntity("user1", "cluster1", "flow1", 1L, "app1",
+            "app", "id_1", null);
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_1")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals(1425016502000L, result.getCreatedTime());
+    Assert.assertEquals(1425016503000L, result.getModifiedTime());
+    Assert.assertEquals(0, result.getConfigs().size());
+    Assert.assertEquals(0, result.getMetrics().size());
+  }
+
+  @Test
+  public void testGetEntityByClusterAndApp() throws Exception {
+    // Cluster and AppId should be enough to get an entity.
+    TimelineEntity result =
+        reader.getEntity(null, "cluster1", null, null, "app1",
+            "app", "id_1", null);
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_1")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals(1425016502000L, result.getCreatedTime());
+    Assert.assertEquals(1425016503000L, result.getModifiedTime());
+    Assert.assertEquals(0, result.getConfigs().size());
+    Assert.assertEquals(0, result.getMetrics().size());
+  }
+
+  /** This test checks whether we can handle commas in app flow mapping csv */
+  @Test
+  public void testAppFlowMappingCsv() throws Exception {
+    // Test getting an entity by cluster and app where flow entry
+    // in app flow mapping csv has commas.
+    TimelineEntity result =
+        reader.getEntity(null, "cluster1", null, null, "app2",
+            "app", "id_5", null);
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_5")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals(1425016502050L, result.getCreatedTime());
+    Assert.assertEquals(1425016503010L, result.getModifiedTime());
+  }
+
+  @Test
+  public void testGetEntityCustomFields() throws Exception {
+    // Specified fields in addition to default view will be returned.
+    TimelineEntity result =
+        reader.getEntity("user1", "cluster1", "flow1", 1L,
+            "app1", "app", "id_1",
+            EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS));
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_1")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals(1425016502000L, result.getCreatedTime());
+    Assert.assertEquals(1425016503000L, result.getModifiedTime());
+    Assert.assertEquals(3, result.getConfigs().size());
+    Assert.assertEquals(3, result.getMetrics().size());
+    Assert.assertEquals(1, result.getInfo().size());
+    // No events will be returned
+    Assert.assertEquals(0, result.getEvents().size());
+  }
+
+  @Test
+  public void testGetEntityAllFields() throws Exception {
+    // All fields of TimelineEntity will be returned.
+    TimelineEntity result =
+        reader.getEntity("user1", "cluster1", "flow1", 1L,
+            "app1", "app", "id_1", EnumSet.of(Field.ALL));
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_1")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals(1425016502000L, result.getCreatedTime());
+    Assert.assertEquals(1425016503000L, result.getModifiedTime());
+    Assert.assertEquals(3, result.getConfigs().size());
+    Assert.assertEquals(3, result.getMetrics().size());
+    // All fields including events will be returned.
+    Assert.assertEquals(2, result.getEvents().size());
+  }
+
+  @Test
+  public void testGetAllEntities() throws Exception {
+    Set<TimelineEntity> result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, null, null, null, null, null, null, null,
+            null, null);
+    // All 3 entities will be returned
+    Assert.assertEquals(4, result.size());
+  }
+
+  @Test
+  public void testGetEntitiesWithLimit() throws Exception {
+    Set<TimelineEntity> result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            2L, null, null, null, null, null, null, null, null, null,
+            null, null);
+    Assert.assertEquals(2, result.size());
+    // Needs to be rewritten once hashcode and equals for
+    // TimelineEntity is implemented
+    // Entities with id_1 and id_4 should be returned,
+    // based on created time, descending.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_4")) {
+        Assert.fail("Entity not sorted by created time");
+      }
+    }
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            3L, null, null, null, null, null, null, null, null, null,
+                null, null);
+     // Even though 2 entities out of 4 have same created time, one entity
+     // is left out due to limit
+     Assert.assertEquals(3, result.size());
+  }
+
+  @Test
+  public void testGetEntitiesByTimeWindows() throws Exception {
+    // Get entities based on created time start and end time range.
+    Set<TimelineEntity> result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, 1425016502030L, 1425016502060L, null, null, null, null, null,
+            null, null, null, null);
+    Assert.assertEquals(1, result.size());
+    // Only one entity with ID id_4 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on created time range");
+      }
+    }
+
+    // Get entities if only created time end is specified.
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, 1425016502010L, null, null, null, null, null, null,
+            null, null, null);
+    Assert.assertEquals(3, result.size());
+    for (TimelineEntity entity : result) {
+      if (entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on created time range");
+      }
+    }
+
+    // Get entities if only created time start is specified.
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, 1425016502010L, null, null, null, null, null, null, null,
+            null, null, null);
+    Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on created time range");
+      }
+    }
+
+    // Get entities based on modified time start and end time range.
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, 1425016502090L, 1425016503020L, null, null, null,
+            null, null, null, null);
+    Assert.assertEquals(2, result.size());
+    // Two entities with IDs' id_1 and id_4 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on modified time range");
+      }
+    }
+
+    // Get entities if only modified time end is specified.
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, null, 1425016502090L, null, null, null, null,
+            null, null, null);
+    Assert.assertEquals(2, result.size());
+    for (TimelineEntity entity : result) {
+      if (entity.getId().equals("id_1") || entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on modified time range");
+      }
+    }
+
+    // Get entities if only modified time start is specified.
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, 1425016503005L, null, null, null, null, null,
+            null, null, null);
+    Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on modified time range");
+      }
+    }
+  }
+
+  @Test
+  public void testGetFilteredEntities() throws Exception {
+    // Get entities based on info filters.
+    Map<String, Object> infoFilters = new HashMap<String, Object>();
+    infoFilters.put("info2", 3.5);
+    Set<TimelineEntity> result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, null, null, null, null, infoFilters, null, null,
+            null, null);
+    Assert.assertEquals(1, result.size());
+    // Only one entity with ID id_3 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on info filters");
+      }
+    }
+
+    // Get entities based on config filters.
+    Map<String, String> configFilters = new HashMap<String, String>();
+    configFilters.put("config_1", "123");
+    configFilters.put("config_3", "abc");
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, null, null, null, null, null, configFilters, null,
+            null, null);
+    Assert.assertEquals(2, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on config filters");
+      }
+    }
+
+    // Get entities based on event filters.
+    Set<String> eventFilters = new HashSet<String>();
+    eventFilters.add("event_2");
+    eventFilters.add("event_4");
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, null, null, null, null, null, null, null,
+            eventFilters, null);
+    Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on event filters");
+      }
+    }
+
+    // Get entities based on metric filters.
+    Set<String> metricFilters = new HashSet<String>();
+    metricFilters.add("metric3");
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, null, null, null, null, null, null, metricFilters,
+            null, null);
+    Assert.assertEquals(2, result.size());
+    // Two entities with IDs' id_1 and id_2 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+        Assert.fail("Incorrect filtering based on metric filters");
+      }
+    }
+  }
+
+  @Test
+  public void testGetEntitiesByRelations() throws Exception {
+    // Get entities based on relatesTo.
+    Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
+    Set<String> relatesToIds = new HashSet<String>();
+    relatesToIds.add("flow1");
+    relatesTo.put("flow", relatesToIds);
+    Set<TimelineEntity> result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, null, null, relatesTo, null, null, null, null,
+            null, null);
+    Assert.assertEquals(1, result.size());
+    // Only one entity with ID id_1 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1")) {
+        Assert.fail("Incorrect filtering based on relatesTo");
+      }
+    }
+
+    // Get entities based on isRelatedTo.
+    Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
+    Set<String> isRelatedToIds = new HashSet<String>();
+    isRelatedToIds.add("tid1_2");
+    isRelatedTo.put("type1", isRelatedToIds);
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, null, null, null, isRelatedTo, null, null, null,
+            null, null);
+    Assert.assertEquals(2, result.size());
+    // Two entities with IDs' id_1 and id_3 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on isRelatedTo");
+      }
+    }
+  }
+}