Pārlūkot izejas kodu

YARN-3906. Split the application table from the entity table. Contributed by Sangjin Lee.

(cherry picked from commit bcd755eba9466ce277d3c14192c31da6462c4ab3)
Junping Du 9 gadi atpakaļ
vecāks
revīzija
3007622955
13 mainītis faili ar 1230 papildinājumiem un 186 dzēšanām
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 139 63
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
  3. 105 40
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
  4. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
  5. 136 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
  6. 65 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java
  7. 217 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
  8. 67 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
  9. 164 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java
  10. 25 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java
  11. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
  12. 29 30
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
  13. 269 52
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -85,6 +85,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3049. [Storage Implementation] Implement storage reader interface to
     fetch raw data from HBase backend (Zhijie Shen via sjlee)
 
+    YARN-3906. Split the application table from the entity table. (Sangjin Lee 
+    via junping_du)
+
   IMPROVEMENTS
 
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via

+ 139 - 63
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java

@@ -18,7 +18,19 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage;
 
 
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -32,11 +44,17 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
@@ -45,18 +63,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.TreeSet;
+import com.google.common.base.Preconditions;
 
 public class HBaseTimelineReaderImpl
     extends AbstractService implements TimelineReader {
@@ -70,6 +77,7 @@ public class HBaseTimelineReaderImpl
   private Connection conn;
   private EntityTable entityTable;
   private AppToFlowTable appToFlowTable;
+  private ApplicationTable applicationTable;
 
   public HBaseTimelineReaderImpl() {
     super(HBaseTimelineReaderImpl.class.getName());
@@ -82,6 +90,7 @@ public class HBaseTimelineReaderImpl
     conn = ConnectionFactory.createConnection(hbaseConf);
     entityTable = new EntityTable();
     appToFlowTable = new AppToFlowTable();
+    applicationTable = new ApplicationTable();
   }
 
   @Override
@@ -109,14 +118,24 @@ public class HBaseTimelineReaderImpl
       fieldsToRetrieve = EnumSet.noneOf(Field.class);
     }
 
-    byte[] rowKey = EntityRowKey.getRowKey(
-        clusterId, userId, flowId, flowRunId, appId, entityType, entityId);
+    boolean isApplication = isApplicationEntity(entityType);
+    byte[] rowKey = isApplication ?
+        ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId,
+            appId) :
+        EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId,
+            entityType, entityId);
     Get get = new Get(rowKey);
     get.setMaxVersions(Integer.MAX_VALUE);
-    return parseEntity(
-        entityTable.getResult(hbaseConf, conn, get), fieldsToRetrieve,
+    Result result = isApplication ?
+        applicationTable.getResult(hbaseConf, conn, get) :
+        entityTable.getResult(hbaseConf, conn, get);
+    return parseEntity(result, fieldsToRetrieve,
         false, DEFAULT_BEGIN_TIME, DEFAULT_END_TIME, false, DEFAULT_BEGIN_TIME,
-        DEFAULT_END_TIME, null, null, null, null, null, null);
+        DEFAULT_END_TIME, null, null, null, null, null, null, isApplication);
+  }
+
+  private static boolean isApplicationEntity(String entityType) {
+    return TimelineEntityType.YARN_APPLICATION.toString().equals(entityType);
   }
 
   @Override
@@ -155,26 +174,46 @@ public class HBaseTimelineReaderImpl
     }
 
     NavigableSet<TimelineEntity> entities = new TreeSet<>();
-    // Scan through part of the table to find the entities belong to one app and
-    // one type
-    Scan scan = new Scan();
-    scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
-        clusterId, userId, flowId, flowRunId, appId, entityType));
-    scan.setMaxVersions(Integer.MAX_VALUE);
-    ResultScanner scanner = entityTable.getResultScanner(hbaseConf, conn, scan);
-    for (Result result : scanner) {
+    boolean isApplication = isApplicationEntity(entityType);
+    if (isApplication) {
+      // If getEntities() is called for an application, there can be at most
+      // one entity. If the entity passes the filter, it is returned. Otherwise,
+      // an empty set is returned.
+      byte[] rowKey = ApplicationRowKey.getRowKey(clusterId, userId, flowId,
+          flowRunId, appId);
+      Get get = new Get(rowKey);
+      get.setMaxVersions(Integer.MAX_VALUE);
+      Result result = applicationTable.getResult(hbaseConf, conn, get);
       TimelineEntity entity = parseEntity(result, fieldsToRetrieve,
-          true, createdTimeBegin, createdTimeEnd,
-          true, modifiedTimeBegin, modifiedTimeEnd,
-          isRelatedTo, relatesTo, infoFilters, configFilters, eventFilters,
-          metricFilters);
-      if (entity == null) {
-        continue;
+          true, createdTimeBegin, createdTimeEnd, true, modifiedTimeBegin,
+          modifiedTimeEnd, isRelatedTo, relatesTo, infoFilters, configFilters,
+          eventFilters, metricFilters, isApplication);
+      if (entity != null) {
+        entities.add(entity);
       }
-      if (entities.size() > limit) {
-        entities.pollLast();
+    } else {
+      // Scan through part of the table to find the entities belong to one app
+      // and one type
+      Scan scan = new Scan();
+      scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
+          clusterId, userId, flowId, flowRunId, appId, entityType));
+      scan.setMaxVersions(Integer.MAX_VALUE);
+      ResultScanner scanner =
+          entityTable.getResultScanner(hbaseConf, conn, scan);
+      for (Result result : scanner) {
+        TimelineEntity entity = parseEntity(result, fieldsToRetrieve,
+            true, createdTimeBegin, createdTimeEnd,
+            true, modifiedTimeBegin, modifiedTimeEnd,
+            isRelatedTo, relatesTo, infoFilters, configFilters, eventFilters,
+            metricFilters, isApplication);
+        if (entity == null) {
+          continue;
+        }
+        if (entities.size() > limit) {
+          entities.pollLast();
+        }
+        entities.add(entity);
       }
-      entities.add(entity);
     }
     return entities;
   }
@@ -221,26 +260,37 @@ public class HBaseTimelineReaderImpl
       boolean checkModifiedTime, long modifiedTimeBegin, long modifiedTimeEnd,
       Map<String, Set<String>> isRelatedTo, Map<String, Set<String>> relatesTo,
       Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> eventFilters, Set<String> metricFilters)
+      Set<String> eventFilters, Set<String> metricFilters,
+      boolean isApplication)
           throws IOException {
     if (result == null || result.isEmpty()) {
       return null;
     }
     TimelineEntity entity = new TimelineEntity();
-    entity.setType(EntityColumn.TYPE.readResult(result).toString());
-    entity.setId(EntityColumn.ID.readResult(result).toString());
+    String entityType = isApplication ?
+        TimelineEntityType.YARN_APPLICATION.toString() :
+        EntityColumn.TYPE.readResult(result).toString();
+    entity.setType(entityType);
+    String entityId = isApplication ?
+        ApplicationColumn.ID.readResult(result).toString() :
+        EntityColumn.ID.readResult(result).toString();
+    entity.setId(entityId);
 
     // fetch created time
-    entity.setCreatedTime(
-        ((Number) EntityColumn.CREATED_TIME.readResult(result)).longValue());
+    Number createdTime = isApplication ?
+        (Number)ApplicationColumn.CREATED_TIME.readResult(result) :
+        (Number)EntityColumn.CREATED_TIME.readResult(result);
+    entity.setCreatedTime(createdTime.longValue());
     if (checkCreatedTime && (entity.getCreatedTime() < createdTimeBegin ||
         entity.getCreatedTime() > createdTimeEnd)) {
       return null;
     }
 
     // fetch modified time
-    entity.setCreatedTime(
-        ((Number) EntityColumn.MODIFIED_TIME.readResult(result)).longValue());
+    Number modifiedTime = isApplication ?
+        (Number)ApplicationColumn.MODIFIED_TIME.readResult(result) :
+        (Number)EntityColumn.MODIFIED_TIME.readResult(result);
+    entity.setModifiedTime(modifiedTime.longValue());
     if (checkModifiedTime && (entity.getModifiedTime() < modifiedTimeBegin ||
         entity.getModifiedTime() > modifiedTimeEnd)) {
       return null;
@@ -250,7 +300,13 @@ public class HBaseTimelineReaderImpl
     boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
-      readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO);
+      if (isApplication) {
+        readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
+            true);
+      } else {
+        readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO,
+            true);
+      }
       if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
           entity.getIsRelatedToEntities(), isRelatedTo)) {
         return null;
@@ -265,7 +321,12 @@ public class HBaseTimelineReaderImpl
     boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
-      readRelationship(entity, result, EntityColumnPrefix.RELATES_TO);
+      if (isApplication) {
+        readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
+            false);
+      } else {
+        readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
+      }
       if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
           entity.getRelatesToEntities(), relatesTo)) {
         return null;
@@ -280,7 +341,11 @@ public class HBaseTimelineReaderImpl
     boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
-      readKeyValuePairs(entity, result, EntityColumnPrefix.INFO);
+      if (isApplication) {
+        readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
+      } else {
+        readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
+      }
       if (checkInfo &&
           !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
         return null;
@@ -295,7 +360,11 @@ public class HBaseTimelineReaderImpl
     boolean checkConfigs = configFilters != null && configFilters.size() > 0;
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
-      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG);
+      if (isApplication) {
+        readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
+      } else {
+        readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
+      }
       if (checkConfigs && !TimelineReaderUtils.matchFilters(
           entity.getConfigs(), configFilters)) {
         return null;
@@ -310,7 +379,7 @@ public class HBaseTimelineReaderImpl
     boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
-      readEvents(entity, result);
+      readEvents(entity, result, isApplication);
       if (checkEvents && !TimelineReaderUtils.matchEventFilters(
           entity.getEvents(), eventFilters)) {
         return null;
@@ -325,7 +394,7 @@ public class HBaseTimelineReaderImpl
     boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
-      readMetrics(entity, result);
+      readMetrics(entity, result, isApplication);
       if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
           entity.getMetrics(), metricFilters)) {
         return null;
@@ -338,15 +407,15 @@ public class HBaseTimelineReaderImpl
     return entity;
   }
 
-  private static void readRelationship(
-      TimelineEntity entity, Result result, EntityColumnPrefix prefix)
-          throws IOException {
+  private static <T> void readRelationship(
+      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
+      boolean isRelatedTo) throws IOException {
     // isRelatedTo and relatesTo are of type Map<String, Set<String>>
     Map<String, Object> columns = prefix.readResults(result);
     for (Map.Entry<String, Object> column : columns.entrySet()) {
       for (String id : Separator.VALUES.splitEncoded(
           column.getValue().toString())) {
-        if (prefix.equals(EntityColumnPrefix.IS_RELATED_TO)) {
+        if (isRelatedTo) {
           entity.addIsRelatedToEntity(column.getKey(), id);
         } else {
           entity.addRelatesToEntity(column.getKey(), id);
@@ -355,12 +424,12 @@ public class HBaseTimelineReaderImpl
     }
   }
 
-  private static void readKeyValuePairs(
-      TimelineEntity entity, Result result, EntityColumnPrefix prefix)
-          throws IOException {
+  private static <T> void readKeyValuePairs(
+      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
+      boolean isConfig) throws IOException {
     // info and configuration are of type Map<String, Object or String>
     Map<String, Object> columns = prefix.readResults(result);
-    if (prefix.equals(EntityColumnPrefix.CONFIG)) {
+    if (isConfig) {
       for (Map.Entry<String, Object> column : columns.entrySet()) {
         entity.addConfig(column.getKey(), column.getKey().toString());
       }
@@ -369,10 +438,11 @@ public class HBaseTimelineReaderImpl
     }
   }
 
-  private static void readEvents(TimelineEntity entity, Result result)
-      throws IOException {
+  private static void readEvents(TimelineEntity entity, Result result,
+      boolean isApplication) throws IOException {
     Map<String, TimelineEvent> eventsMap = new HashMap<>();
-    Map<String, Object> eventsResult =
+    Map<String, Object> eventsResult = isApplication ?
+        ApplicationColumnPrefix.EVENT.readResults(result) :
         EntityColumnPrefix.EVENT.readResults(result);
     for (Map.Entry<String,Object> eventResult : eventsResult.entrySet()) {
       Collection<String> tokens =
@@ -405,10 +475,16 @@ public class HBaseTimelineReaderImpl
     entity.addEvents(eventsSet);
   }
 
-  private static void readMetrics(TimelineEntity entity, Result result)
-      throws IOException {
-    NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
-        EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
+  private static void readMetrics(TimelineEntity entity, Result result,
+      boolean isApplication) throws IOException {
+    NavigableMap<String, NavigableMap<Long, Number>> metricsResult;
+    if (isApplication) {
+      metricsResult =
+          ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
+    } else {
+      metricsResult =
+          EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
+    }
     for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
         metricsResult.entrySet()) {
       TimelineMetric metric = new TimelineMetric();

+ 105 - 40
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java

@@ -38,9 +38,14 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
@@ -61,6 +66,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   private Connection conn;
   private TypedBufferedMutator<EntityTable> entityTable;
   private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
+  private TypedBufferedMutator<ApplicationTable> applicationTable;
 
   private static final Log LOG = LogFactory
       .getLog(HBaseTimelineWriterImpl.class);
@@ -84,6 +90,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     conn = ConnectionFactory.createConnection(hbaseConf);
     entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
     appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn);
+    applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn);
   }
 
   /**
@@ -102,18 +109,20 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
         continue;
       }
 
-      byte[] rowKey =
+      // if the entity is the application, the destination is the application
+      // table
+      boolean isApplication = isApplicationEntity(te);
+      byte[] rowKey = isApplication ?
+          ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId,
+              appId) :
           EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
               te.getType(), te.getId());
 
-      storeInfo(rowKey, te, flowVersion);
-      storeEvents(rowKey, te.getEvents());
-      storeConfig(rowKey, te.getConfigs());
-      storeMetrics(rowKey, te.getMetrics());
-      storeRelations(rowKey, te.getIsRelatedToEntities(),
-          EntityColumnPrefix.IS_RELATED_TO);
-      storeRelations(rowKey, te.getRelatesToEntities(),
-          EntityColumnPrefix.RELATES_TO);
+      storeInfo(rowKey, te, flowVersion, isApplication);
+      storeEvents(rowKey, te.getEvents(), isApplication);
+      storeConfig(rowKey, te.getConfigs(), isApplication);
+      storeMetrics(rowKey, te.getMetrics(), isApplication);
+      storeRelations(rowKey, te, isApplication);
 
       if (isApplicationCreated(te)) {
         onApplicationCreated(
@@ -123,9 +132,12 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     return putStatus;
   }
 
+  private static boolean isApplicationEntity(TimelineEntity te) {
+    return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
+  }
+
   private static boolean isApplicationCreated(TimelineEntity te) {
-    if (te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString())) {
-      boolean isAppCreated = false;
+    if (isApplicationEntity(te)) {
       for (TimelineEvent event : te.getEvents()) {
         if (event.getId().equals(
             ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
@@ -145,41 +157,74 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
         rowKey, appToFlowTable, null, flowRunId);
   }
 
+  private void storeRelations(byte[] rowKey, TimelineEntity te,
+      boolean isApplication) throws IOException {
+    if (isApplication) {
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          ApplicationColumnPrefix.IS_RELATED_TO, applicationTable);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          ApplicationColumnPrefix.RELATES_TO, applicationTable);
+    } else {
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          EntityColumnPrefix.IS_RELATED_TO, entityTable);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          EntityColumnPrefix.RELATES_TO, entityTable);
+    }
+  }
+
   /**
    * Stores the Relations from the {@linkplain TimelineEntity} object
    */
-  private void storeRelations(byte[] rowKey,
+  private <T> void storeRelations(byte[] rowKey,
       Map<String, Set<String>> connectedEntities,
-      EntityColumnPrefix entityColumnPrefix) throws IOException {
+      ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
+          throws IOException {
     for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities
         .entrySet()) {
       // id3?id4?id5
       String compoundValue =
           Separator.VALUES.joinEncoded(connectedEntity.getValue());
 
-      entityColumnPrefix.store(rowKey, entityTable, connectedEntity.getKey(),
-          null, compoundValue);
+      columnPrefix.store(rowKey, table, connectedEntity.getKey(), null,
+          compoundValue);
     }
   }
 
   /**
    * Stores information from the {@linkplain TimelineEntity} object
    */
-  private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion)
-      throws IOException {
+  private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion,
+      boolean isApplication) throws IOException {
 
-    EntityColumn.ID.store(rowKey, entityTable, null, te.getId());
-    EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType());
-    EntityColumn.CREATED_TIME.store(rowKey, entityTable, null,
-        te.getCreatedTime());
-    EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null,
-        te.getModifiedTime());
-    EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
-    Map<String, Object> info = te.getInfo();
-    if (info != null) {
-      for (Map.Entry<String, Object> entry : info.entrySet()) {
-        EntityColumnPrefix.INFO.store(rowKey, entityTable, entry.getKey(),
-            null, entry.getValue());
+    if (isApplication) {
+      ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId());
+      ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null,
+          te.getCreatedTime());
+      ApplicationColumn.MODIFIED_TIME.store(rowKey, applicationTable, null,
+          te.getModifiedTime());
+      ApplicationColumn.FLOW_VERSION.store(rowKey, applicationTable, null,
+          flowVersion);
+      Map<String, Object> info = te.getInfo();
+      if (info != null) {
+        for (Map.Entry<String, Object> entry : info.entrySet()) {
+          ApplicationColumnPrefix.INFO.store(rowKey, applicationTable,
+              entry.getKey(), null, entry.getValue());
+        }
+      }
+    } else {
+      EntityColumn.ID.store(rowKey, entityTable, null, te.getId());
+      EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType());
+      EntityColumn.CREATED_TIME.store(rowKey, entityTable, null,
+          te.getCreatedTime());
+      EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null,
+          te.getModifiedTime());
+      EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
+      Map<String, Object> info = te.getInfo();
+      if (info != null) {
+        for (Map.Entry<String, Object> entry : info.entrySet()) {
+          EntityColumnPrefix.INFO.store(rowKey, entityTable, entry.getKey(),
+              null, entry.getValue());
+        }
       }
     }
   }
@@ -187,14 +232,19 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   /**
    * stores the config information from {@linkplain TimelineEntity}
    */
-  private void storeConfig(byte[] rowKey, Map<String, String> config)
-      throws IOException {
+  private void storeConfig(byte[] rowKey, Map<String, String> config,
+      boolean isApplication) throws IOException {
     if (config == null) {
       return;
     }
     for (Map.Entry<String, String> entry : config.entrySet()) {
-      EntityColumnPrefix.CONFIG.store(rowKey, entityTable, entry.getKey(),
+      if (isApplication) {
+        ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable,
+          entry.getKey(), null, entry.getValue());
+      } else {
+        EntityColumnPrefix.CONFIG.store(rowKey, entityTable, entry.getKey(),
           null, entry.getValue());
+      }
     }
   }
 
@@ -202,16 +252,21 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
    * stores the {@linkplain TimelineMetric} information from the
    * {@linkplain TimelineEvent} object
    */
-  private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics)
-      throws IOException {
+  private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
+      boolean isApplication) throws IOException {
     if (metrics != null) {
       for (TimelineMetric metric : metrics) {
         String metricColumnQualifier = metric.getId();
         Map<Long, Number> timeseries = metric.getValues();
         for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
           Long timestamp = timeseriesEntry.getKey();
-          EntityColumnPrefix.METRIC.store(rowKey, entityTable,
+          if (isApplication) {
+            ApplicationColumnPrefix.METRIC.store(rowKey, applicationTable,
               metricColumnQualifier, timestamp, timeseriesEntry.getValue());
+          } else {
+            EntityColumnPrefix.METRIC.store(rowKey, entityTable,
+              metricColumnQualifier, timestamp, timeseriesEntry.getValue());
+          }
         }
       }
     }
@@ -220,8 +275,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   /**
    * Stores the events from the {@linkplain TimelineEvent} object
    */
-  private void storeEvents(byte[] rowKey, Set<TimelineEvent> events)
-      throws IOException {
+  private void storeEvents(byte[] rowKey, Set<TimelineEvent> events,
+      boolean isApplication) throws IOException {
     if (events != null) {
       for (TimelineEvent event : events) {
         if (event != null) {
@@ -258,8 +313,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
                 // convert back to string to avoid additional API on store.
                 String compoundColumnQualifier =
                     Bytes.toString(compoundColumnQualifierBytes);
-                EntityColumnPrefix.EVENT.store(rowKey, entityTable,
+                if (isApplication) {
+                  ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
                     compoundColumnQualifier, null, info.getValue());
+                } else {
+                  EntityColumnPrefix.EVENT.store(rowKey, entityTable,
+                    compoundColumnQualifier, null, info.getValue());
+                }
               } // for info: eventInfo
             }
           }
@@ -279,6 +339,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     // flush all buffered mutators
     entityTable.flush();
     appToFlowTable.flush();
+    applicationTable.flush();
   }
 
   /**
@@ -288,15 +349,19 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   @Override
   protected void serviceStop() throws Exception {
     if (entityTable != null) {
-      LOG.info("closing entity table");
+      LOG.info("closing the entity table");
       // The close API performs flushing and releases any resources held
       entityTable.close();
     }
     if (appToFlowTable != null) {
-      LOG.info("closing app_flow table");
+      LOG.info("closing the app_flow table");
       // The close API performs flushing and releases any resources held
       appToFlowTable.close();
     }
+    if (applicationTable != null) {
+      LOG.info("closing the application table");
+      applicationTable.close();
+    }
     if (conn != null) {
       LOG.info("closing the hbase Connection");
       conn.close();

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 
@@ -76,6 +77,12 @@ public class TimelineSchemaCreator {
     if (StringUtils.isNotBlank(appToflowTableName)) {
       hbaseConf.set(AppToFlowTable.TABLE_NAME_CONF_NAME, appToflowTableName);
     }
+    // Grab the applicationTableName argument
+    String applicationTableName = commandLine.getOptionValue("a");
+    if (StringUtils.isNotBlank(applicationTableName)) {
+      hbaseConf.set(ApplicationTable.TABLE_NAME_CONF_NAME,
+          applicationTableName);
+    }
     createAllTables(hbaseConf);
   }
 
@@ -103,6 +110,8 @@ public class TimelineSchemaCreator {
 
     o = new Option("a2f", "appToflowTableName", true, "app to flow table name");
     o.setArgName("appToflowTableName");
+    o = new Option("a", "applicationTableName", true, "application table name");
+    o.setArgName("applicationTableName");
     o.setRequired(false);
     options.addOption(o);
 
@@ -132,6 +141,7 @@ public class TimelineSchemaCreator {
       }
       new EntityTable().createTable(admin, hbaseConf);
       new AppToFlowTable().createTable(admin, hbaseConf);
+      new ApplicationTable().createTable(admin, hbaseConf);
     } finally {
       if (conn != null) {
         conn.close();

+ 136 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java

@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+/**
+ * Identifies fully qualified columns for the {@link ApplicationTable}.
+ */
+public enum ApplicationColumn implements Column<ApplicationTable> {
+
+  /**
+   * App id
+   */
+  ID(ApplicationColumnFamily.INFO, "id"),
+
+  /**
+   * When the application was created.
+   */
+  CREATED_TIME(ApplicationColumnFamily.INFO, "created_time"),
+
+  /**
+   * When it was modified.
+   */
+  MODIFIED_TIME(ApplicationColumnFamily.INFO, "modified_time"),
+
+  /**
+   * The version of the flow that this app belongs to.
+   */
+  FLOW_VERSION(ApplicationColumnFamily.INFO, "flow_version");
+
+  private final ColumnHelper<ApplicationTable> column;
+  private final ColumnFamily<ApplicationTable> columnFamily;
+  private final String columnQualifier;
+  private final byte[] columnQualifierBytes;
+
+  private ApplicationColumn(ColumnFamily<ApplicationTable> columnFamily,
+      String columnQualifier) {
+    this.columnFamily = columnFamily;
+    this.columnQualifier = columnQualifier;
+    // Future-proof by ensuring the right column prefix hygiene.
+    this.columnQualifierBytes =
+        Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
+    this.column = new ColumnHelper<ApplicationTable>(columnFamily);
+  }
+
+  /**
+   * @return the column name value
+   */
+  private String getColumnQualifier() {
+    return columnQualifier;
+  }
+
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<ApplicationTable> tableMutator, Long timestamp,
+      Object inputValue) throws IOException {
+    column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
+        inputValue);
+  }
+
+  public Object readResult(Result result) throws IOException {
+    return column.readResult(result, columnQualifierBytes);
+  }
+
+  /**
+   * Retrieve an {@link ApplicationColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+   * and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnQualifier Name of the column to retrieve
+   * @return the corresponding {@link ApplicationColumn} or null
+   */
+  public static final ApplicationColumn columnFor(String columnQualifier) {
+
+    // Match column based on value, assume column family matches.
+    for (ApplicationColumn ac : ApplicationColumn.values()) {
+      // Find a match based only on name.
+      if (ac.getColumnQualifier().equals(columnQualifier)) {
+        return ac;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve an {@link ApplicationColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
+   * if and only if {@code a.equals(b) & x.equals(y)} or
+   * {@code (x == y == null)}
+   *
+   * @param columnFamily The columnFamily for which to retrieve the column.
+   * @param name Name of the column to retrieve
+   * @return the corresponding {@link ApplicationColumn} or null if both
+   *         arguments don't match.
+   */
+  public static final ApplicationColumn columnFor(
+      ApplicationColumnFamily columnFamily, String name) {
+
+    for (ApplicationColumn ac : ApplicationColumn.values()) {
+      // Find a match based column family and on name.
+      if (ac.columnFamily.equals(columnFamily)
+          && ac.getColumnQualifier().equals(name)) {
+        return ac;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+}

+ 65 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java

@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the application table column families.
+ */
+public enum ApplicationColumnFamily implements ColumnFamily<ApplicationTable> {
+
+  /**
+   * Info column family houses known columns, specifically ones included in
+   * columnfamily filters.
+   */
+  INFO("i"),
+
+  /**
+   * Configurations are in a separate column family for two reasons: a) the size
+   * of the config values can be very large and b) we expect that config values
+   * are often separately accessed from other metrics and info columns.
+   */
+  CONFIGS("c"),
+
+  /**
+   * Metrics have a separate column family, because they have a separate TTL.
+   */
+  METRICS("m");
+
+  /**
+   * Byte representation of this column family.
+   */
+  private final byte[] bytes;
+
+  /**
+   * @param value create a column family with this name. Must be lower case and
+   *          without spaces.
+   */
+  private ApplicationColumnFamily(String value) {
+    // column families should be lower case and not contain any spaces.
+    this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+  }
+
+  public byte[] getBytes() {
+    return Bytes.copy(bytes);
+  }
+
+}

+ 217 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java

@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+/**
+ * Identifies partially qualified columns for the application table.
+ */
+public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
+
+  /**
+   * To store TimelineEntity getIsRelatedToEntities values.
+   */
+  IS_RELATED_TO(ApplicationColumnFamily.INFO, "s"),
+
+  /**
+   * To store TimelineEntity getRelatesToEntities values.
+   */
+  RELATES_TO(ApplicationColumnFamily.INFO, "r"),
+
+  /**
+   * To store TimelineEntity info values.
+   */
+  INFO(ApplicationColumnFamily.INFO, "i"),
+
+  /**
+   * Lifecycle events for an application
+   */
+  EVENT(ApplicationColumnFamily.INFO, "e"),
+
+  /**
+   * Config column stores configuration with config key as the column name.
+   */
+  CONFIG(ApplicationColumnFamily.CONFIGS, null),
+
+  /**
+   * Metrics are stored with the metric name as the column name.
+   */
+  METRIC(ApplicationColumnFamily.METRICS, null);
+
+  private final ColumnHelper<ApplicationTable> column;
+  private final ColumnFamily<ApplicationTable> columnFamily;
+
+  /**
+   * Can be null for those cases where the provided column qualifier is the
+   * entire column name.
+   */
+  private final String columnPrefix;
+  private final byte[] columnPrefixBytes;
+
+  /**
+   * Private constructor, meant to be used by the enum definition.
+   *
+   * @param columnFamily that this column is stored in.
+   * @param columnPrefix for this column.
+   */
+  private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily,
+      String columnPrefix) {
+    column = new ColumnHelper<ApplicationTable>(columnFamily);
+    this.columnFamily = columnFamily;
+    this.columnPrefix = columnPrefix;
+    if (columnPrefix == null) {
+      this.columnPrefixBytes = null;
+    } else {
+      // Future-proof by ensuring the right column prefix hygiene.
+      this.columnPrefixBytes =
+          Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
+    }
+  }
+
+  /**
+   * @return the column name value
+   */
+  private String getColumnPrefix() {
+    return columnPrefix;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<ApplicationTable> tableMutator, String qualifier,
+      Long timestamp, Object inputValue) throws IOException {
+
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier =
+        ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
+   */
+  public Object readResult(Result result, String qualifier) throws IOException {
+    byte[] columnQualifier =
+        ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+    return column.readResult(result, columnQualifier);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResults(org.apache.hadoop.hbase.client.Result)
+   */
+  public Map<String, Object> readResults(Result result) throws IOException {
+    return column.readResults(result, columnPrefixBytes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
+   */
+  public <V> NavigableMap<String, NavigableMap<Long, V>>
+      readResultsWithTimestamps(Result result) throws IOException {
+    return column.readResultsWithTimestamps(result, columnPrefixBytes);
+  }
+
+  /**
+   * Retrieve an {@link ApplicationColumnPrefix} given a name, or null if there
+   * is no match. The following holds true: {@code columnFor(x) == columnFor(y)}
+   * if and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnPrefix Name of the column to retrieve
+   * @return the corresponding {@link ApplicationColumnPrefix} or null
+   */
+  public static final ApplicationColumnPrefix columnFor(String columnPrefix) {
+
+    // Match column based on value, assume column family matches.
+    for (ApplicationColumnPrefix acp : ApplicationColumnPrefix.values()) {
+      // Find a match based only on name.
+      if (acp.getColumnPrefix().equals(columnPrefix)) {
+        return acp;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve an {@link ApplicationColumnPrefix} given a name, or null if there
+   * is no match. The following holds true:
+   * {@code columnFor(a,x) == columnFor(b,y)} if and only if
+   * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)}
+   *
+   * @param columnFamily The columnFamily for which to retrieve the column.
+   * @param columnPrefix Name of the column to retrieve
+   * @return the corresponding {@link ApplicationColumnPrefix} or null if both
+   *         arguments don't match.
+   */
+  public static final ApplicationColumnPrefix columnFor(
+      ApplicationColumnFamily columnFamily, String columnPrefix) {
+
+    // TODO: needs unit test to confirm and need to update javadoc to explain
+    // null prefix case.
+
+    for (ApplicationColumnPrefix acp : ApplicationColumnPrefix.values()) {
+      // Find a match based column family and on name.
+      if (acp.columnFamily.equals(columnFamily)
+          && (((columnPrefix == null) && (acp.getColumnPrefix() == null)) || (acp
+              .getColumnPrefix().equals(columnPrefix)))) {
+        return acp;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+}

+ 67 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents a rowkey for the application table.
+ */
+public class ApplicationRowKey {
+  // TODO: more methods are needed for this class.
+
+  // TODO: API needs to be cleaned up.
+
+  /**
+   * Constructs a row key for the application table as follows:
+   * {@code clusterId!userName!flowId!flowRunId!AppId}
+   *
+   * @param clusterId
+   * @param userId
+   * @param flowId
+   * @param flowRunId
+   * @param appId
+   * @return byte array with the row key
+   */
+  public static byte[] getRowKey(String clusterId, String userId,
+      String flowId, Long flowRunId, String appId) {
+    byte[] first =
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, userId,
+            flowId));
+    // Note that flowRunId is a long, so we can't encode them all at the same
+    // time.
+    byte[] second = Bytes.toBytes(ApplicationRowKey.invert(flowRunId));
+    byte[] third = Bytes.toBytes(appId);
+    return Separator.QUALIFIERS.join(first, second, third);
+  }
+
+  /**
+   * Converts a timestamp into its inverse timestamp to be used in (row) keys
+   * where we want to have the most recent timestamp in the top of the table
+   * (scans start at the most recent timestamp first).
+   *
+   * @param key value to be inverted so that the latest version will be first in
+   *          a scan.
+   * @return inverted long
+   */
+  public static long invert(Long key) {
+    return Long.MAX_VALUE - key;
+  }
+
+}

+ 164 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java

@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
+
+/**
+ * The application table as column families info, config and metrics. Info
+ * stores information about a YARN application entity, config stores
+ * configuration data of a YARN application, metrics stores the metrics of a
+ * YARN application. This table is entirely analogous to the entity table but
+ * created for better performance.
+ *
+ * Example application table record:
+ *
+ * <pre>
+ * |-------------------------------------------------------------------------|
+ * |  Row       | Column Family                | Column Family| Column Family|
+ * |  key       | info                         | metrics      | config       |
+ * |-------------------------------------------------------------------------|
+ * | clusterId! | id:appId                     | metricId1:   | configKey1:  |
+ * | userName!  |                              | metricValue1 | configValue1 |
+ * | flowId!    | created_time:                | @timestamp1  |              |
+ * | flowRunId! | 1392993084018                |              | configKey2:  |
+ * | AppId      |                              | metriciD1:   | configValue2 |
+ * |            | modified_time:               | metricValue2 |              |
+ * |            | 1392995081012                | @timestamp2  |              |
+ * |            |                              |              |              |
+ * |            | i!infoKey:                   | metricId2:   |              |
+ * |            | infoValue                    | metricValue1 |              |
+ * |            |                              | @timestamp2  |              |
+ * |            | r!relatesToKey:              |              |              |
+ * |            | id3?id4?id5                  |              |              |
+ * |            |                              |              |              |
+ * |            | s!isRelatedToKey:            |              |              |
+ * |            | id7?id9?id6                  |              |              |
+ * |            |                              |              |              |
+ * |            | e!eventId?timestamp?infoKey: |              |              |
+ * |            | eventInfoValue               |              |              |
+ * |            |                              |              |              |
+ * |            | flowVersion:                 |              |              |
+ * |            | versionValue                 |              |              |
+ * |-------------------------------------------------------------------------|
+ * </pre>
+ */
+public class ApplicationTable extends BaseTable<ApplicationTable> {
+  /** application prefix */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".application";
+
+  /** config param name that specifies the application table name */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /**
+   * config param name that specifies the TTL for metrics column family in
+   * application table
+   */
+  private static final String METRICS_TTL_CONF_NAME = PREFIX
+      + ".table.metrics.ttl";
+
+  /** default value for application table name */
+  private static final String DEFAULT_TABLE_NAME =
+      "timelineservice.application";
+
+  /** default TTL is 30 days for metrics timeseries */
+  private static final int DEFAULT_METRICS_TTL = 2592000;
+
+  /** default max number of versions */
+  private static final int DEFAULT_METRICS_MAX_VERSIONS = 1000;
+
+  private static final Log LOG = LogFactory.getLog(ApplicationTable.class);
+
+  public ApplicationTable() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
+   * (org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor applicationTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor infoCF =
+        new HColumnDescriptor(ApplicationColumnFamily.INFO.getBytes());
+    infoCF.setBloomFilterType(BloomType.ROWCOL);
+    applicationTableDescp.addFamily(infoCF);
+
+    HColumnDescriptor configCF =
+        new HColumnDescriptor(ApplicationColumnFamily.CONFIGS.getBytes());
+    configCF.setBloomFilterType(BloomType.ROWCOL);
+    configCF.setBlockCacheEnabled(true);
+    applicationTableDescp.addFamily(configCF);
+
+    HColumnDescriptor metricsCF =
+        new HColumnDescriptor(ApplicationColumnFamily.METRICS.getBytes());
+    applicationTableDescp.addFamily(metricsCF);
+    metricsCF.setBlockCacheEnabled(true);
+    // always keep 1 version (the latest)
+    metricsCF.setMinVersions(1);
+    metricsCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+    metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME,
+        DEFAULT_METRICS_TTL));
+    applicationTableDescp
+        .setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
+    applicationTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
+        TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+    admin.createTable(applicationTableDescp,
+        TimelineHBaseSchemaConstants.getUsernameSplits());
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+
+  /**
+   * @param metricsTTL time to live parameter for the metrics in this table.
+   * @param hbaseConf configuration in which to set the metrics TTL config
+   *          variable.
+   */
+  public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) {
+    hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL);
+  }
+
+}

+ 25 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java

@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java

@@ -157,7 +157,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
    * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
    * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
    */
-  public <T> NavigableMap<String, NavigableMap<Long, T>>
+  public <V> NavigableMap<String, NavigableMap<Long, V>>
       readResultsWithTimestamps(Result result) throws IOException {
     return column.readResultsWithTimestamps(result, columnPrefixBytes);
   }

+ 29 - 30
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java

@@ -40,36 +40,35 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBas
  * Example entity table record:
  *
  * <pre>
- * |--------------------------------------------------------------------|
- * |  Row       | Column Family           | Column Family| Column Family|
- * |  key       | info                    | metrics      | config       |
- * |--------------------------------------------------------------------|
- * | userName!  | id:entityId             | metricId1:   | configKey1:  |
- * | clusterId! |                         | metricValue1 | configValue1 |
- * | flowId!    | type:entityType         | @timestamp1  |              |
- * | flowRunId! |                         |              | configKey2:  |
- * | AppId!     | created_time:           | metriciD1:   | configValue2 |
- * | entityType!| 1392993084018           | metricValue2 |              |
- * | entityId   |                         | @timestamp2  |              |
- * |            | modified_time:          |              |              |
- * |            | 1392995081012           | metricId2:   |              |
- * |            |                         | metricValue1 |              |
- * |            | i!infoKey:              | @timestamp2  |              |
- * |            | infoValue               |              |              |
- * |            |                         |              |              |
- * |            | r!relatesToKey:         |              |              |
- * |            | id3?id4?id5             |              |              |
- * |            |                         |              |              |
- * |            | s!isRelatedToKey        |              |              |
- * |            | id7?id9?id6             |              |              |
- * |            |                         |              |              |
- * |            | e!eventId?eventInfoKey: |              |              |
- * |            | eventInfoValue          |              |              |
- * |            | @timestamp              |              |              |
- * |            |                         |              |              |
- * |            | flowVersion:            |              |              |
- * |            | versionValue            |              |              |
- * |--------------------------------------------------------------------|
+ * |-------------------------------------------------------------------------|
+ * |  Row       | Column Family                | Column Family| Column Family|
+ * |  key       | info                         | metrics      | config       |
+ * |-------------------------------------------------------------------------|
+ * | userName!  | id:entityId                  | metricId1:   | configKey1:  |
+ * | clusterId! |                              | metricValue1 | configValue1 |
+ * | flowId!    | type:entityType              | @timestamp1  |              |
+ * | flowRunId! |                              |              | configKey2:  |
+ * | AppId!     | created_time:                | metriciD1:   | configValue2 |
+ * | entityType!| 1392993084018                | metricValue2 |              |
+ * | entityId   |                              | @timestamp2  |              |
+ * |            | modified_time:               |              |              |
+ * |            | 1392995081012                | metricId2:   |              |
+ * |            |                              | metricValue1 |              |
+ * |            | i!infoKey:                   | @timestamp2  |              |
+ * |            | infoValue                    |              |              |
+ * |            |                              |              |              |
+ * |            | r!relatesToKey:              |              |              |
+ * |            | id3?id4?id5                  |              |              |
+ * |            |                              |              |              |
+ * |            | s!isRelatedToKey             |              |              |
+ * |            | id7?id9?id6                  |              |              |
+ * |            |                              |              |              |
+ * |            | e!eventId?timestamp?infoKey: |              |              |
+ * |            | eventInfoValue               |              |              |
+ * |            |                              |              |              |
+ * |            | flowVersion:                 |              |              |
+ * |            | versionValue                 |              |              |
+ * |-------------------------------------------------------------------------|
  * </pre>
  */
 public class EntityTable extends BaseTable<EntityTable> {

+ 269 - 52
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -47,6 +48,10 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
@@ -60,7 +65,15 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**
- * @throws Exception
+ * Various tests to test writing entities to HBase and reading them back from
+ * it.
+ *
+ * It uses a single HBase mini-cluster for all tests which is a little more
+ * realistic, and helps test correctness in the presence of other data.
+ *
+ * Each test uses a different cluster name to be able to handle its own data
+ * even if other records exist in the table. Use a different cluster name if
+ * you add a new test.
  */
 public class TestHBaseTimelineWriterImpl {
 
@@ -78,6 +91,199 @@ public class TestHBaseTimelineWriterImpl {
         .createTable(util.getHBaseAdmin(), util.getConfiguration());
     new AppToFlowTable()
         .createTable(util.getHBaseAdmin(), util.getConfiguration());
+    new ApplicationTable()
+        .createTable(util.getHBaseAdmin(), util.getConfiguration());
+  }
+
+  @Test
+  public void testWriteApplicationToHBase() throws Exception {
+    TimelineEntities te = new TimelineEntities();
+    ApplicationEntity entity = new ApplicationEntity();
+    String id = "hello";
+    entity.setId(id);
+    Long cTime = 1425016501000L;
+    Long mTime = 1425026901000L;
+    entity.setCreatedTime(cTime);
+    entity.setModifiedTime(mTime);
+
+    // add the info map in Timeline Entity
+    Map<String, Object> infoMap = new HashMap<String, Object>();
+    infoMap.put("infoMapKey1", "infoMapValue1");
+    infoMap.put("infoMapKey2", 10);
+    entity.addInfo(infoMap);
+
+    // add the isRelatedToEntity info
+    String key = "task";
+    String value = "is_related_to_entity_id_here";
+    Set<String> isRelatedToSet = new HashSet<String>();
+    isRelatedToSet.add(value);
+    Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
+    isRelatedTo.put(key, isRelatedToSet);
+    entity.setIsRelatedToEntities(isRelatedTo);
+
+    // add the relatesTo info
+    key = "container";
+    value = "relates_to_entity_id_here";
+    Set<String> relatesToSet = new HashSet<String>();
+    relatesToSet.add(value);
+    value = "relates_to_entity_id_here_Second";
+    relatesToSet.add(value);
+    Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
+    relatesTo.put(key, relatesToSet);
+    entity.setRelatesToEntities(relatesTo);
+
+    // add some config entries
+    Map<String, String> conf = new HashMap<String, String>();
+    conf.put("config_param1", "value1");
+    conf.put("config_param2", "value2");
+    entity.addConfigs(conf);
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId("MAP_SLOT_MILLIS");
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricValues.put(ts - 120000, 100000000);
+    metricValues.put(ts - 100000, 200000000);
+    metricValues.put(ts - 80000, 300000000);
+    metricValues.put(ts - 60000, 400000000);
+    metricValues.put(ts - 40000, 50000000000L);
+    metricValues.put(ts - 20000, 60000000000L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+    entity.addMetrics(metrics);
+
+    te.addEntity(entity);
+
+    HBaseTimelineWriterImpl hbi = null;
+    HBaseTimelineReaderImpl hbr = null;
+    try {
+      Configuration c1 = util.getConfiguration();
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      hbi.start();
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      String cluster = "cluster_test_write_app";
+      String user = "user1";
+      String flow = "some_flow_name";
+      String flowVersion = "AB7822C10F1111";
+      long runid = 1002345678919L;
+      hbi.write(cluster, user, flow, flowVersion, runid, id, te);
+      hbi.stop();
+
+      // retrieve the row
+      byte[] rowKey =
+          ApplicationRowKey.getRowKey(cluster, user, flow, runid, id);
+      Get get = new Get(rowKey);
+      get.setMaxVersions(Integer.MAX_VALUE);
+      Connection conn = ConnectionFactory.createConnection(c1);
+      Result result = new ApplicationTable().getResult(c1, conn, get);
+
+      assertTrue(result != null);
+      assertEquals(16, result.size());
+
+      // check the row key
+      byte[] row1 = result.getRow();
+      assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
+          id));
+
+      // check info column family
+      String id1 = ApplicationColumn.ID.readResult(result).toString();
+      assertEquals(id, id1);
+
+      Number val =
+          (Number) ApplicationColumn.CREATED_TIME.readResult(result);
+      Long cTime1 = val.longValue();
+      assertEquals(cTime1, cTime);
+
+      val = (Number) ApplicationColumn.MODIFIED_TIME.readResult(result);
+      Long mTime1 = val.longValue();
+      assertEquals(mTime1, mTime);
+
+      Map<String, Object> infoColumns =
+          ApplicationColumnPrefix.INFO.readResults(result);
+      assertEquals(infoMap.size(), infoColumns.size());
+      for (String infoItem : infoMap.keySet()) {
+        assertEquals(infoMap.get(infoItem), infoColumns.get(infoItem));
+      }
+
+      // Remember isRelatedTo is of type Map<String, Set<String>>
+      for (String isRelatedToKey : isRelatedTo.keySet()) {
+        Object isRelatedToValue =
+            ApplicationColumnPrefix.IS_RELATED_TO.readResult(result,
+                isRelatedToKey);
+        String compoundValue = isRelatedToValue.toString();
+        // id7?id9?id6
+        Set<String> isRelatedToValues =
+            new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
+        assertEquals(isRelatedTo.get(isRelatedToKey).size(),
+            isRelatedToValues.size());
+        for (String v : isRelatedTo.get(isRelatedToKey)) {
+          assertTrue(isRelatedToValues.contains(v));
+        }
+      }
+
+      // RelatesTo
+      for (String relatesToKey : relatesTo.keySet()) {
+        String compoundValue =
+            ApplicationColumnPrefix.RELATES_TO.readResult(result,
+                relatesToKey).toString();
+        // id3?id4?id5
+        Set<String> relatesToValues =
+            new HashSet<String>(Separator.VALUES.splitEncoded(compoundValue));
+        assertEquals(relatesTo.get(relatesToKey).size(),
+            relatesToValues.size());
+        for (String v : relatesTo.get(relatesToKey)) {
+          assertTrue(relatesToValues.contains(v));
+        }
+      }
+
+      // Configuration
+      Map<String, Object> configColumns =
+          ApplicationColumnPrefix.CONFIG.readResults(result);
+      assertEquals(conf.size(), configColumns.size());
+      for (String configItem : conf.keySet()) {
+        assertEquals(conf.get(configItem), configColumns.get(configItem));
+      }
+
+      NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+          ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
+
+      NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
+      // We got metrics back
+      assertNotNull(metricMap);
+      // Same number of metrics as we wrote
+      assertEquals(metricValues.entrySet().size(), metricMap.entrySet().size());
+
+      // Iterate over original metrics and confirm that they are present
+      // here.
+      for (Entry<Long, Number> metricEntry : metricValues.entrySet()) {
+        assertEquals(metricEntry.getValue(),
+            metricMap.get(metricEntry.getKey()));
+      }
+
+      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, id,
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
+      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
+          id, entity.getType(), null, null, null, null, null, null, null,
+          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+      assertNotNull(e1);
+      assertEquals(1, es1.size());
+    } finally {
+      if (hbi != null) {
+        hbi.stop();
+        hbi.close();
+      }
+      if (hbr != null) {
+        hbr.stop();
+        hbr.close();
+      }
+    }
   }
 
   @Test
@@ -154,7 +360,7 @@ public class TestHBaseTimelineWriterImpl {
       hbr = new HBaseTimelineReaderImpl();
       hbr.init(c1);
       hbr.start();
-      String cluster = "cluster1";
+      String cluster = "cluster_test_write_entity";
       String user = "user1";
       String flow = "some_flow_name";
       String flowVersion = "AB7822C10F1111";
@@ -268,7 +474,8 @@ public class TestHBaseTimelineWriterImpl {
       assertEquals(17, colCount);
 
       TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-          entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
       Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
           appName, entity.getType(), null, null, null, null, null, null, null,
           null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
@@ -284,10 +491,6 @@ public class TestHBaseTimelineWriterImpl {
         hbr.close();
       }
     }
-
-    // Somewhat of a hack, not a separate test in order not to have to deal with
-    // test case order exectution.
-    testAdditionalEntity();
   }
 
   private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
@@ -299,14 +502,31 @@ public class TestHBaseTimelineWriterImpl {
     assertEquals(user, Bytes.toString(rowKeyComponents[0]));
     assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
     assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(TimelineWriterUtils.invert(runid), Bytes.toLong(rowKeyComponents[3]));
+    assertEquals(TimelineWriterUtils.invert(runid),
+        Bytes.toLong(rowKeyComponents[3]));
     assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
     assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
     assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
     return true;
   }
 
-  private void testAdditionalEntity() throws IOException {
+  private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster,
+      String user, String flow, Long runid, String appName) {
+
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
+
+    assertTrue(rowKeyComponents.length == 5);
+    assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
+    assertEquals(user, Bytes.toString(rowKeyComponents[1]));
+    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
+    assertEquals(TimelineWriterUtils.invert(runid),
+        Bytes.toLong(rowKeyComponents[3]));
+    assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
+    return true;
+  }
+
+  @Test
+  public void testEvents() throws IOException {
     TimelineEvent event = new TimelineEvent();
     String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE;
     event.setId(eventId);
@@ -333,7 +553,7 @@ public class TestHBaseTimelineWriterImpl {
       hbr = new HBaseTimelineReaderImpl();
       hbr.init(c1);
       hbr.start();
-      String cluster = "cluster2";
+      String cluster = "cluster_test_events";
       String user = "user2";
       String flow = "other_flow_name";
       String flowVersion = "1111F01C2287BA";
@@ -341,50 +561,46 @@ public class TestHBaseTimelineWriterImpl {
       String appName = "some app name";
       hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
       hbi.stop();
-      // scan the table and see that entity exists
-      Scan s = new Scan();
-      byte[] startRow =
-          EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
-      s.setStartRow(startRow);
-      Connection conn = ConnectionFactory.createConnection(c1);
-      ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
-
-      int rowCount = 0;
-      for (Result result : scanner) {
-        if (result != null && !result.isEmpty()) {
-          rowCount++;
-
-          // check the row key
-          byte[] row1 = result.getRow();
-          assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
-              entity));
 
-          Map<String, Object> eventsResult =
-              EntityColumnPrefix.EVENT.readResults(result);
-          // there should be only one event
-          assertEquals(1, eventsResult.size());
-          // key name for the event
-          byte[] compoundColumnQualifierBytes =
-              Separator.VALUES.join(Bytes.toBytes(eventId),
-                  Bytes.toBytes(TimelineWriterUtils.invert(expTs)),
-                  Bytes.toBytes(expKey));
-          String valueKey = Bytes.toString(compoundColumnQualifierBytes);
-          for (Map.Entry<String, Object> e :
-              eventsResult.entrySet()) {
-            // the value key must match
-            assertEquals(valueKey, e.getKey());
-            Object value = e.getValue();
-            // there should be only one timestamp and value
-            assertEquals(expVal, value.toString());
-          }
-        }
+      // retrieve the row
+      byte[] rowKey =
+          ApplicationRowKey.getRowKey(cluster, user, flow, runid, appName);
+      Get get = new Get(rowKey);
+      get.setMaxVersions(Integer.MAX_VALUE);
+      Connection conn = ConnectionFactory.createConnection(c1);
+      Result result = new ApplicationTable().getResult(c1, conn, get);
+
+      assertTrue(result != null);
+
+      // check the row key
+      byte[] row1 = result.getRow();
+      assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
+          appName));
+
+      Map<String, Object> eventsResult =
+          ApplicationColumnPrefix.EVENT.readResults(result);
+      // there should be only one event
+      assertEquals(1, eventsResult.size());
+      // key name for the event
+      byte[] compoundColumnQualifierBytes =
+          Separator.VALUES.join(Bytes.toBytes(eventId),
+              Bytes.toBytes(TimelineWriterUtils.invert(expTs)),
+              Bytes.toBytes(expKey));
+      String valueKey = Bytes.toString(compoundColumnQualifierBytes);
+      for (Map.Entry<String, Object> e : eventsResult.entrySet()) {
+        // the value key must match
+        assertEquals(valueKey, e.getKey());
+        Object value = e.getValue();
+        // there should be only one timestamp and value
+        assertEquals(expVal, value.toString());
       }
-      assertEquals(1, rowCount);
 
       TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-          entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
       TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appName,
-          entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
       Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
           appName, entity.getType(), null, null, null, null, null, null, null,
           null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
@@ -410,7 +626,7 @@ public class TestHBaseTimelineWriterImpl {
   }
 
   @Test
-  public void testAdditionalEntityEmptyEventInfo() throws IOException {
+  public void testEventsWithEmptyInfo() throws IOException {
     TimelineEvent event = new TimelineEvent();
     String eventId = "foo_event_id";
     event.setId(eventId);
@@ -435,7 +651,7 @@ public class TestHBaseTimelineWriterImpl {
       hbr = new HBaseTimelineReaderImpl();
       hbr.init(c1);
       hbr.start();
-      String cluster = "cluster_emptyeventkey";
+      String cluster = "cluster_test_empty_eventkey";
       String user = "user_emptyeventkey";
       String flow = "other_flow_name";
       String flowVersion = "1111F01C2287BA";
@@ -487,7 +703,8 @@ public class TestHBaseTimelineWriterImpl {
       assertEquals(1, rowCount);
 
       TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
-          entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+          entity.getType(), entity.getId(),
+          EnumSet.of(TimelineReader.Field.ALL));
       Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
           appName, entity.getType(), null, null, null, null, null, null, null,
           null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));