
YARN-4178. [storage implementation] app id as string in row keys can cause incorrect ordering (Varun Saxena via sjlee)

Sangjin Lee committed 9 years ago
commit fdaa1e4e16
22 changed files with 327 additions and 257 deletions
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 7 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
  3. 7 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
  4. 8 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
  5. 10 10
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
  6. 6 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
  7. 5 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
  8. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
  9. 0 112
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java
  10. 173 26
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
  11. 16 16
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
  12. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
  13. 5 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
  14. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
  15. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
  16. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
  17. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
  18. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
  19. 8 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
  20. 56 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineStorageUtils.java
  21. 0 29
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineWriterUtils.java
  22. 5 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
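
A note on the bug being fixed: app ids are strings of the form application_{cluster timestamp}_{sequence id with min 4 digits}, and the old row keys stored them as encoded strings, so HBase ordered them lexicographically. Lexicographic order diverges from numeric order as soon as a sequence id outgrows its zero padding. A minimal standalone sketch of the mis-ordering (class name and timestamp value are illustrative only, not code from this commit):

import java.util.Arrays;

public class AppIdOrderingSketch {
  public static void main(String[] args) {
    // Hypothetical app ids: sequence ids 9999 and 10000, same cluster timestamp.
    String[] keys = {
        "application_1441635193000_9999",
        "application_1441635193000_10000"
    };
    // HBase compares row keys byte-by-byte, i.e. lexicographically.
    Arrays.sort(keys);
    // Prints the 10000 id before the 9999 id -- the wrong numeric order:
    // [application_1441635193000_10000, application_1441635193000_9999]
    System.out.println(Arrays.toString(keys));
  }
}

The commit replaces the string with a fixed 12-byte binary encoding; see the TimelineStorageUtils diff below.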

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -168,6 +168,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-4058. Miscellaneous issues in NodeManager project (Naganarasimha G R
     via sjlee)
 
+    YARN-4178. [storage implementation] app id as string in row keys can cause
+    incorrect ordering (Varun Saxena via sjlee)
+
 Trunk - Unreleased
 
   INCOMPATIBLE CHANGES

+ 7 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java

@@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 import com.google.common.base.Preconditions;
 
@@ -182,7 +182,7 @@ class ApplicationEntityReader extends GenericEntityReader {
         fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
       readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
           true);
-      if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
+      if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
           entity.getIsRelatedToEntities(), isRelatedTo)) {
         return null;
       }
@@ -198,7 +198,7 @@ class ApplicationEntityReader extends GenericEntityReader {
         fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
       readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
           false);
-      if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
+      if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
           entity.getRelatesToEntities(), relatesTo)) {
         return null;
       }
@@ -214,7 +214,7 @@ class ApplicationEntityReader extends GenericEntityReader {
         fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
       readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
       if (checkInfo &&
-          !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+          !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
         return null;
       }
       if (!fieldsToRetrieve.contains(Field.ALL) &&
@@ -228,7 +228,7 @@ class ApplicationEntityReader extends GenericEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
       readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
-      if (checkConfigs && !TimelineReaderUtils.matchFilters(
+      if (checkConfigs && !TimelineStorageUtils.matchFilters(
           entity.getConfigs(), configFilters)) {
         return null;
       }
@@ -243,7 +243,7 @@ class ApplicationEntityReader extends GenericEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
       readEvents(entity, result, true);
-      if (checkEvents && !TimelineReaderUtils.matchEventFilters(
+      if (checkEvents && !TimelineStorageUtils.matchEventFilters(
           entity.getEvents(), eventFilters)) {
         return null;
       }
@@ -258,7 +258,7 @@ class ApplicationEntityReader extends GenericEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
       readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
-      if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
+      if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
           entity.getMetrics(), metricFilters)) {
         return null;
       }

+ 7 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java

@@ -44,7 +44,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.map.JsonMappingException;
@@ -321,31 +321,31 @@ public class FileSystemTimelineReaderImpl extends AbstractService
           continue;
         }
         if (relatesTo != null && !relatesTo.isEmpty() &&
-            !TimelineReaderUtils
+            !TimelineStorageUtils
                 .matchRelations(entity.getRelatesToEntities(), relatesTo)) {
           continue;
         }
         if (isRelatedTo != null && !isRelatedTo.isEmpty() &&
-            !TimelineReaderUtils
+            !TimelineStorageUtils
                 .matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) {
           continue;
         }
         if (infoFilters != null && !infoFilters.isEmpty() &&
-            !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+            !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
           continue;
         }
         if (configFilters != null && !configFilters.isEmpty() &&
-            !TimelineReaderUtils.matchFilters(
+            !TimelineStorageUtils.matchFilters(
                 entity.getConfigs(), configFilters)) {
           continue;
         }
         if (metricFilters != null && !metricFilters.isEmpty() &&
-            !TimelineReaderUtils.matchMetricFilters(
+            !TimelineStorageUtils.matchMetricFilters(
                 entity.getMetrics(), metricFilters)) {
           continue;
         }
         if (eventFilters != null && !eventFilters.isEmpty() &&
-            !TimelineReaderUtils.matchEventFilters(
+            !TimelineStorageUtils.matchEventFilters(
                 entity.getEvents(), eventFilters)) {
           continue;
         }

+ 8 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java

@@ -44,8 +44,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
@@ -220,7 +219,7 @@ class GenericEntityReader extends TimelineEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
       readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
-      if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
+      if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
           entity.getIsRelatedToEntities(), isRelatedTo)) {
         return null;
       }
@@ -235,7 +234,7 @@ class GenericEntityReader extends TimelineEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
       readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
-      if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
+      if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
           entity.getRelatesToEntities(), relatesTo)) {
         return null;
       }
@@ -251,7 +250,7 @@ class GenericEntityReader extends TimelineEntityReader {
         fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
       readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
       if (checkInfo &&
-          !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+          !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
         return null;
       }
       if (!fieldsToRetrieve.contains(Field.ALL) &&
@@ -265,7 +264,7 @@ class GenericEntityReader extends TimelineEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
       readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
-      if (checkConfigs && !TimelineReaderUtils.matchFilters(
+      if (checkConfigs && !TimelineStorageUtils.matchFilters(
           entity.getConfigs(), configFilters)) {
         return null;
       }
@@ -280,7 +279,7 @@ class GenericEntityReader extends TimelineEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
       readEvents(entity, result, false);
-      if (checkEvents && !TimelineReaderUtils.matchEventFilters(
+      if (checkEvents && !TimelineStorageUtils.matchEventFilters(
           entity.getEvents(), eventFilters)) {
         return null;
       }
@@ -295,7 +294,7 @@ class GenericEntityReader extends TimelineEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
       readMetrics(entity, result, EntityColumnPrefix.METRIC);
-      if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
+      if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
           entity.getMetrics(), metricFilters)) {
         return null;
       }
@@ -365,7 +364,7 @@ class GenericEntityReader extends TimelineEntityReader {
       // the column name is of the form "eventId=timestamp=infoKey"
       if (karr.length == 3) {
         String id = Bytes.toString(karr[0]);
-        long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1]));
+        long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1]));
         String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
         TimelineEvent event = eventsMap.get(key);
         if (event == null) {

+ 10 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java

@@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -125,7 +125,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
 
       // if the entity is the application, the destination is the application
       // table
-      boolean isApplication = TimelineWriterUtils.isApplicationEntity(te);
+      boolean isApplication = TimelineStorageUtils.isApplicationEntity(te);
       byte[] rowKey = isApplication ?
           ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId,
               appId) :
@@ -139,7 +139,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       storeRelations(rowKey, te, isApplication);
 
       if (isApplication) {
-        if (TimelineWriterUtils.isApplicationCreated(te)) {
+        if (TimelineStorageUtils.isApplicationCreated(te)) {
           onApplicationCreated(clusterId, userId, flowName, flowVersion,
               flowRunId, appId, te);
         }
@@ -149,7 +149,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
         // if application has finished, store it's finish time and write final
         // values
         // of all metrics
-        if (TimelineWriterUtils.isApplicationFinished(te)) {
+        if (TimelineStorageUtils.isApplicationFinished(te)) {
           onApplicationFinished(clusterId, userId, flowName, flowVersion,
               flowRunId, appId, te);
         }
@@ -234,7 +234,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     Attribute attributeAppId = AggregationCompactionDimension.APPLICATION_ID
         .getAttribute(appId);
     FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
-        TimelineWriterUtils.getApplicationFinishedTime(te), attributeAppId);
+        TimelineStorageUtils.getApplicationFinishedTime(te), attributeAppId);
 
     // store the final value of metrics since application has finished
     Set<TimelineMetric> metrics = te.getMetrics();
@@ -406,9 +406,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
             }
             byte[] columnQualifierFirst =
                 Bytes.toBytes(Separator.VALUES.encode(eventId));
-            byte[] columnQualifierWithTsBytes =
-                Separator.VALUES.join(columnQualifierFirst,
-                    Bytes.toBytes(TimelineWriterUtils.invert(eventTimestamp)));
+            byte[] columnQualifierWithTsBytes = Separator.VALUES.
+                join(columnQualifierFirst, Bytes.toBytes(
+                    TimelineStorageUtils.invertLong(eventTimestamp)));
             Map<String, Object> eventInfo = event.getInfo();
             if ((eventInfo == null) || (eventInfo.size() == 0)) {
               // add separator since event key is empty
@@ -418,11 +418,11 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
               if (isApplication) {
                 ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
                     compoundColumnQualifierBytes, null,
-                      TimelineWriterUtils.EMPTY_BYTES);
+                      TimelineStorageUtils.EMPTY_BYTES);
               } else {
                 EntityColumnPrefix.EVENT.store(rowKey, entityTable,
                     compoundColumnQualifierBytes, null,
-                    TimelineWriterUtils.EMPTY_BYTES);
+                      TimelineStorageUtils.EMPTY_BYTES);
               }
             } else {
               for (Map.Entry<String, Object> info : eventInfo.entrySet()) {

+ 6 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java

@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.application;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
  * Represents a rowkey for the application table.
@@ -90,7 +90,7 @@ public class ApplicationRowKey {
       String flowId, Long flowRunId) {
     byte[] first = Bytes.toBytes(
         Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowId));
-    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
     return Separator.QUALIFIERS.join(first, second, new byte[0]);
   }
 
@@ -112,8 +112,8 @@ public class ApplicationRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
-    byte[] third = Bytes.toBytes(appId);
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
+    byte[] third = TimelineStorageUtils.encodeAppId(appId);
     return Separator.QUALIFIERS.join(first, second, third);
   }
 
@@ -135,9 +135,8 @@ public class ApplicationRowKey {
     String flowId =
         Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
     long flowRunId =
-        TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
-    String appId =
-        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4]));
+        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
+    String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[4]);
     return new ApplicationRowKey(clusterId, userId, flowId, flowRunId, appId);
   }
 }

+ 5 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
  * Represents a rowkey for the app_flow table.
@@ -49,7 +50,9 @@ public class AppToFlowRowKey {
    * @return byte array with the row key
    */
   public static byte[] getRowKey(String clusterId, String appId) {
-    return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, appId));
+    byte[] first = Bytes.toBytes(clusterId);
+    byte[] second = TimelineStorageUtils.encodeAppId(appId);
+    return Separator.QUALIFIERS.join(first, second);
   }
 
   /**
@@ -64,7 +67,7 @@ public class AppToFlowRowKey {
     }
 
     String clusterId = Bytes.toString(rowKeyComponents[0]);
-    String appId = Bytes.toString(rowKeyComponents[1]);
+    String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[1]);
     return new AppToFlowRowKey(clusterId, appId);
   }
 }

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java

@@ -304,7 +304,7 @@ public enum Separator {
    * @return source split by this separator.
    */
   public byte[][] split(byte[] source, int limit) {
-    return TimelineWriterUtils.split(source, this.bytes, limit);
+    return TimelineStorageUtils.split(source, this.bytes, limit);
   }
 
   /**
@@ -315,6 +315,6 @@ public enum Separator {
    * @return source split by this separator.
    */
   public byte[][] split(byte[] source) {
-    return TimelineWriterUtils.split(source, this.bytes);
+    return TimelineStorageUtils.split(source, this.bytes);
   }
 }

+ 0 - 112
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java

@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.common;
-
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-
-public class TimelineReaderUtils {
-  /**
-   *
-   * @param entityRelations the relations of an entity
-   * @param relationFilters the relations for filtering
-   * @return a boolean flag to indicate if both match
-   */
-  public static boolean matchRelations(
-      Map<String, Set<String>> entityRelations,
-      Map<String, Set<String>> relationFilters) {
-    for (Map.Entry<String, Set<String>> relation : relationFilters.entrySet()) {
-      Set<String> ids = entityRelations.get(relation.getKey());
-      if (ids == null) {
-        return false;
-      }
-      for (String id : relation.getValue()) {
-        if (!ids.contains(id)) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
-  /**
-   *
-   * @param map the map of key/value pairs in an entity
-   * @param filters the map of key/value pairs for filtering
-   * @return a boolean flag to indicate if both match
-   */
-  public static boolean matchFilters(Map<String, ? extends Object> map,
-      Map<String, ? extends Object> filters) {
-    for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
-      Object value = map.get(filter.getKey());
-      if (value == null) {
-        return false;
-      }
-      if (!value.equals(filter.getValue())) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   *
-   * @param entityEvents the set of event objects in an entity
-   * @param eventFilters the set of event Ids for filtering
-   * @return a boolean flag to indicate if both match
-   */
-  public static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
-      Set<String> eventFilters) {
-    Set<String> eventIds = new HashSet<String>();
-    for (TimelineEvent event : entityEvents) {
-      eventIds.add(event.getId());
-    }
-    for (String eventFilter : eventFilters) {
-      if (!eventIds.contains(eventFilter)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   *
-   * @param metrics the set of metric objects in an entity
-   * @param metricFilters the set of metric Ids for filtering
-   * @return a boolean flag to indicate if both match
-   */
-  public static boolean matchMetricFilters(Set<TimelineMetric> metrics,
-      Set<String> metricFilters) {
-    Set<String> metricIds = new HashSet<String>();
-    for (TimelineMetric metric : metrics) {
-      metricIds.add(metric.getId());
-    }
-
-    for (String metricFilter : metricFilters) {
-      if (!metricIds.contains(metricFilter)) {
-        return false;
-      }
-    }
-    return true;
-  }
-}

+ 173 - 26
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java

@@ -1,44 +1,51 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
  */
+
 package org.apache.hadoop.yarn.server.timelineservice.storage.common;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.Map.Entry;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 
 /**
- * bunch of utility functions used across TimelineWriter classes
+ * A bunch of utility functions used across TimelineReader and TimelineWriter.
  */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class TimelineWriterUtils {
+@Public
+@Unstable
+public class TimelineStorageUtils {
 
   /** empty bytes */
   public static final byte[] EMPTY_BYTES = new byte[0];
@@ -53,8 +60,7 @@ public class TimelineWriterUtils {
    * Splits the source array into multiple array segments using the given
    * separator, up to a maximum of count items. This will naturally produce
    * copied byte arrays for each of the split segments. To identify the split
-   * ranges without the array copies, see
-   * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
+   * ranges without the array copies, see {@link #splitRanges(byte[], byte[])}.
    *
    * @param source
    * @param separator
@@ -68,8 +74,7 @@ public class TimelineWriterUtils {
    * Splits the source array into multiple array segments using the given
    * separator, up to a maximum of count items. This will naturally produce
    * copied byte arrays for each of the split segments. To identify the split
-   * ranges without the array copies, see
-   * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
+   * ranges without the array copies, see {@link #splitRanges(byte[], byte[])}.
    *
    * @param source
    * @param separator
@@ -127,8 +132,7 @@ public class TimelineWriterUtils {
         // everything else goes in one final segment
         break;
       }
-
-      segments.add(new Range(start, i));
+	      segments.add(new Range(start, i));
       start = i + separator.length;
       // i will be incremented again in outer for loop
       i += separator.length - 1;
@@ -149,10 +153,70 @@ public class TimelineWriterUtils {
    *          a scan.
    * @return inverted long
    */
-  public static long invert(Long key) {
+  public static long invertLong(long key) {
     return Long.MAX_VALUE - key;
   }
 
+  /**
+   * Converts an int into it's inverse int to be used in (row) keys
+   * where we want to have the largest int value in the top of the table
+   * (scans start at the largest int first).
+   *
+   * @param key value to be inverted so that the latest version will be first in
+   *          a scan.
+   * @return inverted int
+   */
+  public static int invertInt(int key) {
+    return Integer.MAX_VALUE - key;
+  }
+
+
+  /**
+   * Converts/encodes a string app Id into a byte representation for (row) keys.
+   * For conversion, we extract cluster timestamp and sequence id from the
+   * string app id (calls {@link ConverterUtils#toApplicationId(String)} for
+   * conversion) and then store it in a byte array of length 12 (8 bytes (long)
+   * for cluster timestamp followed 4 bytes(int) for sequence id). Both cluster
+   * timestamp and sequence id are inverted so that the most recent cluster
+   * timestamp and highest sequence id appears first in the table (i.e.
+   * application id appears in a descending order).
+   *
+   * @param appIdStr application id in string format i.e.
+   * application_{cluster timestamp}_{sequence id with min 4 digits}
+   *
+   * @return encoded byte representation of app id.
+   */
+  public static byte[] encodeAppId(String appIdStr) {
+    ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
+    byte[] appIdBytes = new byte[Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT];
+    byte[] clusterTs = Bytes.toBytes(invertLong(appId.getClusterTimestamp()));
+    System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG);
+    byte[] seqId = Bytes.toBytes(invertInt(appId.getId()));
+    System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT);
+    return appIdBytes;
+  }
+
+  /**
+   * Converts/decodes a 12 byte representation of app id for (row) keys to an
+   * app id in string format which can be returned back to client.
+   * For decoding, 12 bytes are interpreted as 8 bytes of inverted cluster
+   * timestamp(long) followed by 4 bytes of inverted sequence id(int). Calls
+   * {@link ApplicationId#toString} to generate string representation of app id.
+   *
+   * @param appIdBytes application id in byte representation.
+   *
+   * @return decoded app id in string format.
+   */
+  public static String decodeAppId(byte[] appIdBytes) {
+    if (appIdBytes.length != (Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT)) {
+      throw new IllegalArgumentException("Invalid app id in byte format");
+    }
+    long clusterTs = invertLong(Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG));
+    int seqId =
+        invertInt(Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT));
+    return ApplicationId.newInstance(clusterTs, seqId).toString();
+  }
+
   /**
    * returns the timestamp of that day's start (which is midnight 00:00:00 AM)
    * for a given input timestamp
@@ -325,4 +389,87 @@ public class TimelineWriterUtils {
     return null;
   }
 
-}
+  /**
+   *
+   * @param entityRelations the relations of an entity
+   * @param relationFilters the relations for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchRelations(
+      Map<String, Set<String>> entityRelations,
+      Map<String, Set<String>> relationFilters) {
+    for (Map.Entry<String, Set<String>> relation : relationFilters.entrySet()) {
+      Set<String> ids = entityRelations.get(relation.getKey());
+      if (ids == null) {
+        return false;
+      }
+      for (String id : relation.getValue()) {
+        if (!ids.contains(id)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   *
+   * @param map the map of key/value pairs in an entity
+   * @param filters the map of key/value pairs for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchFilters(Map<String, ? extends Object> map,
+      Map<String, ? extends Object> filters) {
+    for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
+      Object value = map.get(filter.getKey());
+      if (value == null) {
+        return false;
+      }
+      if (!value.equals(filter.getValue())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   *
+   * @param entityEvents the set of event objects in an entity
+   * @param eventFilters the set of event Ids for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
+      Set<String> eventFilters) {
+    Set<String> eventIds = new HashSet<String>();
+    for (TimelineEvent event : entityEvents) {
+      eventIds.add(event.getId());
+    }
+    for (String eventFilter : eventFilters) {
+      if (!eventIds.contains(eventFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   *
+   * @param metrics the set of metric objects in an entity
+   * @param metricFilters the set of metric Ids for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchMetricFilters(Set<TimelineMetric> metrics,
+      Set<String> metricFilters) {
+    Set<String> metricIds = new HashSet<String>();
+    for (TimelineMetric metric : metrics) {
+      metricIds.add(metric.getId());
+    }
+
+    for (String metricFilter : metricFilters) {
+      if (!metricIds.contains(metricFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
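
The new encodeAppId/decodeAppId pair is the heart of the fix. Below is a simplified, self-contained sketch of the same idea, with the inversion and 12-byte packing inlined so it runs without the Hadoop Bytes/ConverterUtils helpers; the comparator mimics HBase's unsigned row-key comparison, and the class and method names are illustrative only:

import java.nio.ByteBuffer;

public class AppIdEncodingSketch {
  static long invertLong(long key) { return Long.MAX_VALUE - key; }
  static int invertInt(int key) { return Integer.MAX_VALUE - key; }

  // 8 bytes of inverted cluster timestamp + 4 bytes of inverted sequence id.
  static byte[] encode(long clusterTs, int seqId) {
    return ByteBuffer.allocate(12)
        .putLong(invertLong(clusterTs))
        .putInt(invertInt(seqId))
        .array();
  }

  // HBase orders row keys as unsigned byte strings.
  static int compareUnsigned(byte[] a, byte[] b) {
    for (int i = 0; i < Math.min(a.length, b.length); i++) {
      int c = (a[i] & 0xff) - (b[i] & 0xff);
      if (c != 0) {
        return c;
      }
    }
    return a.length - b.length;
  }

  public static void main(String[] args) {
    byte[] app9999 = encode(1441635193000L, 9999);
    byte[] app10000 = encode(1441635193000L, 10000);
    // Inversion makes the higher sequence id sort first (descending order),
    // and fixed-width binary means 10000 vs 9999 can no longer mis-order.
    System.out.println(compareUnsigned(app10000, app9999) < 0);  // true
  }
}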

+ 16 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java

@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
  * Represents a rowkey for the entity table.
@@ -90,9 +90,9 @@ public class EntityRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
-    byte[] third = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId));
-    return Separator.QUALIFIERS.join(first, second, third);
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
+    byte[] third = TimelineStorageUtils.encodeAppId(appId);
+    return Separator.QUALIFIERS.join(first, second, third, new byte[0]);
   }
 
   /**
@@ -114,10 +114,11 @@ public class EntityRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
-    byte[] third =
-        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType, ""));
-    return Separator.QUALIFIERS.join(first, second, third);
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
+    byte[] third = TimelineStorageUtils.encodeAppId(appId);
+    byte[] fourth =
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(entityType, ""));
+    return Separator.QUALIFIERS.join(first, second, third, fourth);
   }
 
   /**
@@ -141,11 +142,11 @@ public class EntityRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
-    byte[] third =
-        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType,
-            entityId));
-    return Separator.QUALIFIERS.join(first, second, third);
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
+    byte[] third = TimelineStorageUtils.encodeAppId(appId);
+    byte[] fourth =
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(entityType, entityId));
+    return Separator.QUALIFIERS.join(first, second, third, fourth);
   }
 
   /**
@@ -166,9 +167,8 @@ public class EntityRowKey {
     String flowId =
         Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
     long flowRunId =
-        TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
-    String appId =
-        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4]));
+        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
+    String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[4]);
     String entityType =
         Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[5]));
     String entityId =

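With the app id now a fixed-width binary segment, the entity row key grows from three separator-joined parts to four: a string prefix, the 8-byte inverted flow run id, the 12-byte encoded app id, and a string suffix. Conceptually (every boundary is Separator.QUALIFIERS on disk; the glyphs below only mark the segments, and field widths are from the encodeAppId javadoc above):

clusterId!userId!flowId | invertLong(flowRunId) | encodeAppId(appId) | entityType!entityId
  (variable length)           (8 bytes)              (12 bytes)         (variable length)

Keeping the binary segments at fixed widths is what lets parseRowKey split the key back apart reliably.
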
+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java

@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 
 /**
@@ -114,7 +114,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable>
 
     byte[] columnQualifier = ColumnHelper.getColumnQualifier(
         this.columnPrefixBytes, qualifier);
-    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
         attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
         combinedAttributes);
@@ -235,7 +235,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable>
 
     byte[] columnQualifier = ColumnHelper.getColumnQualifier(
         this.columnPrefixBytes, qualifier);
-    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
         attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
         combinedAttributes);

+ 5 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java

@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow;

 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;

 /**
  * Represents a rowkey for the flow activity table.
@@ -71,7 +71,7 @@ public class FlowActivityRowKey {
   */
  public static byte[] getRowKey(String clusterId, String userId,
      String flowId) {
-    long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+    long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
        .currentTimeMillis());
    return getRowKey(clusterId, dayTs, userId, flowId);
  }
@@ -90,7 +90,7 @@ public class FlowActivityRowKey {
      String flowId) {
    return Separator.QUALIFIERS.join(
        Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
-        Bytes.toBytes(TimelineWriterUtils.invert(dayTs)),
+        Bytes.toBytes(TimelineStorageUtils.invertLong(dayTs)),
        Bytes.toBytes(Separator.QUALIFIERS.encode(userId)),
        Bytes.toBytes(Separator.QUALIFIERS.encode(flowId)));
  }
@@ -108,7 +108,8 @@ public class FlowActivityRowKey {

    String clusterId = Separator.QUALIFIERS.decode(Bytes
        .toString(rowKeyComponents[0]));
-    long dayTs = TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[1]));
+    long dayTs =
+        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[1]));
    String userId = Separator.QUALIFIERS.decode(Bytes
        .toString(rowKeyComponents[2]));
    String flowId = Separator.QUALIFIERS.decode(Bytes

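The invert-to-invertLong rename recurring through these hunks is worth pausing on: HBase sorts row keys in ascending byte order, so day timestamps and run ids are stored inverted to make the newest entries scan first. A minimal sketch of the helper, inferred from its usage here (an assumption, since TimelineStorageUtils itself is not shown in this excerpt):

  // Sketch: larger inputs yield byte-wise smaller outputs, so an
  // ascending HBase scan returns the newest values first. The function
  // is its own inverse, which is why the same call appears on both the
  // write path (getRowKey) and the read path (parseRowKey).
  public static long invertLong(long key) {
    return Long.MAX_VALUE - key;
  }
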
+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java

@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;

 /**
@@ -97,7 +97,7 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
      TypedBufferedMutator<FlowRunTable> tableMutator, Long timestamp,
      Object inputValue, Attribute... attributes) throws IOException {

-    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
        attributes, aggOp);
    column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
        inputValue, combinedAttributes);

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java

@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;

 /**
@@ -112,7 +112,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {

    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
        this.columnPrefixBytes, qualifier);
-    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
        attributes, this.aggOp);
    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
        combinedAttributes);
@@ -140,7 +140,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {

    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
        this.columnPrefixBytes, qualifier);
-    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
        attributes, this.aggOp);
    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
        combinedAttributes);

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java

@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;

 public class FlowRunCoprocessor extends BaseRegionObserver {
@@ -89,7 +89,7 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
    List<Tag> tags = new ArrayList<>();
    if ((attributes != null) && (attributes.size() > 0)) {
      for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
-        Tag t = TimelineWriterUtils.getTagFromAttribute(attribute);
+        Tag t = TimelineStorageUtils.getTagFromAttribute(attribute);
        tags.add(t);
      }
      byte[] tagByteArray = Tag.fromList(tags);

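The coprocessor above folds every put attribute into an HBase cell tag via the renamed TimelineStorageUtils.getTagFromAttribute. A plausible sketch of that helper, assuming AggregationOperation and AggregationCompactionDimension each expose a lookup by attribute name and a tag-type byte (both APIs are assumptions about code outside this excerpt):

  // Sketch: resolve the attribute name against the two aggregation
  // enums and wrap its value in a Tag carrying the matching type byte.
  public static Tag getTagFromAttribute(Map.Entry<String, byte[]> attribute) {
    AggregationOperation aggOp =
        AggregationOperation.getAggregationOperation(attribute.getKey());
    if (aggOp != null) {
      return new Tag(aggOp.getTagType(), attribute.getValue());
    }
    AggregationCompactionDimension aggDim = AggregationCompactionDimension
        .getAggregationCompactionDimension(attribute.getKey());
    if (aggDim != null) {
      return new Tag(aggDim.getTagType(), attribute.getValue());
    }
    return null;
  }
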
+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java

@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow;

 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;

 /**
  * Represents a rowkey for the flow run table.
@@ -70,7 +70,7 @@ public class FlowRunRowKey {
        userId, flowId));
    // Note that flowRunId is a long, so we can't encode them all at the same
    // time.
-    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
    return Separator.QUALIFIERS.join(first, second);
  }

@@ -92,7 +92,7 @@ public class FlowRunRowKey {
    String flowId =
        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
    long flowRunId =
-        TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
+        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
    return new FlowRunRowKey(clusterId, userId, flowId, flowRunId);
  }
 }

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java

@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;

 /**
  * Invoked via the coprocessor when a Get or a Scan is issued for flow run
@@ -136,7 +136,7 @@ class FlowScanner implements RegionScanner, Closeable {
    // So all cells in one qualifier come one after the other before we see the
    // next column qualifier
    ByteArrayComparator comp = new ByteArrayComparator();
-    byte[] currentColumnQualifier = TimelineWriterUtils.EMPTY_BYTES;
+    byte[] currentColumnQualifier = TimelineStorageUtils.EMPTY_BYTES;
    AggregationOperation currentAggOp = null;
    SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
    Set<String> alreadySeenAggDim = new HashSet<>();
@@ -163,7 +163,7 @@ class FlowScanner implements RegionScanner, Closeable {
    List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
        cell.getTagsLength());
    // We assume that all the operations for a particular column are the same
-    return TimelineWriterUtils.getAggregationOperationFromTagsList(tags);
+    return TimelineStorageUtils.getAggregationOperationFromTagsList(tags);
  }

  /**

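Given the comment above that all operations for a particular column are assumed to be the same, getAggregationOperationFromTagsList only needs to find any tag whose type byte matches a known operation. A minimal sketch under that assumption (Tag.getType() is the HBase accessor; AggregationOperation.getTagType() is assumed, as in the coprocessor sketch above):

  // Sketch: return the first AggregationOperation whose tag type byte
  // appears in the cell's tag list, or null if none matches.
  public static AggregationOperation getAggregationOperationFromTagsList(
      List<Tag> tags) {
    for (AggregationOperation aggOp : AggregationOperation.values()) {
      for (Tag tag : tags) {
        if (tag.getType() == aggOp.getTagType()) {
          return aggOp;
        }
      }
    }
    return null;
  }
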
+ 8 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java

@@ -54,7 +54,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -366,7 +366,8 @@ public class TestHBaseTimelineStorage {
       String flow = "some_flow_name";
       String flow = "some_flow_name";
       String flowVersion = "AB7822C10F1111";
       String flowVersion = "AB7822C10F1111";
       long runid = 1002345678919L;
       long runid = 1002345678919L;
-      String appName = "some app name";
+      String appName =
+          ApplicationId.newInstance(System.currentTimeMillis(), 1).toString();
      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
      hbi.stop();

@@ -592,7 +593,8 @@ public class TestHBaseTimelineStorage {
        byte[][] karr = (byte[][])e.getKey();
        assertEquals(3, karr.length);
        assertEquals(eventId, Bytes.toString(karr[0]));
-        assertEquals(TimelineWriterUtils.invert(expTs), Bytes.toLong(karr[1]));
+        assertEquals(
+            TimelineStorageUtils.invertLong(expTs), Bytes.toLong(karr[1]));
        assertEquals(expKey, Bytes.toString(karr[2]));
        Object value = e.getValue();
        // there should be only one timestamp and value
@@ -667,7 +669,8 @@ public class TestHBaseTimelineStorage {
       String flow = "other_flow_name";
       String flow = "other_flow_name";
       String flowVersion = "1111F01C2287BA";
       String flowVersion = "1111F01C2287BA";
       long runid = 1009876543218L;
       long runid = 1009876543218L;
-      String appName = "some app name";
+      String appName =
+          ApplicationId.newInstance(System.currentTimeMillis(), 1).toString();
      byte[] startRow =
          EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
      hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
@@ -700,7 +703,7 @@ public class TestHBaseTimelineStorage {
            byte[][] karr = (byte[][])e.getKey();
            assertEquals(3, karr.length);
            assertEquals(eventId, Bytes.toString(karr[0]));
-            assertEquals(TimelineWriterUtils.invert(expTs),
+            assertEquals(TimelineStorageUtils.invertLong(expTs),
                Bytes.toLong(karr[1]));
            // key must be empty
            assertEquals(0, karr[2].length);

+ 56 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineStorageUtils.java

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.junit.Test;
+
+public class TestTimelineStorageUtils {
+
+  @Test
+  public void testEncodeDecodeAppId() {
+    long currentTs = System.currentTimeMillis();
+    ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1);
+    ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2);
+    ApplicationId appId3 = ApplicationId.newInstance(currentTs + 300, 1);
+    String appIdStr1 = appId1.toString();
+    String appIdStr2 = appId2.toString();
+    String appIdStr3 = appId3.toString();
+    byte[] appIdBytes1 = TimelineStorageUtils.encodeAppId(appIdStr1);
+    byte[] appIdBytes2 = TimelineStorageUtils.encodeAppId(appIdStr2);
+    byte[] appIdBytes3 = TimelineStorageUtils.encodeAppId(appIdStr3);
+    // App ids should be encoded in a manner that maintains descending
+    // order.
+    assertTrue("Ordering of app ids is incorrect",
+        Bytes.compareTo(appIdBytes1, appIdBytes2) > 0 &&
+        Bytes.compareTo(appIdBytes1, appIdBytes3) > 0 &&
+        Bytes.compareTo(appIdBytes2, appIdBytes3) > 0);
+    String decodedAppId1 = TimelineStorageUtils.decodeAppId(appIdBytes1);
+    String decodedAppId2 = TimelineStorageUtils.decodeAppId(appIdBytes2);
+    String decodedAppId3 = TimelineStorageUtils.decodeAppId(appIdBytes3);
+    assertTrue("Decoded app id is not same as the app id encoded",
+        appIdStr1.equals(decodedAppId1));
+    assertTrue("Decoded app id is not same as the app id encoded",
+        appIdStr2.equals(decodedAppId2));
+    assertTrue("Decoded app id is not same as the app id encoded",
+        appIdStr3.equals(decodedAppId3));
+  }
+}

+ 0 - 29
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineWriterUtils.java

@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.common;
-
-import org.junit.Test;
-
-public class TestTimelineWriterUtils {
-
-  @Test
-  public void test() {
-    // TODO: implement a test for the remaining method in TimelineWriterUtils.
-  }
-
-}

+ 5 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java

@@ -49,7 +49,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReader
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -166,7 +166,7 @@ public class TestHBaseStorageFlowActivity {
    assertEquals(cluster, flowActivityRowKey.getClusterId());
    assertEquals(user, flowActivityRowKey.getUserId());
    assertEquals(flow, flowActivityRowKey.getFlowId());
-    long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+    long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
        .currentTimeMillis());
    assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
    assertEquals(1, values.size());
@@ -281,7 +281,7 @@ public class TestHBaseStorageFlowActivity {
      assertEquals(cluster, flowActivityRowKey.getClusterId());
      assertEquals(user, flowActivityRowKey.getUserId());
      assertEquals(flow, flowActivityRowKey.getFlowId());
-      long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+      long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
          .currentTimeMillis());
      assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
      assertEquals(1, values.size());
@@ -360,7 +360,7 @@ public class TestHBaseStorageFlowActivity {
        assertEquals(cluster, flowActivity.getCluster());
        assertEquals(user, flowActivity.getUser());
        assertEquals(flow, flowActivity.getFlowName());
-        long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+        long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
            .currentTimeMillis());
        assertEquals(dayTs, flowActivity.getDate().getTime());
        Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
@@ -410,7 +410,7 @@ public class TestHBaseStorageFlowActivity {
      assertEquals(cluster, flowActivityRowKey.getClusterId());
      assertEquals(user, flowActivityRowKey.getUserId());
      assertEquals(flow, flowActivityRowKey.getFlowId());
-      long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+      long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
          .currentTimeMillis());
      assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());