Bläddra i källkod

YARN-4178. [storage implementation] app id as string in row keys can cause incorrect ordering (Varun Saxena via sjlee)

Sangjin Lee 10 år sedan
förälder
incheckning
b51d0fef56
21 ändrade filer med 324 tillägg och 257 borttagningar
  1. 7 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
  2. 7 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
  3. 8 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
  4. 10 10
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
  5. 6 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
  6. 5 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
  7. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
  8. 0 112
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java
  9. 173 26
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
  10. 16 16
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
  11. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
  12. 5 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
  13. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
  14. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
  15. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
  16. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
  17. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
  18. 8 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
  19. 56 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineStorageUtils.java
  20. 0 29
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineWriterUtils.java
  21. 5 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java

+ 7 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java

@@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 import com.google.common.base.Preconditions;
 
@@ -182,7 +182,7 @@ class ApplicationEntityReader extends GenericEntityReader {
         fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
       readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
           true);
-      if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
+      if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
           entity.getIsRelatedToEntities(), isRelatedTo)) {
         return null;
       }
@@ -198,7 +198,7 @@ class ApplicationEntityReader extends GenericEntityReader {
         fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
       readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
           false);
-      if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
+      if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
           entity.getRelatesToEntities(), relatesTo)) {
         return null;
       }
@@ -214,7 +214,7 @@ class ApplicationEntityReader extends GenericEntityReader {
         fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
       readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
       if (checkInfo &&
-          !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+          !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
         return null;
       }
       if (!fieldsToRetrieve.contains(Field.ALL) &&
@@ -228,7 +228,7 @@ class ApplicationEntityReader extends GenericEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
       readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
-      if (checkConfigs && !TimelineReaderUtils.matchFilters(
+      if (checkConfigs && !TimelineStorageUtils.matchFilters(
           entity.getConfigs(), configFilters)) {
         return null;
       }
@@ -243,7 +243,7 @@ class ApplicationEntityReader extends GenericEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
       readEvents(entity, result, true);
-      if (checkEvents && !TimelineReaderUtils.matchEventFilters(
+      if (checkEvents && !TimelineStorageUtils.matchEventFilters(
           entity.getEvents(), eventFilters)) {
         return null;
       }
@@ -258,7 +258,7 @@ class ApplicationEntityReader extends GenericEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
       readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
-      if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
+      if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
           entity.getMetrics(), metricFilters)) {
         return null;
       }

+ 7 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java

@@ -44,7 +44,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.map.JsonMappingException;
@@ -321,31 +321,31 @@ public class FileSystemTimelineReaderImpl extends AbstractService
           continue;
         }
         if (relatesTo != null && !relatesTo.isEmpty() &&
-            !TimelineReaderUtils
+            !TimelineStorageUtils
                 .matchRelations(entity.getRelatesToEntities(), relatesTo)) {
           continue;
         }
         if (isRelatedTo != null && !isRelatedTo.isEmpty() &&
-            !TimelineReaderUtils
+            !TimelineStorageUtils
                 .matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) {
           continue;
         }
         if (infoFilters != null && !infoFilters.isEmpty() &&
-            !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+            !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
           continue;
         }
         if (configFilters != null && !configFilters.isEmpty() &&
-            !TimelineReaderUtils.matchFilters(
+            !TimelineStorageUtils.matchFilters(
                 entity.getConfigs(), configFilters)) {
           continue;
         }
         if (metricFilters != null && !metricFilters.isEmpty() &&
-            !TimelineReaderUtils.matchMetricFilters(
+            !TimelineStorageUtils.matchMetricFilters(
                 entity.getMetrics(), metricFilters)) {
           continue;
         }
         if (eventFilters != null && !eventFilters.isEmpty() &&
-            !TimelineReaderUtils.matchEventFilters(
+            !TimelineStorageUtils.matchEventFilters(
                 entity.getEvents(), eventFilters)) {
           continue;
         }

+ 8 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java

@@ -44,8 +44,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
@@ -220,7 +219,7 @@ class GenericEntityReader extends TimelineEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
       readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
-      if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
+      if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
           entity.getIsRelatedToEntities(), isRelatedTo)) {
         return null;
       }
@@ -235,7 +234,7 @@ class GenericEntityReader extends TimelineEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
       readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
-      if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
+      if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
           entity.getRelatesToEntities(), relatesTo)) {
         return null;
       }
@@ -251,7 +250,7 @@ class GenericEntityReader extends TimelineEntityReader {
         fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
       readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
       if (checkInfo &&
-          !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+          !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
         return null;
       }
       if (!fieldsToRetrieve.contains(Field.ALL) &&
@@ -265,7 +264,7 @@ class GenericEntityReader extends TimelineEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
       readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
-      if (checkConfigs && !TimelineReaderUtils.matchFilters(
+      if (checkConfigs && !TimelineStorageUtils.matchFilters(
           entity.getConfigs(), configFilters)) {
         return null;
       }
@@ -280,7 +279,7 @@ class GenericEntityReader extends TimelineEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
       readEvents(entity, result, false);
-      if (checkEvents && !TimelineReaderUtils.matchEventFilters(
+      if (checkEvents && !TimelineStorageUtils.matchEventFilters(
           entity.getEvents(), eventFilters)) {
         return null;
       }
@@ -295,7 +294,7 @@ class GenericEntityReader extends TimelineEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
       readMetrics(entity, result, EntityColumnPrefix.METRIC);
-      if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
+      if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
           entity.getMetrics(), metricFilters)) {
         return null;
       }
@@ -365,7 +364,7 @@ class GenericEntityReader extends TimelineEntityReader {
       // the column name is of the form "eventId=timestamp=infoKey"
       if (karr.length == 3) {
         String id = Bytes.toString(karr[0]);
-        long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1]));
+        long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1]));
         String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
         TimelineEvent event = eventsMap.get(key);
         if (event == null) {

+ 10 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java

@@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -125,7 +125,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
 
       // if the entity is the application, the destination is the application
       // table
-      boolean isApplication = TimelineWriterUtils.isApplicationEntity(te);
+      boolean isApplication = TimelineStorageUtils.isApplicationEntity(te);
       byte[] rowKey = isApplication ?
           ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId,
               appId) :
@@ -139,7 +139,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       storeRelations(rowKey, te, isApplication);
 
       if (isApplication) {
-        if (TimelineWriterUtils.isApplicationCreated(te)) {
+        if (TimelineStorageUtils.isApplicationCreated(te)) {
           onApplicationCreated(clusterId, userId, flowName, flowVersion,
               flowRunId, appId, te);
         }
@@ -149,7 +149,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
         // if application has finished, store it's finish time and write final
         // values
         // of all metrics
-        if (TimelineWriterUtils.isApplicationFinished(te)) {
+        if (TimelineStorageUtils.isApplicationFinished(te)) {
           onApplicationFinished(clusterId, userId, flowName, flowVersion,
               flowRunId, appId, te);
         }
@@ -234,7 +234,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     Attribute attributeAppId = AggregationCompactionDimension.APPLICATION_ID
         .getAttribute(appId);
     FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
-        TimelineWriterUtils.getApplicationFinishedTime(te), attributeAppId);
+        TimelineStorageUtils.getApplicationFinishedTime(te), attributeAppId);
 
     // store the final value of metrics since application has finished
     Set<TimelineMetric> metrics = te.getMetrics();
@@ -406,9 +406,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
             }
             byte[] columnQualifierFirst =
                 Bytes.toBytes(Separator.VALUES.encode(eventId));
-            byte[] columnQualifierWithTsBytes =
-                Separator.VALUES.join(columnQualifierFirst,
-                    Bytes.toBytes(TimelineWriterUtils.invert(eventTimestamp)));
+            byte[] columnQualifierWithTsBytes = Separator.VALUES.
+                join(columnQualifierFirst, Bytes.toBytes(
+                    TimelineStorageUtils.invertLong(eventTimestamp)));
             Map<String, Object> eventInfo = event.getInfo();
             if ((eventInfo == null) || (eventInfo.size() == 0)) {
               // add separator since event key is empty
@@ -418,11 +418,11 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
               if (isApplication) {
                 ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
                     compoundColumnQualifierBytes, null,
-                      TimelineWriterUtils.EMPTY_BYTES);
+                      TimelineStorageUtils.EMPTY_BYTES);
               } else {
                 EntityColumnPrefix.EVENT.store(rowKey, entityTable,
                     compoundColumnQualifierBytes, null,
-                    TimelineWriterUtils.EMPTY_BYTES);
+                      TimelineStorageUtils.EMPTY_BYTES);
               }
             } else {
               for (Map.Entry<String, Object> info : eventInfo.entrySet()) {

+ 6 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java

@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.application;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
  * Represents a rowkey for the application table.
@@ -90,7 +90,7 @@ public class ApplicationRowKey {
       String flowId, Long flowRunId) {
     byte[] first = Bytes.toBytes(
         Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowId));
-    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
     return Separator.QUALIFIERS.join(first, second, new byte[0]);
   }
 
@@ -112,8 +112,8 @@ public class ApplicationRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
-    byte[] third = Bytes.toBytes(appId);
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
+    byte[] third = TimelineStorageUtils.encodeAppId(appId);
     return Separator.QUALIFIERS.join(first, second, third);
   }
 
@@ -135,9 +135,8 @@ public class ApplicationRowKey {
     String flowId =
         Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
     long flowRunId =
-        TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
-    String appId =
-        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4]));
+        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
+    String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[4]);
     return new ApplicationRowKey(clusterId, userId, flowId, flowRunId, appId);
   }
 }

+ 5 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
  * Represents a rowkey for the app_flow table.
@@ -49,7 +50,9 @@ public class AppToFlowRowKey {
    * @return byte array with the row key
    */
   public static byte[] getRowKey(String clusterId, String appId) {
-    return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, appId));
+    byte[] first = Bytes.toBytes(clusterId);
+    byte[] second = TimelineStorageUtils.encodeAppId(appId);
+    return Separator.QUALIFIERS.join(first, second);
   }
 
   /**
@@ -64,7 +67,7 @@ public class AppToFlowRowKey {
     }
 
     String clusterId = Bytes.toString(rowKeyComponents[0]);
-    String appId = Bytes.toString(rowKeyComponents[1]);
+    String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[1]);
     return new AppToFlowRowKey(clusterId, appId);
   }
 }

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java

@@ -304,7 +304,7 @@ public enum Separator {
    * @return source split by this separator.
    */
   public byte[][] split(byte[] source, int limit) {
-    return TimelineWriterUtils.split(source, this.bytes, limit);
+    return TimelineStorageUtils.split(source, this.bytes, limit);
   }
 
   /**
@@ -315,6 +315,6 @@ public enum Separator {
    * @return source split by this separator.
    */
   public byte[][] split(byte[] source) {
-    return TimelineWriterUtils.split(source, this.bytes);
+    return TimelineStorageUtils.split(source, this.bytes);
   }
 }

+ 0 - 112
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java

@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.common;
-
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-
-public class TimelineReaderUtils {
-  /**
-   *
-   * @param entityRelations the relations of an entity
-   * @param relationFilters the relations for filtering
-   * @return a boolean flag to indicate if both match
-   */
-  public static boolean matchRelations(
-      Map<String, Set<String>> entityRelations,
-      Map<String, Set<String>> relationFilters) {
-    for (Map.Entry<String, Set<String>> relation : relationFilters.entrySet()) {
-      Set<String> ids = entityRelations.get(relation.getKey());
-      if (ids == null) {
-        return false;
-      }
-      for (String id : relation.getValue()) {
-        if (!ids.contains(id)) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
-  /**
-   *
-   * @param map the map of key/value pairs in an entity
-   * @param filters the map of key/value pairs for filtering
-   * @return a boolean flag to indicate if both match
-   */
-  public static boolean matchFilters(Map<String, ? extends Object> map,
-      Map<String, ? extends Object> filters) {
-    for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
-      Object value = map.get(filter.getKey());
-      if (value == null) {
-        return false;
-      }
-      if (!value.equals(filter.getValue())) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   *
-   * @param entityEvents the set of event objects in an entity
-   * @param eventFilters the set of event Ids for filtering
-   * @return a boolean flag to indicate if both match
-   */
-  public static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
-      Set<String> eventFilters) {
-    Set<String> eventIds = new HashSet<String>();
-    for (TimelineEvent event : entityEvents) {
-      eventIds.add(event.getId());
-    }
-    for (String eventFilter : eventFilters) {
-      if (!eventIds.contains(eventFilter)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   *
-   * @param metrics the set of metric objects in an entity
-   * @param metricFilters the set of metric Ids for filtering
-   * @return a boolean flag to indicate if both match
-   */
-  public static boolean matchMetricFilters(Set<TimelineMetric> metrics,
-      Set<String> metricFilters) {
-    Set<String> metricIds = new HashSet<String>();
-    for (TimelineMetric metric : metrics) {
-      metricIds.add(metric.getId());
-    }
-
-    for (String metricFilter : metricFilters) {
-      if (!metricIds.contains(metricFilter)) {
-        return false;
-      }
-    }
-    return true;
-  }
-}

+ 173 - 26
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java

@@ -1,44 +1,51 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
  */
+
 package org.apache.hadoop.yarn.server.timelineservice.storage.common;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.Map.Entry;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 
 /**
- * bunch of utility functions used across TimelineWriter classes
+ * A bunch of utility functions used across TimelineReader and TimelineWriter.
  */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class TimelineWriterUtils {
+@Public
+@Unstable
+public class TimelineStorageUtils {
 
   /** empty bytes */
   public static final byte[] EMPTY_BYTES = new byte[0];
@@ -53,8 +60,7 @@ public class TimelineWriterUtils {
    * Splits the source array into multiple array segments using the given
    * separator, up to a maximum of count items. This will naturally produce
    * copied byte arrays for each of the split segments. To identify the split
-   * ranges without the array copies, see
-   * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
+   * ranges without the array copies, see {@link #splitRanges(byte[], byte[])}.
    *
    * @param source
    * @param separator
@@ -68,8 +74,7 @@ public class TimelineWriterUtils {
    * Splits the source array into multiple array segments using the given
    * separator, up to a maximum of count items. This will naturally produce
    * copied byte arrays for each of the split segments. To identify the split
-   * ranges without the array copies, see
-   * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
+   * ranges without the array copies, see {@link #splitRanges(byte[], byte[])}.
    *
    * @param source
    * @param separator
@@ -127,8 +132,7 @@ public class TimelineWriterUtils {
         // everything else goes in one final segment
         break;
       }
-
-      segments.add(new Range(start, i));
+	      segments.add(new Range(start, i));
       start = i + separator.length;
       // i will be incremented again in outer for loop
       i += separator.length - 1;
@@ -149,10 +153,70 @@ public class TimelineWriterUtils {
    *          a scan.
    * @return inverted long
    */
-  public static long invert(Long key) {
+  public static long invertLong(long key) {
     return Long.MAX_VALUE - key;
   }
 
+  /**
+   * Converts an int into it's inverse int to be used in (row) keys
+   * where we want to have the largest int value in the top of the table
+   * (scans start at the largest int first).
+   *
+   * @param key value to be inverted so that the latest version will be first in
+   *          a scan.
+   * @return inverted int
+   */
+  public static int invertInt(int key) {
+    return Integer.MAX_VALUE - key;
+  }
+
+
+  /**
+   * Converts/encodes a string app Id into a byte representation for (row) keys.
+   * For conversion, we extract cluster timestamp and sequence id from the
+   * string app id (calls {@link ConverterUtils#toApplicationId(String)} for
+   * conversion) and then store it in a byte array of length 12 (8 bytes (long)
+   * for cluster timestamp followed 4 bytes(int) for sequence id). Both cluster
+   * timestamp and sequence id are inverted so that the most recent cluster
+   * timestamp and highest sequence id appears first in the table (i.e.
+   * application id appears in a descending order).
+   *
+   * @param appIdStr application id in string format i.e.
+   * application_{cluster timestamp}_{sequence id with min 4 digits}
+   *
+   * @return encoded byte representation of app id.
+   */
+  public static byte[] encodeAppId(String appIdStr) {
+    ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
+    byte[] appIdBytes = new byte[Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT];
+    byte[] clusterTs = Bytes.toBytes(invertLong(appId.getClusterTimestamp()));
+    System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG);
+    byte[] seqId = Bytes.toBytes(invertInt(appId.getId()));
+    System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT);
+    return appIdBytes;
+  }
+
+  /**
+   * Converts/decodes a 12 byte representation of app id for (row) keys to an
+   * app id in string format which can be returned back to client.
+   * For decoding, 12 bytes are interpreted as 8 bytes of inverted cluster
+   * timestamp(long) followed by 4 bytes of inverted sequence id(int). Calls
+   * {@link ApplicationId#toString} to generate string representation of app id.
+   *
+   * @param appIdBytes application id in byte representation.
+   *
+   * @return decoded app id in string format.
+   */
+  public static String decodeAppId(byte[] appIdBytes) {
+    if (appIdBytes.length != (Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT)) {
+      throw new IllegalArgumentException("Invalid app id in byte format");
+    }
+    long clusterTs = invertLong(Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG));
+    int seqId =
+        invertInt(Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT));
+    return ApplicationId.newInstance(clusterTs, seqId).toString();
+  }
+
   /**
    * returns the timestamp of that day's start (which is midnight 00:00:00 AM)
    * for a given input timestamp
@@ -325,4 +389,87 @@ public class TimelineWriterUtils {
     return null;
   }
 
-}
+  /**
+   *
+   * @param entityRelations the relations of an entity
+   * @param relationFilters the relations for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchRelations(
+      Map<String, Set<String>> entityRelations,
+      Map<String, Set<String>> relationFilters) {
+    for (Map.Entry<String, Set<String>> relation : relationFilters.entrySet()) {
+      Set<String> ids = entityRelations.get(relation.getKey());
+      if (ids == null) {
+        return false;
+      }
+      for (String id : relation.getValue()) {
+        if (!ids.contains(id)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   *
+   * @param map the map of key/value pairs in an entity
+   * @param filters the map of key/value pairs for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchFilters(Map<String, ? extends Object> map,
+      Map<String, ? extends Object> filters) {
+    for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
+      Object value = map.get(filter.getKey());
+      if (value == null) {
+        return false;
+      }
+      if (!value.equals(filter.getValue())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   *
+   * @param entityEvents the set of event objects in an entity
+   * @param eventFilters the set of event Ids for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
+      Set<String> eventFilters) {
+    Set<String> eventIds = new HashSet<String>();
+    for (TimelineEvent event : entityEvents) {
+      eventIds.add(event.getId());
+    }
+    for (String eventFilter : eventFilters) {
+      if (!eventIds.contains(eventFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   *
+   * @param metrics the set of metric objects in an entity
+   * @param metricFilters the set of metric Ids for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchMetricFilters(Set<TimelineMetric> metrics,
+      Set<String> metricFilters) {
+    Set<String> metricIds = new HashSet<String>();
+    for (TimelineMetric metric : metrics) {
+      metricIds.add(metric.getId());
+    }
+
+    for (String metricFilter : metricFilters) {
+      if (!metricIds.contains(metricFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

+ 16 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java

@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
  * Represents a rowkey for the entity table.
@@ -90,9 +90,9 @@ public class EntityRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
-    byte[] third = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId));
-    return Separator.QUALIFIERS.join(first, second, third);
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
+    byte[] third = TimelineStorageUtils.encodeAppId(appId);
+    return Separator.QUALIFIERS.join(first, second, third, new byte[0]);
   }
 
   /**
@@ -114,10 +114,11 @@ public class EntityRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
-    byte[] third =
-        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType, ""));
-    return Separator.QUALIFIERS.join(first, second, third);
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
+    byte[] third = TimelineStorageUtils.encodeAppId(appId);
+    byte[] fourth =
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(entityType, ""));
+    return Separator.QUALIFIERS.join(first, second, third, fourth);
   }
 
   /**
@@ -141,11 +142,11 @@ public class EntityRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
-    byte[] third =
-        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType,
-            entityId));
-    return Separator.QUALIFIERS.join(first, second, third);
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
+    byte[] third = TimelineStorageUtils.encodeAppId(appId);
+    byte[] fourth =
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(entityType, entityId));
+    return Separator.QUALIFIERS.join(first, second, third, fourth);
   }
 
   /**
@@ -166,9 +167,8 @@ public class EntityRowKey {
     String flowId =
         Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
     long flowRunId =
-        TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
-    String appId =
-        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4]));
+        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
+    String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[4]);
     String entityType =
         Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[5]));
     String entityId =

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java

@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 
 /**
@@ -114,7 +114,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable>
 
     byte[] columnQualifier = ColumnHelper.getColumnQualifier(
         this.columnPrefixBytes, qualifier);
-    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
         attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
         combinedAttributes);
@@ -235,7 +235,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable>
 
     byte[] columnQualifier = ColumnHelper.getColumnQualifier(
         this.columnPrefixBytes, qualifier);
-    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
         attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
         combinedAttributes);

+ 5 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java

@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
  * Represents a rowkey for the flow activity table.
@@ -71,7 +71,7 @@ public class FlowActivityRowKey {
    */
   public static byte[] getRowKey(String clusterId, String userId,
       String flowId) {
-    long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+    long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
         .currentTimeMillis());
     return getRowKey(clusterId, dayTs, userId, flowId);
   }
@@ -90,7 +90,7 @@ public class FlowActivityRowKey {
       String flowId) {
     return Separator.QUALIFIERS.join(
         Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
-        Bytes.toBytes(TimelineWriterUtils.invert(dayTs)),
+        Bytes.toBytes(TimelineStorageUtils.invertLong(dayTs)),
         Bytes.toBytes(Separator.QUALIFIERS.encode(userId)),
         Bytes.toBytes(Separator.QUALIFIERS.encode(flowId)));
   }
@@ -108,7 +108,8 @@ public class FlowActivityRowKey {
 
     String clusterId = Separator.QUALIFIERS.decode(Bytes
         .toString(rowKeyComponents[0]));
-    long dayTs = TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[1]));
+    long dayTs =
+        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[1]));
     String userId = Separator.QUALIFIERS.decode(Bytes
         .toString(rowKeyComponents[2]));
     String flowId = Separator.QUALIFIERS.decode(Bytes

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java

@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 
 /**
@@ -97,7 +97,7 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
       TypedBufferedMutator<FlowRunTable> tableMutator, Long timestamp,
       Object inputValue, Attribute... attributes) throws IOException {
 
-    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
         attributes, aggOp);
     column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
         inputValue, combinedAttributes);

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java

@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 
 /**
@@ -112,7 +112,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 
     byte[] columnQualifier = ColumnHelper.getColumnQualifier(
         this.columnPrefixBytes, qualifier);
-    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
         attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
         combinedAttributes);
@@ -140,7 +140,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 
     byte[] columnQualifier = ColumnHelper.getColumnQualifier(
         this.columnPrefixBytes, qualifier);
-    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
         attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
         combinedAttributes);

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java

@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
 
 public class FlowRunCoprocessor extends BaseRegionObserver {
@@ -89,7 +89,7 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
     List<Tag> tags = new ArrayList<>();
     if ((attributes != null) && (attributes.size() > 0)) {
       for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
-        Tag t = TimelineWriterUtils.getTagFromAttribute(attribute);
+        Tag t = TimelineStorageUtils.getTagFromAttribute(attribute);
         tags.add(t);
       }
       byte[] tagByteArray = Tag.fromList(tags);

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java

@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
  * Represents a rowkey for the flow run table.
@@ -70,7 +70,7 @@ public class FlowRunRowKey {
         userId, flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
     return Separator.QUALIFIERS.join(first, second);
   }
 
@@ -92,7 +92,7 @@ public class FlowRunRowKey {
     String flowId =
         Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
     long flowRunId =
-        TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
+        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
     return new FlowRunRowKey(clusterId, userId, flowId, flowRunId);
   }
 }

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java

@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
  * Invoked via the coprocessor when a Get or a Scan is issued for flow run
@@ -136,7 +136,7 @@ class FlowScanner implements RegionScanner, Closeable {
     // So all cells in one qualifier come one after the other before we see the
     // next column qualifier
     ByteArrayComparator comp = new ByteArrayComparator();
-    byte[] currentColumnQualifier = TimelineWriterUtils.EMPTY_BYTES;
+    byte[] currentColumnQualifier = TimelineStorageUtils.EMPTY_BYTES;
     AggregationOperation currentAggOp = null;
     SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
     Set<String> alreadySeenAggDim = new HashSet<>();
@@ -163,7 +163,7 @@ class FlowScanner implements RegionScanner, Closeable {
     List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
         cell.getTagsLength());
     // We assume that all the operations for a particular column are the same
-    return TimelineWriterUtils.getAggregationOperationFromTagsList(tags);
+    return TimelineStorageUtils.getAggregationOperationFromTagsList(tags);
   }
 
   /**

+ 8 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java

@@ -54,7 +54,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -366,7 +366,8 @@ public class TestHBaseTimelineStorage {
       String flow = "some_flow_name";
       String flowVersion = "AB7822C10F1111";
       long runid = 1002345678919L;
-      String appName = "some app name";
+      String appName =
+          ApplicationId.newInstance(System.currentTimeMillis(), 1).toString();
       hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
       hbi.stop();
 
@@ -592,7 +593,8 @@ public class TestHBaseTimelineStorage {
         byte[][] karr = (byte[][])e.getKey();
         assertEquals(3, karr.length);
         assertEquals(eventId, Bytes.toString(karr[0]));
-        assertEquals(TimelineWriterUtils.invert(expTs), Bytes.toLong(karr[1]));
+        assertEquals(
+            TimelineStorageUtils.invertLong(expTs), Bytes.toLong(karr[1]));
         assertEquals(expKey, Bytes.toString(karr[2]));
         Object value = e.getValue();
         // there should be only one timestamp and value
@@ -667,7 +669,8 @@ public class TestHBaseTimelineStorage {
       String flow = "other_flow_name";
       String flowVersion = "1111F01C2287BA";
       long runid = 1009876543218L;
-      String appName = "some app name";
+      String appName =
+          ApplicationId.newInstance(System.currentTimeMillis(), 1).toString();
       byte[] startRow =
           EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
       hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
@@ -700,7 +703,7 @@ public class TestHBaseTimelineStorage {
             byte[][] karr = (byte[][])e.getKey();
             assertEquals(3, karr.length);
             assertEquals(eventId, Bytes.toString(karr[0]));
-            assertEquals(TimelineWriterUtils.invert(expTs),
+            assertEquals(TimelineStorageUtils.invertLong(expTs),
                 Bytes.toLong(karr[1]));
             // key must be empty
             assertEquals(0, karr[2].length);

+ 56 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineStorageUtils.java

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.junit.Test;
+
+public class TestTimelineStorageUtils {
+
+  @Test
+  public void testEncodeDecodeAppId() {
+    long currentTs = System.currentTimeMillis();
+    ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1);
+    ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2);
+    ApplicationId appId3 = ApplicationId.newInstance(currentTs + 300, 1);
+    String appIdStr1 = appId1.toString();
+    String appIdStr2 = appId2.toString();
+    String appIdStr3 = appId3.toString();
+    byte[] appIdBytes1 = TimelineStorageUtils.encodeAppId(appIdStr1);
+    byte[] appIdBytes2 = TimelineStorageUtils.encodeAppId(appIdStr2);
+    byte[] appIdBytes3 = TimelineStorageUtils.encodeAppId(appIdStr3);
+    // App ids' should be encoded in a manner wherein descending order
+    // is maintained.
+    assertTrue("Ordering of app ids' is incorrect",
+        Bytes.compareTo(appIdBytes1, appIdBytes2) > 0 &&
+        Bytes.compareTo(appIdBytes1, appIdBytes3) > 0 &&
+        Bytes.compareTo(appIdBytes2, appIdBytes3) > 0);
+    String decodedAppId1 = TimelineStorageUtils.decodeAppId(appIdBytes1);
+    String decodedAppId2 = TimelineStorageUtils.decodeAppId(appIdBytes2);
+    String decodedAppId3 = TimelineStorageUtils.decodeAppId(appIdBytes3);
+    assertTrue("Decoded app id is not same as the app id encoded",
+        appIdStr1.equals(decodedAppId1));
+    assertTrue("Decoded app id is not same as the app id encoded",
+        appIdStr2.equals(decodedAppId2));
+    assertTrue("Decoded app id is not same as the app id encoded",
+        appIdStr3.equals(decodedAppId3));
+  }
+}

+ 0 - 29
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineWriterUtils.java

@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.common;
-
-import org.junit.Test;
-
-public class TestTimelineWriterUtils {
-
-  @Test
-  public void test() {
-    // TODO: implement a test for the remaining method in TimelineWriterUtils.
-  }
-
-}

+ 5 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java

@@ -49,7 +49,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReader
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -166,7 +166,7 @@ public class TestHBaseStorageFlowActivity {
     assertEquals(cluster, flowActivityRowKey.getClusterId());
     assertEquals(user, flowActivityRowKey.getUserId());
     assertEquals(flow, flowActivityRowKey.getFlowId());
-    long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+    long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
         .currentTimeMillis());
     assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
     assertEquals(1, values.size());
@@ -281,7 +281,7 @@ public class TestHBaseStorageFlowActivity {
       assertEquals(cluster, flowActivityRowKey.getClusterId());
       assertEquals(user, flowActivityRowKey.getUserId());
       assertEquals(flow, flowActivityRowKey.getFlowId());
-      long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+      long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
           .currentTimeMillis());
       assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
       assertEquals(1, values.size());
@@ -360,7 +360,7 @@ public class TestHBaseStorageFlowActivity {
         assertEquals(cluster, flowActivity.getCluster());
         assertEquals(user, flowActivity.getUser());
         assertEquals(flow, flowActivity.getFlowName());
-        long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+        long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
             .currentTimeMillis());
         assertEquals(dayTs, flowActivity.getDate().getTime());
         Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
@@ -410,7 +410,7 @@ public class TestHBaseStorageFlowActivity {
       assertEquals(cluster, flowActivityRowKey.getClusterId());
       assertEquals(user, flowActivityRowKey.getUserId());
       assertEquals(flow, flowActivityRowKey.getFlowId());
-      long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+      long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
           .currentTimeMillis());
       assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());