Просмотр исходного кода

YARN-3049. [Storage Implementation] Implement storage reader interface to fetch raw data from HBase backend (Zhijie Shen via sjlee)

(cherry picked from commit 07433c2ad52df9e844dbd90020c277d3df844dcd)
Sangjin Lee 10 лет назад
Родитель
Сommit
0bed3fb3b3
22 измененных файлов с 1130 добавлено и 143 удалено
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 5 0
      hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
  3. 6 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
  4. 48 116
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
  5. 424 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
  6. 41 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
  7. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
  8. 126 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
  9. 51 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnFamily.java
  10. 39 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
  11. 110 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
  12. 23 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java
  13. 16 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
  14. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
  15. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java
  16. 112 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java
  17. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
  18. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java
  19. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
  20. 32 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
  21. 4 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
  22. 73 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -82,6 +82,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3949. Ensure timely flush of timeline writes. (Sangjin Lee via
     junping_du)
 
+    YARN-3049. [Storage Implementation] Implement storage reader interface to
+    fetch raw data from HBase backend (Zhijie Shen via sjlee)
+
   IMPROVEMENTS
 
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml

@@ -522,4 +522,9 @@
     <Package name="org.apache.hadoop.yarn.api.records.impl.pb" />
     <Bug pattern="NP_BOOLEAN_RETURN_NULL" />
   </Match>
+  <!-- Object cast is based on the event type -->
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher" />
+     <Bug pattern="BC_UNCONFIRMED_CAST" />
+  </Match>
 </FindBugsFilter>

+ 6 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java

@@ -29,7 +29,9 @@ import javax.xml.bind.annotation.XmlRootElement;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
+import java.util.TreeSet;
 
 /**
  * The basic timeline entity data structure for timeline service v2. Timeline
@@ -133,7 +135,8 @@ public class TimelineEntity implements Comparable<TimelineEntity> {
   private HashMap<String, Object> info = new HashMap<>();
   private HashMap<String, String> configs = new HashMap<>();
   private Set<TimelineMetric> metrics = new HashSet<>();
-  private Set<TimelineEvent> events = new HashSet<>();
+  // events should be sorted by timestamp in descending order
+  private NavigableSet<TimelineEvent> events = new TreeSet<>();
   private HashMap<String, Set<String>> isRelatedToEntities = new HashMap<>();
   private HashMap<String, Set<String>> relatesToEntities = new HashMap<>();
   private long createdTime;
@@ -334,7 +337,7 @@ public class TimelineEntity implements Comparable<TimelineEntity> {
   }
 
   @XmlElement(name = "events")
-  public Set<TimelineEvent> getEvents() {
+  public NavigableSet<TimelineEvent> getEvents() {
     if (real == null) {
       return events;
     } else {
@@ -342,7 +345,7 @@ public class TimelineEntity implements Comparable<TimelineEntity> {
     }
   }
 
-  public void setEvents(Set<TimelineEvent> events) {
+  public void setEvents(NavigableSet<TimelineEvent> events) {
     if (real == null) {
       this.events = events;
     } else {

+ 48 - 116
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.map.JsonMappingException;
@@ -119,59 +120,32 @@ public class FileSystemTimelineReaderImpl extends AbstractService
   private static void fillFields(TimelineEntity finalEntity,
       TimelineEntity real, EnumSet<Field> fields) {
     if (fields.contains(Field.ALL)) {
-      finalEntity.setConfigs(real.getConfigs());
-      finalEntity.setMetrics(real.getMetrics());
-      finalEntity.setInfo(real.getInfo());
-      finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
-      finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
-      finalEntity.setEvents(real.getEvents());
-      return;
+      fields = EnumSet.allOf(Field.class);
     }
     for (Field field : fields) {
       switch(field) {
-      case CONFIGS:
-        finalEntity.setConfigs(real.getConfigs());
-        break;
-      case METRICS:
-        finalEntity.setMetrics(real.getMetrics());
-        break;
-      case INFO:
-        finalEntity.setInfo(real.getInfo());
-        break;
-      case IS_RELATED_TO:
-        finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
-        break;
-      case RELATES_TO:
-        finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
-        break;
-      case EVENTS:
-        finalEntity.setEvents(real.getEvents());
-        break;
-      default:
-        continue;
-      }
-    }
-  }
-
-  private static boolean matchFilter(Object infoValue, Object filterValue) {
-    return infoValue.equals(filterValue);
-  }
-
-  private static boolean matchFilters(Map<String, ? extends Object> entityInfo,
-      Map<String, ? extends Object> filters) {
-    if (entityInfo == null || entityInfo.isEmpty()) {
-      return false;
-    }
-    for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
-      Object infoValue = entityInfo.get(filter.getKey());
-      if (infoValue == null) {
-        return false;
-      }
-      if (!matchFilter(infoValue, filter.getValue())) {
-        return false;
+        case CONFIGS:
+          finalEntity.setConfigs(real.getConfigs());
+          break;
+        case METRICS:
+          finalEntity.setMetrics(real.getMetrics());
+          break;
+        case INFO:
+          finalEntity.setInfo(real.getInfo());
+          break;
+        case IS_RELATED_TO:
+          finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
+          break;
+        case RELATES_TO:
+          finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
+          break;
+        case EVENTS:
+          finalEntity.setEvents(real.getEvents());
+          break;
+        default:
+          continue;
       }
     }
-    return true;
   }
 
   private String getFlowRunPath(String userId, String clusterId, String flowId,
@@ -186,10 +160,10 @@ public class FileSystemTimelineReaderImpl extends AbstractService
     String appFlowMappingFile = rootPath + "/" +  ENTITIES_DIR + "/" +
         clusterId + "/" + APP_FLOW_MAPPING_FILE;
     try (BufferedReader reader =
-        new BufferedReader(new InputStreamReader(
-            new FileInputStream(
-                appFlowMappingFile), Charset.forName("UTF-8")));
-        CSVParser parser = new CSVParser(reader, csvFormat)) {
+             new BufferedReader(new InputStreamReader(
+                 new FileInputStream(
+                     appFlowMappingFile), Charset.forName("UTF-8")));
+         CSVParser parser = new CSVParser(reader, csvFormat)) {
       for (CSVRecord record : parser.getRecords()) {
         if (record.size() < 4) {
           continue;
@@ -207,36 +181,6 @@ public class FileSystemTimelineReaderImpl extends AbstractService
     throw new IOException("Unable to get flow info");
   }
 
-  private static boolean matchMetricFilters(Set<TimelineMetric> metrics,
-      Set<String> metricFilters) {
-    Set<String> tempMetrics = new HashSet<String>();
-    for (TimelineMetric metric : metrics) {
-      tempMetrics.add(metric.getId());
-    }
-
-    for (String metricFilter : metricFilters) {
-      if (!tempMetrics.contains(metricFilter)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
-      Set<String> eventFilters) {
-    Set<String> tempEvents = new HashSet<String>();
-    for (TimelineEvent event : entityEvents) {
-      tempEvents.add(event.getId());
-    }
-
-    for (String eventFilter : eventFilters) {
-      if (!tempEvents.contains(eventFilter)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
   private static TimelineEntity createEntityToBeReturned(TimelineEntity entity,
       EnumSet<Field> fieldsToRetrieve) {
     TimelineEntity entityToBeReturned = new TimelineEntity();
@@ -254,23 +198,6 @@ public class FileSystemTimelineReaderImpl extends AbstractService
     return (time >= timeBegin) && (time <= timeEnd);
   }
 
-  private static boolean matchRelations(
-      Map<String, Set<String>> entityRelations,
-      Map<String, Set<String>> relations) {
-    for (Map.Entry<String, Set<String>> relation : relations.entrySet()) {
-      Set<String> ids = entityRelations.get(relation.getKey());
-      if (ids == null) {
-        return false;
-      }
-      for (String id : relation.getValue()) {
-        if (!ids.contains(id)) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
   private static void mergeEntities(TimelineEntity entity1,
       TimelineEntity entity2) {
     // Ideally created time wont change except in the case of issue from client.
@@ -364,22 +291,22 @@ public class FileSystemTimelineReaderImpl extends AbstractService
     // First sort the selected entities based on created/start time.
     Map<Long, Set<TimelineEntity>> sortedEntities =
         new TreeMap<>(
-          new Comparator<Long>() {
-            @Override
-            public int compare(Long l1, Long l2) {
-              return l2.compareTo(l1);
+            new Comparator<Long>() {
+              @Override
+              public int compare(Long l1, Long l2) {
+                return l2.compareTo(l1);
+              }
             }
-          }
         );
     for (File entityFile : dir.listFiles()) {
       if (!entityFile.getName().contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) {
         continue;
       }
       try (BufferedReader reader =
-          new BufferedReader(
-              new InputStreamReader(
-                  new FileInputStream(
-                      entityFile), Charset.forName("UTF-8")))) {
+               new BufferedReader(
+                   new InputStreamReader(
+                       new FileInputStream(
+                           entityFile), Charset.forName("UTF-8")))) {
         TimelineEntity entity = readEntityFromFile(reader);
         if (!entity.getType().equals(entityType)) {
           continue;
@@ -393,27 +320,32 @@ public class FileSystemTimelineReaderImpl extends AbstractService
           continue;
         }
         if (relatesTo != null && !relatesTo.isEmpty() &&
-            !matchRelations(entity.getRelatesToEntities(), relatesTo)) {
+            !TimelineReaderUtils
+                .matchRelations(entity.getRelatesToEntities(), relatesTo)) {
           continue;
         }
         if (isRelatedTo != null && !isRelatedTo.isEmpty() &&
-            !matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) {
+            !TimelineReaderUtils
+                .matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) {
           continue;
         }
         if (infoFilters != null && !infoFilters.isEmpty() &&
-            !matchFilters(entity.getInfo(), infoFilters)) {
+            !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
           continue;
         }
         if (configFilters != null && !configFilters.isEmpty() &&
-            !matchFilters(entity.getConfigs(), configFilters)) {
+            !TimelineReaderUtils.matchFilters(
+                entity.getConfigs(), configFilters)) {
           continue;
         }
         if (metricFilters != null && !metricFilters.isEmpty() &&
-            !matchMetricFilters(entity.getMetrics(), metricFilters)) {
+            !TimelineReaderUtils.matchMetricFilters(
+                entity.getMetrics(), metricFilters)) {
           continue;
         }
         if (eventFilters != null && !eventFilters.isEmpty() &&
-            !matchEventFilters(entity.getEvents(), eventFilters)) {
+            !TimelineReaderUtils.matchEventFilters(
+                entity.getEvents(), eventFilters)) {
           continue;
         }
         TimelineEntity entityToBeReturned =
@@ -461,8 +393,8 @@ public class FileSystemTimelineReaderImpl extends AbstractService
     File entityFile =
         new File(dir, entityId + TIMELINE_SERVICE_STORAGE_EXTENSION);
     try (BufferedReader reader =
-        new BufferedReader(new InputStreamReader(
-            new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
+             new BufferedReader(new InputStreamReader(
+                 new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
       TimelineEntity entity = readEntityFromFile(reader);
       return createEntityToBeReturned(entity, fieldsToRetrieve);
     }

+ 424 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java

@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class HBaseTimelineReaderImpl
+    extends AbstractService implements TimelineReader {
+
+  private static final Log LOG = LogFactory
+      .getLog(HBaseTimelineReaderImpl.class);
+  private static final long DEFAULT_BEGIN_TIME = 0L;
+  private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
+
+  private Configuration hbaseConf = null;
+  private Connection conn;
+  private EntityTable entityTable;
+  private AppToFlowTable appToFlowTable;
+
+  public HBaseTimelineReaderImpl() {
+    super(HBaseTimelineReaderImpl.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    hbaseConf = HBaseConfiguration.create(conf);
+    conn = ConnectionFactory.createConnection(hbaseConf);
+    entityTable = new EntityTable();
+    appToFlowTable = new AppToFlowTable();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (conn != null) {
+      LOG.info("closing the hbase Connection");
+      conn.close();
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public TimelineEntity getEntity(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve)
+      throws IOException {
+    validateParams(userId, clusterId, appId, entityType, entityId, true);
+    // In reality both should be null or neither should be null
+    if (flowId == null || flowRunId == null) {
+      FlowContext context = lookupFlowContext(clusterId, appId);
+      flowId = context.flowId;
+      flowRunId = context.flowRunId;
+    }
+    if (fieldsToRetrieve == null) {
+      fieldsToRetrieve = EnumSet.noneOf(Field.class);
+    }
+
+    byte[] rowKey = EntityRowKey.getRowKey(
+        clusterId, userId, flowId, flowRunId, appId, entityType, entityId);
+    Get get = new Get(rowKey);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    return parseEntity(
+        entityTable.getResult(hbaseConf, conn, get), fieldsToRetrieve,
+        false, DEFAULT_BEGIN_TIME, DEFAULT_END_TIME, false, DEFAULT_BEGIN_TIME,
+        DEFAULT_END_TIME, null, null, null, null, null, null);
+  }
+
+  @Override
+  public Set<TimelineEntity> getEntities(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) throws IOException {
+    validateParams(userId, clusterId, appId, entityType, null, false);
+    // In reality both should be null or neither should be null
+    if (flowId == null || flowRunId == null) {
+      FlowContext context = lookupFlowContext(clusterId, appId);
+      flowId = context.flowId;
+      flowRunId = context.flowRunId;
+    }
+    if (limit == null) {
+      limit = TimelineReader.DEFAULT_LIMIT;
+    }
+    if (createdTimeBegin == null) {
+      createdTimeBegin = DEFAULT_BEGIN_TIME;
+    }
+    if (createdTimeEnd == null) {
+      createdTimeEnd = DEFAULT_END_TIME;
+    }
+    if (modifiedTimeBegin == null) {
+      modifiedTimeBegin = DEFAULT_BEGIN_TIME;
+    }
+    if (modifiedTimeEnd == null) {
+      modifiedTimeEnd = DEFAULT_END_TIME;
+    }
+    if (fieldsToRetrieve == null) {
+      fieldsToRetrieve = EnumSet.noneOf(Field.class);
+    }
+
+    NavigableSet<TimelineEntity> entities = new TreeSet<>();
+    // Scan through part of the table to find the entities belong to one app and
+    // one type
+    Scan scan = new Scan();
+    scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
+        clusterId, userId, flowId, flowRunId, appId, entityType));
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    ResultScanner scanner = entityTable.getResultScanner(hbaseConf, conn, scan);
+    for (Result result : scanner) {
+      TimelineEntity entity = parseEntity(result, fieldsToRetrieve,
+          true, createdTimeBegin, createdTimeEnd,
+          true, modifiedTimeBegin, modifiedTimeEnd,
+          isRelatedTo, relatesTo, infoFilters, configFilters, eventFilters,
+          metricFilters);
+      if (entity == null) {
+        continue;
+      }
+      if (entities.size() > limit) {
+        entities.pollLast();
+      }
+      entities.add(entity);
+    }
+    return entities;
+  }
+
+  private FlowContext lookupFlowContext(String clusterId, String appId)
+      throws IOException {
+    byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
+    Get get = new Get(rowKey);
+    Result result = appToFlowTable.getResult(hbaseConf, conn, get);
+    if (result != null && !result.isEmpty()) {
+      return new FlowContext(
+          AppToFlowColumn.FLOW_ID.readResult(result).toString(),
+          ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
+    } else {
+       throw new IOException(
+           "Unable to find the context flow ID and flow run ID for clusterId=" +
+           clusterId + ", appId=" + appId);
+    }
+  }
+
+  private static class FlowContext {
+    private String flowId;
+    private Long flowRunId;
+    public FlowContext(String flowId, Long flowRunId) {
+      this.flowId = flowId;
+      this.flowRunId = flowRunId;
+    }
+  }
+
+  private static void validateParams(String userId, String clusterId,
+      String appId, String entityType, String entityId, boolean checkEntityId) {
+    Preconditions.checkNotNull(userId, "userId shouldn't be null");
+    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+    Preconditions.checkNotNull(appId, "appId shouldn't be null");
+    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
+    if (checkEntityId) {
+      Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
+    }
+  }
+
+  private static TimelineEntity parseEntity(
+      Result result, EnumSet<Field> fieldsToRetrieve,
+      boolean checkCreatedTime, long createdTimeBegin, long createdTimeEnd,
+      boolean checkModifiedTime, long modifiedTimeBegin, long modifiedTimeEnd,
+      Map<String, Set<String>> isRelatedTo, Map<String, Set<String>> relatesTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> eventFilters, Set<String> metricFilters)
+          throws IOException {
+    if (result == null || result.isEmpty()) {
+      return null;
+    }
+    TimelineEntity entity = new TimelineEntity();
+    entity.setType(EntityColumn.TYPE.readResult(result).toString());
+    entity.setId(EntityColumn.ID.readResult(result).toString());
+
+    // fetch created time
+    entity.setCreatedTime(
+        ((Number) EntityColumn.CREATED_TIME.readResult(result)).longValue());
+    if (checkCreatedTime && (entity.getCreatedTime() < createdTimeBegin ||
+        entity.getCreatedTime() > createdTimeEnd)) {
+      return null;
+    }
+
+    // fetch modified time
+    entity.setCreatedTime(
+        ((Number) EntityColumn.MODIFIED_TIME.readResult(result)).longValue());
+    if (checkModifiedTime && (entity.getModifiedTime() < modifiedTimeBegin ||
+        entity.getModifiedTime() > modifiedTimeEnd)) {
+      return null;
+    }
+
+    // fetch is related to entities
+    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
+      readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO);
+      if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
+          entity.getIsRelatedToEntities(), isRelatedTo)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
+        entity.getIsRelatedToEntities().clear();
+      }
+    }
+
+    // fetch relates to entities
+    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
+      readRelationship(entity, result, EntityColumnPrefix.RELATES_TO);
+      if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
+          entity.getRelatesToEntities(), relatesTo)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
+        entity.getRelatesToEntities().clear();
+      }
+    }
+
+    // fetch info
+    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.INFO);
+      if (checkInfo &&
+          !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.INFO)) {
+        entity.getInfo().clear();
+      }
+    }
+
+    // fetch configs
+    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG);
+      if (checkConfigs && !TimelineReaderUtils.matchFilters(
+          entity.getConfigs(), configFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.CONFIGS)) {
+        entity.getConfigs().clear();
+      }
+    }
+
+    // fetch events
+    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
+      readEvents(entity, result);
+      if (checkEvents && !TimelineReaderUtils.matchEventFilters(
+          entity.getEvents(), eventFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.EVENTS)) {
+        entity.getEvents().clear();
+      }
+    }
+
+    // fetch metrics
+    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
+      readMetrics(entity, result);
+      if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
+          entity.getMetrics(), metricFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.METRICS)) {
+        entity.getMetrics().clear();
+      }
+    }
+    return entity;
+  }
+
+  private static void readRelationship(
+      TimelineEntity entity, Result result, EntityColumnPrefix prefix)
+          throws IOException {
+    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
+    Map<String, Object> columns = prefix.readResults(result);
+    for (Map.Entry<String, Object> column : columns.entrySet()) {
+      for (String id : Separator.VALUES.splitEncoded(
+          column.getValue().toString())) {
+        if (prefix.equals(EntityColumnPrefix.IS_RELATED_TO)) {
+          entity.addIsRelatedToEntity(column.getKey(), id);
+        } else {
+          entity.addRelatesToEntity(column.getKey(), id);
+        }
+      }
+    }
+  }
+
+  private static void readKeyValuePairs(
+      TimelineEntity entity, Result result, EntityColumnPrefix prefix)
+          throws IOException {
+    // info and configuration are of type Map<String, Object or String>
+    Map<String, Object> columns = prefix.readResults(result);
+    if (prefix.equals(EntityColumnPrefix.CONFIG)) {
+      for (Map.Entry<String, Object> column : columns.entrySet()) {
+        entity.addConfig(column.getKey(), column.getKey().toString());
+      }
+    } else {
+      entity.addInfo(columns);
+    }
+  }
+
+  private static void readEvents(TimelineEntity entity, Result result)
+      throws IOException {
+    Map<String, TimelineEvent> eventsMap = new HashMap<>();
+    Map<String, Object> eventsResult =
+        EntityColumnPrefix.EVENT.readResults(result);
+    for (Map.Entry<String,Object> eventResult : eventsResult.entrySet()) {
+      Collection<String> tokens =
+          Separator.VALUES.splitEncoded(eventResult.getKey());
+      if (tokens.size() != 2 && tokens.size() != 3) {
+        throw new IOException(
+            "Invalid event column name: " + eventResult.getKey());
+      }
+      Iterator<String> idItr = tokens.iterator();
+      String id = idItr.next();
+      String tsStr = idItr.next();
+      // TODO: timestamp is not correct via ser/des through UTF-8 string
+      Long ts =
+          TimelineWriterUtils.invert(Bytes.toLong(tsStr.getBytes(
+              StandardCharsets.UTF_8)));
+      String key = Separator.VALUES.joinEncoded(id, ts.toString());
+      TimelineEvent event = eventsMap.get(key);
+      if (event == null) {
+        event = new TimelineEvent();
+        event.setId(id);
+        event.setTimestamp(ts);
+        eventsMap.put(key, event);
+      }
+      if (tokens.size() == 3) {
+        String infoKey = idItr.next();
+        event.addInfo(infoKey, eventResult.getValue());
+      }
+    }
+    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
+    entity.addEvents(eventsSet);
+  }
+
+  private static void readMetrics(TimelineEntity entity, Result result)
+      throws IOException {
+    NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+        EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
+    for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
+        metricsResult.entrySet()) {
+      TimelineMetric metric = new TimelineMetric();
+      metric.setId(metricResult.getKey());
+      // Simply assume that if the value set contains more than 1 elements, the
+      // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric
+      metric.setType(metricResult.getValue().size() > 1 ?
+          TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
+      metric.addValues(metricResult.getValue());
+      entity.addMetric(metric);
+    }
+  }
+}

+ 41 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java

@@ -33,9 +33,14 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
@@ -55,6 +60,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
 
   private Connection conn;
   private TypedBufferedMutator<EntityTable> entityTable;
+  private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
 
   private static final Log LOG = LogFactory
       .getLog(HBaseTimelineWriterImpl.class);
@@ -77,6 +83,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     Configuration hbaseConf = HBaseConfiguration.create(conf);
     conn = ConnectionFactory.createConnection(hbaseConf);
     entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
+    appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn);
   }
 
   /**
@@ -97,7 +104,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
 
       byte[] rowKey =
           EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
-              te);
+              te.getType(), te.getId());
 
       storeInfo(rowKey, te, flowVersion);
       storeEvents(rowKey, te.getEvents());
@@ -107,11 +114,37 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
           EntityColumnPrefix.IS_RELATED_TO);
       storeRelations(rowKey, te.getRelatesToEntities(),
           EntityColumnPrefix.RELATES_TO);
-    }
 
+      if (isApplicationCreated(te)) {
+        onApplicationCreated(
+            clusterId, userId, flowName, flowVersion, flowRunId, appId, te);
+      }
+    }
     return putStatus;
   }
 
+  private static boolean isApplicationCreated(TimelineEntity te) {
+    if (te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString())) {
+      boolean isAppCreated = false;
+      for (TimelineEvent event : te.getEvents()) {
+        if (event.getId().equals(
+            ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private void onApplicationCreated(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRunId, String appId,
+      TimelineEntity te) throws IOException {
+    byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
+    AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
+    AppToFlowColumn.FLOW_RUN_ID.store(
+        rowKey, appToFlowTable, null, flowRunId);
+  }
+
   /**
    * Stores the Relations from the {@linkplain TimelineEntity} object
    */
@@ -245,6 +278,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   public void flush() throws IOException {
     // flush all buffered mutators
     entityTable.flush();
+    appToFlowTable.flush();
   }
 
   /**
@@ -258,6 +292,11 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       // The close API performs flushing and releases any resources held
       entityTable.close();
     }
+    if (appToFlowTable != null) {
+      LOG.info("closing app_flow table");
+      // The close API performs flushing and releases any resources held
+      appToFlowTable.close();
+    }
     if (conn != null) {
       LOG.info("closing the hbase Connection");
       conn.close();

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 
 /**
@@ -70,6 +71,11 @@ public class TimelineSchemaCreator {
       int metricsTTL = Integer.parseInt(entityTableTTLMetrics);
       new EntityTable().setMetricsTTL(metricsTTL, hbaseConf);
     }
+    // Grab the appToflowTableName argument
+    String appToflowTableName = commandLine.getOptionValue("a2f");
+    if (StringUtils.isNotBlank(appToflowTableName)) {
+      hbaseConf.set(AppToFlowTable.TABLE_NAME_CONF_NAME, appToflowTableName);
+    }
     createAllTables(hbaseConf);
   }
 
@@ -95,6 +101,11 @@ public class TimelineSchemaCreator {
     o.setRequired(false);
     options.addOption(o);
 
+    o = new Option("a2f", "appToflowTableName", true, "app to flow table name");
+    o.setArgName("appToflowTableName");
+    o.setRequired(false);
+    options.addOption(o);
+
     CommandLineParser parser = new PosixParser();
     CommandLine commandLine = null;
     try {
@@ -120,6 +131,7 @@ public class TimelineSchemaCreator {
         throw new IOException("Cannot create table since admin is null");
       }
       new EntityTable().createTable(admin, hbaseConf);
+      new AppToFlowTable().createTable(admin, hbaseConf);
     } finally {
       if (conn != null) {
         conn.close();

+ 126 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java

@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+import java.io.IOException;
+
+/**
+ * Identifies fully qualified columns for the {@link AppToFlowTable}.
+ */
+public enum AppToFlowColumn implements Column<AppToFlowTable> {
+
+  /**
+   * The flow ID
+   */
+  FLOW_ID(AppToFlowColumnFamily.MAPPING, "flow_id"),
+
+  /**
+   * The flow run ID
+   */
+  FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id");
+
+  private final ColumnHelper<AppToFlowTable> column;
+  private final ColumnFamily<AppToFlowTable> columnFamily;
+  private final String columnQualifier;
+  private final byte[] columnQualifierBytes;
+
+  AppToFlowColumn(ColumnFamily<AppToFlowTable> columnFamily,
+      String columnQualifier) {
+    this.columnFamily = columnFamily;
+    this.columnQualifier = columnQualifier;
+    // Future-proof by ensuring the right column prefix hygiene.
+    this.columnQualifierBytes =
+        Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
+    this.column = new ColumnHelper<AppToFlowTable>(columnFamily);
+  }
+
+  /**
+   * @return the column name value
+   */
+  private String getColumnQualifier() {
+    return columnQualifier;
+  }
+
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<AppToFlowTable> tableMutator, Long timestamp,
+      Object inputValue) throws IOException {
+    column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
+        inputValue);
+  }
+
+  public Object readResult(Result result) throws IOException {
+    return column.readResult(result, columnQualifierBytes);
+  }
+
+  /**
+   * Retrieve an {@link AppToFlowColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+   * and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnQualifier Name of the column to retrieve
+   * @return the corresponding {@link AppToFlowColumn} or null
+   */
+  public static final AppToFlowColumn columnFor(String columnQualifier) {
+
+    // Match column based on value, assume column family matches.
+    for (AppToFlowColumn ec : AppToFlowColumn.values()) {
+      // Find a match based only on name.
+      if (ec.getColumnQualifier().equals(columnQualifier)) {
+        return ec;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve an {@link AppToFlowColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
+   * if and only if {@code a.equals(b) & x.equals(y)} or
+   * {@code (x == y == null)}
+   *
+   * @param columnFamily The columnFamily for which to retrieve the column.
+   * @param name Name of the column to retrieve
+   * @return the corresponding {@link AppToFlowColumn} or null if both arguments
+   *         don't match.
+   */
+  public static final AppToFlowColumn columnFor(
+      AppToFlowColumnFamily columnFamily, String name) {
+
+    for (AppToFlowColumn ec : AppToFlowColumn.values()) {
+      // Find a match based column family and on name.
+      if (ec.columnFamily.equals(columnFamily)
+          && ec.getColumnQualifier().equals(name)) {
+        return ec;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+}

+ 51 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnFamily.java

@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the app_flow table column families.
+ */
+public enum AppToFlowColumnFamily implements ColumnFamily<AppToFlowTable> {
+  /**
+   * Mapping column family houses known columns such as flowId and flowRunId
+   */
+  MAPPING("m");
+
+  /**
+   * Byte representation of this column family.
+   */
+  private final byte[] bytes;
+
+  /**
+   * @param value create a column family with this name. Must be lower case and
+   *          without spaces.
+   */
+  AppToFlowColumnFamily(String value) {
+    // column families should be lower case and not contain any spaces.
+    this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+  }
+
+  public byte[] getBytes() {
+    return Bytes.copy(bytes);
+  }
+
+}

+ 39 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java

@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents a rowkey for the app_flow table.
+ */
+public class AppToFlowRowKey {
+  /**
+   * Constructs a row key prefix for the app_flow table as follows:
+   * {@code clusterId!AppId}
+   *
+   * @param clusterId
+   * @param appId
+   * @return byte array with the row key
+   */
+  public static byte[] getRowKey(String clusterId, String appId) {
+    return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, appId));
+  }
+
+}

+ 110 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java

@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
+
+import java.io.IOException;
+
+/**
+ * The app_flow table as column families mapping. Mapping stores
+ * appId to flowId and flowRunId mapping information
+ *
+ * Example app_flow table record:
+ *
+ * <pre>
+ * |--------------------------------------|
+ * |  Row       | Column Family           |
+ * |  key       | info                    |
+ * |--------------------------------------|
+ * | clusterId! | flowId:                 |
+ * | AppId      | foo@daily_hive_report   |
+ * |            |                         |
+ * |            | flowRunId:              |
+ * |            | 1452828720457           |
+ * |            |                         |
+ * |            |                         |
+ * |            |                         |
+ * |--------------------------------------|
+ * </pre>
+ */
+public class AppToFlowTable extends BaseTable<AppToFlowTable> {
+  /** app_flow prefix */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "app-flow";
+
+  /** config param name that specifies the app_flow table name */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /** default value for app_flow table name */
+  private static final String DEFAULT_TABLE_NAME = "timelineservice.app_flow";
+
+  private static final Log LOG = LogFactory.getLog(AppToFlowTable.class);
+
+  public AppToFlowTable() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
+   * (org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor appToFlowTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor mappCF =
+        new HColumnDescriptor(AppToFlowColumnFamily.MAPPING.getBytes());
+    mappCF.setBloomFilterType(BloomType.ROWCOL);
+    appToFlowTableDescp.addFamily(mappCF);
+
+    appToFlowTableDescp
+        .setRegionSplitPolicyClassName(
+            "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
+    appToFlowTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
+        TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+    admin.createTable(appToFlowTableDescp,
+        TimelineHBaseSchemaConstants.getUsernameSplits());
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+}

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java

@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java

@@ -24,6 +24,8 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.BufferedMutator;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
@@ -93,6 +95,20 @@ public abstract class BaseTable<T> {
     return table.getScanner(scan);
   }
 
+  /**
+   *
+   * @param hbaseConf used to read settings that override defaults
+   * @param conn used to create table from
+   * @param get that specifies what single row you want to get from this table
+   * @return result of get operation
+   * @throws IOException
+   */
+  public Result getResult(Configuration hbaseConf, Connection conn, Get get)
+      throws IOException {
+    Table table = conn.getTable(getTableName(hbaseConf));
+    return table.get(get);
+  }
+
   /**
    * Get the table name for this table.
    *

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java

@@ -64,7 +64,7 @@ public interface ColumnPrefix<T> {
   public Object readResult(Result result, String qualifier) throws IOException;
 
   /**
-   * @param resultfrom which to read columns
+   * @param result from which to read columns
    * @return the latest values of columns in the column family with this prefix
    *         (or all of them if the prefix value is null).
    * @throws IOException

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java

@@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.util.Bytes;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class TimelineEntitySchemaConstants {
+public class TimelineHBaseSchemaConstants {
 
   /**
    * Used to create a pre-split for tables starting with a username in the

+ 112 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java

@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+
+public class TimelineReaderUtils {
+  /**
+   *
+   * @param entityRelations the relations of an entity
+   * @param relationFilters the relations for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchRelations(
+      Map<String, Set<String>> entityRelations,
+      Map<String, Set<String>> relationFilters) {
+    for (Map.Entry<String, Set<String>> relation : relationFilters.entrySet()) {
+      Set<String> ids = entityRelations.get(relation.getKey());
+      if (ids == null) {
+        return false;
+      }
+      for (String id : relation.getValue()) {
+        if (!ids.contains(id)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   *
+   * @param map the map of key/value pairs in an entity
+   * @param filters the map of key/value pairs for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchFilters(Map<String, ? extends Object> map,
+      Map<String, ? extends Object> filters) {
+    for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
+      Object value = map.get(filter.getKey());
+      if (value == null) {
+        return false;
+      }
+      if (!value.equals(filter.getValue())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   *
+   * @param entityEvents the set of event objects in an entity
+   * @param eventFilters the set of event Ids for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
+      Set<String> eventFilters) {
+    Set<String> eventIds = new HashSet<String>();
+    for (TimelineEvent event : entityEvents) {
+      eventIds.add(event.getId());
+    }
+    for (String eventFilter : eventFilters) {
+      if (!eventIds.contains(eventFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   *
+   * @param metrics the set of metric objects in an entity
+   * @param metricFilters the set of metric Ids for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchMetricFilters(Set<TimelineMetric> metrics,
+      Set<String> metricFilters) {
+    Set<String> metricIds = new HashSet<String>();
+    for (TimelineMetric metric : metrics) {
+      metricIds.add(metric.getId());
+    }
+
+    for (String metricFilter : metricFilters) {
+      if (!metricIds.contains(metricFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java

@@ -62,7 +62,7 @@ public enum EntityColumn implements Column<EntityTable> {
   private final String columnQualifier;
   private final byte[] columnQualifierBytes;
 
-  private EntityColumn(ColumnFamily<EntityTable> columnFamily,
+  EntityColumn(ColumnFamily<EntityTable> columnFamily,
       String columnQualifier) {
     this.columnFamily = columnFamily;
     this.columnQualifier = columnQualifier;

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java

@@ -53,7 +53,7 @@ public enum EntityColumnFamily implements ColumnFamily<EntityTable> {
    * @param value create a column family with this name. Must be lower case and
    *          without spaces.
    */
-  private EntityColumnFamily(String value) {
+  EntityColumnFamily(String value) {
     // column families should be lower case and not contain any spaces.
     this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
   }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java

@@ -80,7 +80,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
    * @param columnFamily that this column is stored in.
    * @param columnPrefix for this column.
    */
-  private EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
+  EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
       String columnPrefix) {
     column = new ColumnHelper<EntityTable>(columnFamily);
     this.columnFamily = columnFamily;

+ 32 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java

@@ -55,17 +55,45 @@ public class EntityRowKey {
 
   /**
    * Constructs a row key prefix for the entity table as follows:
-   * {@code userName!clusterId!flowId!flowRunId!AppId}
+   * {@code userName!clusterId!flowId!flowRunId!AppId!entityType!}
    *
    * @param clusterId
    * @param userId
    * @param flowId
    * @param flowRunId
    * @param appId
+   * @param entityType
    * @return byte array with the row key prefix
    */
+  public static byte[] getRowKeyPrefix(String clusterId, String userId,
+      String flowId, Long flowRunId, String appId, String entityType) {
+    byte[] first =
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
+            flowId));
+    // Note that flowRunId is a long, so we can't encode them all at the same
+    // time.
+    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
+    byte[] third =
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType, ""));
+    return Separator.QUALIFIERS.join(first, second, third);
+  }
+
+  /**
+   * Constructs a row key for the entity table as follows:
+   * {@code userName!clusterId!flowId!flowRunId!AppId!entityType!entityId}
+   *
+   * @param clusterId
+   * @param userId
+   * @param flowId
+   * @param flowRunId
+   * @param appId
+   * @param entityType
+   * @param entityId
+   * @return byte array with the row key
+   */
   public static byte[] getRowKey(String clusterId, String userId,
-      String flowId, Long flowRunId, String appId, TimelineEntity te) {
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId) {
     byte[] first =
         Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
             flowId));
@@ -73,8 +101,8 @@ public class EntityRowKey {
     // time.
     byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
     byte[] third =
-        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, te.getType(),
-            te.getId()));
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType,
+            entityId));
     return Separator.QUALIFIERS.join(first, second, third);
   }
 

+ 4 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java

@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEntitySchemaConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
 
 /**
  * The entity table as column families info, config and metrics. Info stores
@@ -75,7 +75,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEnti
 public class EntityTable extends BaseTable<EntityTable> {
   /** entity prefix */
   private static final String PREFIX =
-      YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".entity";
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "entity";
 
   /** config param name that specifies the entity table name */
   public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
@@ -146,9 +146,9 @@ public class EntityTable extends BaseTable<EntityTable> {
     entityTableDescp
         .setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
     entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
-        TimelineEntitySchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+        TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
     admin.createTable(entityTableDescp,
-        TimelineEntitySchemaConstants.getUsernameSplits());
+        TimelineHBaseSchemaConstants.getUsernameSplits());
     LOG.info("Status of table creation for " + table.getNameAsString() + "="
         + admin.tableExists(table));
   }

+ 73 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java

@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -38,11 +39,15 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
@@ -71,6 +76,8 @@ public class TestHBaseTimelineWriterImpl {
   private static void createSchema() throws IOException {
     new EntityTable()
         .createTable(util.getHBaseAdmin(), util.getConfiguration());
+    new AppToFlowTable()
+        .createTable(util.getHBaseAdmin(), util.getConfiguration());
   }
 
   @Test
@@ -138,10 +145,15 @@ public class TestHBaseTimelineWriterImpl {
     te.addEntity(entity);
 
     HBaseTimelineWriterImpl hbi = null;
+    HBaseTimelineReaderImpl hbr = null;
     try {
       Configuration c1 = util.getConfiguration();
       hbi = new HBaseTimelineWriterImpl(c1);
       hbi.init(c1);
+      hbi.start();
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
       String cluster = "cluster1";
       String user = "user1";
       String flow = "some_flow_name";
@@ -255,9 +267,22 @@ public class TestHBaseTimelineWriterImpl {
       assertEquals(1, rowCount);
       assertEquals(17, colCount);
 
+      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
+          entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
+          appName, entity.getType(), null, null, null, null, null, null, null,
+          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+      assertNotNull(e1);
+      assertEquals(1, es1.size());
     } finally {
-      hbi.stop();
-      hbi.close();
+      if (hbi != null) {
+        hbi.stop();
+        hbi.close();
+      }
+      if (hbr != null) {
+        hbr.stop();
+        hbr.close();
+      }
     }
 
     // Somewhat of a hack, not a separate test in order not to have to deal with
@@ -283,7 +308,7 @@ public class TestHBaseTimelineWriterImpl {
 
   private void testAdditionalEntity() throws IOException {
     TimelineEvent event = new TimelineEvent();
-    String eventId = "foo_event_id";
+    String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE;
     event.setId(eventId);
     Long expTs = 1436512802000L;
     event.setTimestamp(expTs);
@@ -291,19 +316,23 @@ public class TestHBaseTimelineWriterImpl {
     Object expVal = "test";
     event.addInfo(expKey, expVal);
 
-    final TimelineEntity entity = new TimelineEntity();
-    entity.setId("attempt_1329348432655_0001_m_000008_18");
-    entity.setType("FOO_ATTEMPT");
+    final TimelineEntity entity = new ApplicationEntity();
+    entity.setId(ApplicationId.newInstance(0, 1).toString());
     entity.addEvent(event);
 
     TimelineEntities entities = new TimelineEntities();
     entities.addEntity(entity);
 
     HBaseTimelineWriterImpl hbi = null;
+    HBaseTimelineReaderImpl hbr = null;
     try {
       Configuration c1 = util.getConfiguration();
       hbi = new HBaseTimelineWriterImpl(c1);
       hbi.init(c1);
+      hbi.start();
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
       String cluster = "cluster2";
       String user = "user2";
       String flow = "other_flow_name";
@@ -352,9 +381,31 @@ public class TestHBaseTimelineWriterImpl {
       }
       assertEquals(1, rowCount);
 
+      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
+          entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+      TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appName,
+          entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
+          appName, entity.getType(), null, null, null, null, null, null, null,
+          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+      Set<TimelineEntity> es2 = hbr.getEntities(user, cluster, null, null,
+          appName, entity.getType(), null, null, null, null, null, null, null,
+          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+      assertNotNull(e1);
+      assertNotNull(e2);
+      assertEquals(e1, e2);
+      assertEquals(1, es1.size());
+      assertEquals(1, es2.size());
+      assertEquals(es1, es2);
     } finally {
-      hbi.stop();
-      hbi.close();
+      if (hbi != null) {
+        hbi.stop();
+        hbi.close();
+      }
+      if (hbr != null) {
+        hbr.stop();
+        hbr.close();
+      }
     }
   }
 
@@ -375,10 +426,15 @@ public class TestHBaseTimelineWriterImpl {
     entities.addEntity(entity);
 
     HBaseTimelineWriterImpl hbi = null;
+    HBaseTimelineReaderImpl hbr = null;
     try {
       Configuration c1 = util.getConfiguration();
       hbi = new HBaseTimelineWriterImpl(c1);
       hbi.init(c1);
+      hbi.start();
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
       String cluster = "cluster_emptyeventkey";
       String user = "user_emptyeventkey";
       String flow = "other_flow_name";
@@ -430,13 +486,21 @@ public class TestHBaseTimelineWriterImpl {
       }
       assertEquals(1, rowCount);
 
+      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
+          entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
+          appName, entity.getType(), null, null, null, null, null, null, null,
+          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+      assertNotNull(e1);
+      assertEquals(1, es1.size());
     } finally {
       hbi.stop();
       hbi.close();
+      hbr.stop();;
+      hbr.close();
     }
   }
 
-
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     util.shutdownMiniCluster();