
YARN-4025. Deal with byte representations of Longs in writer code. Contributed by Sangjin Lee and Vrushali C.

Junping Du 10 years ago
parent
commit
233bfc9690
11 changed files with 373 additions and 141 deletions
  1. + 3 - 0
      hadoop-yarn-project/CHANGES.txt
  2. + 36 - 32
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
  3. + 11 - 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
  4. + 40 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
  5. + 3 - 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java
  6. + 92 - 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
  7. + 13 - 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
  8. + 6 - 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
  9. + 40 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
  10. + 3 - 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
  11. + 126 - 81
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -94,6 +94,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3045. Implement NM writing container lifecycle events to Timeline
     Service v2. (Naganarasimha G R via junping_du)
 
+    YARN-4025. Deal with byte representations of Longs in writer code.
+    (Sangjin Lee and Vrushali C via junping_du)
+
   IMPROVEMENTS
 
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via

+ 36 - 32
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java

@@ -19,12 +19,9 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
 
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
@@ -431,44 +428,51 @@ public class HBaseTimelineReaderImpl
     Map<String, Object> columns = prefix.readResults(result);
     if (isConfig) {
       for (Map.Entry<String, Object> column : columns.entrySet()) {
-        entity.addConfig(column.getKey(), column.getKey().toString());
+        entity.addConfig(column.getKey(), column.getValue().toString());
       }
     } else {
       entity.addInfo(columns);
     }
   }
 
+  /**
+   * Read events from the entity table or the application table. The column name
+   * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
+   * if there is no info associated with the event.
+   *
+   * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
+   * schema description.
+   */
   private static void readEvents(TimelineEntity entity, Result result,
       boolean isApplication) throws IOException {
     Map<String, TimelineEvent> eventsMap = new HashMap<>();
-    Map<String, Object> eventsResult = isApplication ?
-        ApplicationColumnPrefix.EVENT.readResults(result) :
-        EntityColumnPrefix.EVENT.readResults(result);
-    for (Map.Entry<String,Object> eventResult : eventsResult.entrySet()) {
-      Collection<String> tokens =
-          Separator.VALUES.splitEncoded(eventResult.getKey());
-      if (tokens.size() != 2 && tokens.size() != 3) {
-        throw new IOException(
-            "Invalid event column name: " + eventResult.getKey());
-      }
-      Iterator<String> idItr = tokens.iterator();
-      String id = idItr.next();
-      String tsStr = idItr.next();
-      // TODO: timestamp is not correct via ser/des through UTF-8 string
-      Long ts =
-          TimelineWriterUtils.invert(Bytes.toLong(tsStr.getBytes(
-              StandardCharsets.UTF_8)));
-      String key = Separator.VALUES.joinEncoded(id, ts.toString());
-      TimelineEvent event = eventsMap.get(key);
-      if (event == null) {
-        event = new TimelineEvent();
-        event.setId(id);
-        event.setTimestamp(ts);
-        eventsMap.put(key, event);
-      }
-      if (tokens.size() == 3) {
-        String infoKey = idItr.next();
-        event.addInfo(infoKey, eventResult.getValue());
+    Map<?, Object> eventsResult = isApplication ?
+        ApplicationColumnPrefix.EVENT.
+            readResultsHavingCompoundColumnQualifiers(result) :
+        EntityColumnPrefix.EVENT.
+            readResultsHavingCompoundColumnQualifiers(result);
+    for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
+      byte[][] karr = (byte[][])eventResult.getKey();
+      // the column name is of the form "eventId=timestamp=infoKey"
+      if (karr.length == 3) {
+        String id = Bytes.toString(karr[0]);
+        long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1]));
+        String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
+        TimelineEvent event = eventsMap.get(key);
+        if (event == null) {
+          event = new TimelineEvent();
+          event.setId(id);
+          event.setTimestamp(ts);
+          eventsMap.put(key, event);
+        }
+        // handle empty info
+        String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
+        if (infoKey != null) {
+          event.addInfo(infoKey, eventResult.getValue());
+        }
+      } else {
+        LOG.warn("incorrectly formatted column name: it will be discarded");
+        continue;
       }
     }
     Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
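
For context on the TODO that this hunk removes: the old reader pushed the compound qualifier through a UTF-8 String, but the 8 bytes of an inverted long timestamp are generally not valid UTF-8, so the round trip is lossy. A minimal sketch of the failure, assuming only org.apache.hadoop.hbase.util.Bytes and that invert() follows the usual Long.MAX_VALUE - ts definition:

```java
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.hbase.util.Bytes;

public class Utf8RoundTripDemo {
  public static void main(String[] args) {
    long ts = 1425016501000L;
    // assumption: invert() reverses sort order via Long.MAX_VALUE - ts
    long inverted = Long.MAX_VALUE - ts;
    byte[] raw = Bytes.toBytes(inverted);    // 8 bytes: 7F FF FE B4 ...
    // old approach: serialize the qualifier bytes through a UTF-8 String
    String s = new String(raw, StandardCharsets.UTF_8);
    byte[] roundTripped = s.getBytes(StandardCharsets.UTF_8);
    // 0xFF and 0xFE can never occur in valid UTF-8; they decode to U+FFFD,
    // which re-encodes as 3 bytes each, so the array grows and the value
    // is destroyed. Keeping the qualifier as byte[], as this patch does,
    // sidesteps the problem entirely.
    System.out.println(raw.length);          // 8
    System.out.println(roundTripped.length); // 18 for this timestamp
  }
}
```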

+ 11 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java

@@ -300,25 +300,27 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
               byte[] compoundColumnQualifierBytes =
                   Separator.VALUES.join(columnQualifierWithTsBytes,
                       null);
-              String compoundColumnQualifier =
-                  Bytes.toString(compoundColumnQualifierBytes);
-              EntityColumnPrefix.EVENT.store(rowKey, entityTable,
-                  compoundColumnQualifier, null, TimelineWriterUtils.EMPTY_BYTES);
+              if (isApplication) {
+                ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
+                    compoundColumnQualifierBytes, null,
+                      TimelineWriterUtils.EMPTY_BYTES);
+              } else {
+                EntityColumnPrefix.EVENT.store(rowKey, entityTable,
+                    compoundColumnQualifierBytes, null,
+                    TimelineWriterUtils.EMPTY_BYTES);
+              }
             } else {
               for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
                 // eventId?infoKey
                 byte[] compoundColumnQualifierBytes =
                     Separator.VALUES.join(columnQualifierWithTsBytes,
                         Bytes.toBytes(info.getKey()));
-                // convert back to string to avoid additional API on store.
-                String compoundColumnQualifier =
-                    Bytes.toString(compoundColumnQualifierBytes);
                 if (isApplication) {
                   ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
-                    compoundColumnQualifier, null, info.getValue());
+                    compoundColumnQualifierBytes, null, info.getValue());
                 } else {
                   EntityColumnPrefix.EVENT.store(rowKey, entityTable,
-                    compoundColumnQualifier, null, info.getValue());
+                    compoundColumnQualifierBytes, null, info.getValue());
                 }
               } // for info: eventInfo
             }
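
Read end to end, the writer-side change stores the event column entirely as bytes. A condensed sketch of the logic above (assuming a TimelineEvent event and its info map eventInfo in scope, plus the timeline service's Separator and TimelineWriterUtils from this patch):

```java
// first two parts of the compound qualifier: eventId and inverted timestamp
byte[] idBytes = Bytes.toBytes(event.getId());
byte[] tsBytes = Bytes.toBytes(TimelineWriterUtils.invert(event.getTimestamp()));
byte[] columnQualifierWithTsBytes = Separator.VALUES.join(idBytes, tsBytes);

if (eventInfo == null || eventInfo.isEmpty()) {
  // no info: append an empty third part so the shape stays eventId=ts=infoKey
  byte[] qualifier = Separator.VALUES.join(columnQualifierWithTsBytes, null);
  ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable, qualifier,
      null, TimelineWriterUtils.EMPTY_BYTES);
} else {
  for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
    // one column per info key: eventId=ts=infoKey -> infoValue
    byte[] qualifier = Separator.VALUES.join(columnQualifierWithTsBytes,
        Bytes.toBytes(info.getKey()));
    ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable, qualifier,
        null, info.getValue());
  }
}
```

The String detour ("convert back to string to avoid additional API on store") disappears because the byte[]-qualifier store() overloads added below make it unnecessary.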

+ 40 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java

@@ -101,6 +101,31 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
     return columnPrefix;
   }
 
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<ApplicationTable> tableMutator, byte[] qualifier,
+      Long timestamp, Object inputValue) throws IOException {
+
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier =
+        ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
+  }
+
   /*
    * (non-Javadoc)
    *
@@ -150,6 +175,21 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
     return column.readResults(result, columnPrefixBytes);
   }
 
+  /**
+   * @param result from which to read columns
+   * @return the latest values of columns in the column family. The column
+   *         qualifier is returned as a list of parts, each part a byte[]. This
+   *         is to facilitate returning byte arrays of values that were not
+   *         Strings. If they can be treated as Strings, you should use
+   *         {@link #readResults(Result)} instead.
+   * @throws IOException
+   */
+  public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
+      throws IOException {
+    return column.readResultsHavingCompoundColumnQualifiers(result,
+        columnPrefixBytes);
+  }
+
   /*
    * (non-Javadoc)
    *
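
A short usage sketch of the two additions (rowKey, applicationTable, result and the qualifier/value variables are placeholders, not part of the patch):

```java
// store under a raw byte[] compound qualifier (eventId=invertedTs=infoKey)
ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
    compoundColumnQualifierBytes, null, infoValue);

// read back: keys come out as byte[][], one element per qualifier part,
// so the timestamp part can be decoded as a long instead of as a String
Map<?, Object> events = ApplicationColumnPrefix.EVENT
    .readResultsHavingCompoundColumnQualifiers(result);
for (Map.Entry<?, Object> e : events.entrySet()) {
  byte[][] parts = (byte[][]) e.getKey();
  String eventId = Bytes.toString(parts[0]);
  long ts = TimelineWriterUtils.invert(Bytes.toLong(parts[1]));
  String infoKey = parts[2].length == 0 ? null : Bytes.toString(parts[2]);
}
```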

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java

@@ -57,12 +57,12 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBas
  * |            | infoValue                    | metricValue1 |              |
  * |            |                              | @timestamp2  |              |
  * |            | r!relatesToKey:              |              |              |
- * |            | id3?id4?id5                  |              |              |
+ * |            | id3=id4=id5                  |              |              |
  * |            |                              |              |              |
  * |            | s!isRelatedToKey:            |              |              |
- * |            | id7?id9?id6                  |              |              |
+ * |            | id7=id9=id6                  |              |              |
  * |            |                              |              |              |
- * |            | e!eventId?timestamp?infoKey: |              |              |
+ * |            | e!eventId=timestamp=infoKey: |              |              |
  * |            | eventInfoValue               |              |              |
  * |            |                              |              |              |
  * |            | flowVersion:                 |              |              |

+ 92 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java

@@ -24,6 +24,8 @@ import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -37,6 +39,7 @@ import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
  * @param <T> refers to the table.
  */
 public class ColumnHelper<T> {
+  private static final Log LOG = LogFactory.getLog(ColumnHelper.class);
 
   private final ColumnFamily<T> columnFamily;
 
@@ -143,6 +146,7 @@ public class ColumnHelper<T> {
             .entrySet()) {
           String columnName = null;
           if (columnPrefixBytes == null) {
+            LOG.info("null prefix was specified; returning all columns");
             // Decode the spaces we encoded in the column name.
             columnName = Separator.decode(entry.getKey(), Separator.SPACE);
           } else {
@@ -181,32 +185,43 @@ public class ColumnHelper<T> {
   /**
    * @param result from which to read columns
    * @param columnPrefixBytes optional prefix to limit columns. If null all
-   *          columns are returned.
-   * @return the latest values of columns in the column family.
+   *        columns are returned.
+   * @return the latest values of columns in the column family. This assumes
+   *         that the column name parts are all Strings by default. If the
+   *         column name parts should be treated natively and not be converted
+   *         back and forth from Strings, you should use
+   *         {@link #readResultsHavingCompoundColumnQualifiers(Result, byte[])}
+   *         instead.
    * @throws IOException
    */
-  public Map<String, Object> readResults(Result result, byte[] columnPrefixBytes)
-      throws IOException {
+  public Map<String, Object> readResults(Result result,
+      byte[] columnPrefixBytes) throws IOException {
     Map<String, Object> results = new HashMap<String, Object>();
 
     if (result != null) {
       Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes);
       for (Entry<byte[], byte[]> entry : columns.entrySet()) {
-        if (entry.getKey() != null && entry.getKey().length > 0) {
+        byte[] columnKey = entry.getKey();
+        if (columnKey != null && columnKey.length > 0) {
 
           String columnName = null;
           if (columnPrefixBytes == null) {
+            LOG.info("null prefix was specified; returning all columns");
             // Decode the spaces we encoded in the column name.
-            columnName = Separator.decode(entry.getKey(), Separator.SPACE);
+            columnName = Separator.decode(columnKey, Separator.SPACE);
           } else {
             // A non-null prefix means columns are actually of the form
             // prefix!columnNameRemainder
             byte[][] columnNameParts =
-                Separator.QUALIFIERS.split(entry.getKey(), 2);
+                Separator.QUALIFIERS.split(columnKey, 2);
             byte[] actualColumnPrefixBytes = columnNameParts[0];
             if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
                 && columnNameParts.length == 2) {
               // This is the prefix that we want
+              // if the column name is a compound qualifier
+              // with non string datatypes, the following decode will not
+              // work correctly since it considers all components to be String
+              // invoke the readResultsHavingCompoundColumnQualifiers function
               columnName = Separator.decode(columnNameParts[1]);
             }
           }
@@ -222,6 +237,56 @@ public class ColumnHelper<T> {
     return results;
   }
 
+  /**
+   * @param result from which to read columns
+   * @param columnPrefixBytes optional prefix to limit columns. If null all
+   *        columns are returned.
+   * @return the latest values of columns in the column family. If the column
+   *         prefix is null, the column qualifier is returned as Strings. For a
+   *         non-null column prefix bytes, the column qualifier is returned as
+   *         a list of parts, each part a byte[]. This is to facilitate
+   *         returning byte arrays of values that were not Strings.
+   * @throws IOException
+   */
+  public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result,
+      byte[] columnPrefixBytes) throws IOException {
+    // handle the case where the column prefix is null
+    // it is the same as readResults() so simply delegate to that implementation
+    if (columnPrefixBytes == null) {
+      return readResults(result, null);
+    }
+
+    Map<byte[][], Object> results = new HashMap<byte[][], Object>();
+
+    if (result != null) {
+      Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes);
+      for (Entry<byte[], byte[]> entry : columns.entrySet()) {
+        byte[] columnKey = entry.getKey();
+        if (columnKey != null && columnKey.length > 0) {
+          // A non-null prefix means columns are actually of the form
+          // prefix!columnNameRemainder
+          // with a compound column qualifier, we are presuming existence of a
+          // prefix
+          byte[][] columnNameParts = Separator.QUALIFIERS.split(columnKey, 2);
+          if (columnNameParts.length > 0) {
+            byte[] actualColumnPrefixBytes = columnNameParts[0];
+            if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
+                && columnNameParts.length == 2) {
+              // This is the prefix that we want
+              byte[][] columnQualifierParts =
+                  Separator.VALUES.split(columnNameParts[1]);
+              Object value = GenericObjectMapper.read(entry.getValue());
+              // we return the columnQualifier in parts since we don't know
+              // which part is of which data type
+              results.put(columnQualifierParts, value);
+            }
+          }
+        }
+      } // for entry
+    }
+    return results;
+  }
+
   /**
    * @param columnPrefixBytes The byte representation for the column prefix.
    *          Should not contain {@link Separator#QUALIFIERS}.
@@ -247,4 +312,24 @@ public class ColumnHelper<T> {
     return columnQualifier;
   }
 
+  /**
+   * @param columnPrefixBytes The byte representation for the column prefix.
+   *          Should not contain {@link Separator#QUALIFIERS}.
+   * @param qualifier the byte representation for the remainder of the column.
+   * @return fully sanitized column qualifier that is a combination of prefix
+   *         and qualifier. If prefix is null, the result is simply the encoded
+   *         qualifier without any separator.
+   */
+  public static byte[] getColumnQualifier(byte[] columnPrefixBytes,
+      byte[] qualifier) {
+
+    if (columnPrefixBytes == null) {
+      return qualifier;
+    }
+
+    byte[] columnQualifier =
+        Separator.QUALIFIERS.join(columnPrefixBytes, qualifier);
+    return columnQualifier;
+  }
+
 }
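
The new byte[] overload of getColumnQualifier composes the column name the same way as the String version: prefix and remainder joined with Separator.QUALIFIERS (the "!" seen in the table schemas), with a null prefix returning the qualifier untouched. An illustrative call (variable names are hypothetical):

```java
byte[] prefix = Bytes.toBytes("e");               // event column prefix
byte[] remainder = compoundColumnQualifierBytes;  // eventId=invertedTs=infoKey
byte[] qualifier = ColumnHelper.getColumnQualifier(prefix, remainder);
// qualifier now reads e!eventId=invertedTs=infoKey, entirely as raw bytes;
// ColumnHelper.getColumnQualifier(null, remainder) returns remainder as-is
```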

+ 13 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java

@@ -37,7 +37,7 @@ public enum Separator {
   /**
    * separator in values, and/or compound key/column qualifier fields.
    */
-  VALUES("?", "%1$"),
+  VALUES("=", "%1$"),
 
   /**
    * separator in values, often used to avoid having these in qualifiers and
@@ -299,12 +299,22 @@ public enum Separator {
    * up to a maximum of count items. This will naturally produce copied byte
    * arrays for each of the split segments.
    * @param source to be split
-   * @param limit on how many segments are supposed to be returned. Negative
-   *          value indicates no limit on number of segments.
+   * @param limit on how many segments are supposed to be returned. A
+   *          non-positive value indicates no limit on number of segments.
    * @return source split by this separator.
    */
   public byte[][] split(byte[] source, int limit) {
     return TimelineWriterUtils.split(source, this.bytes, limit);
   }
 
+  /**
+   * Splits the source array into multiple array segments using this separator,
+   * as many times as splits are found. This will naturally produce copied byte
+   * arrays for each of the split segments.
+   * @param source to be split
+   * @return source split by this separator.
+   */
+  public byte[][] split(byte[] source) {
+    return TimelineWriterUtils.split(source, this.bytes);
+  }
 }
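
To see what the new no-limit overload adds over the bounded split (a sketch; "=" is the VALUES separator as of this patch):

```java
byte[] source = Bytes.toBytes("id3=id4=id5");

// bounded split: at most 2 segments, so the tail keeps its separators;
// ColumnHelper uses this bounded form (with QUALIFIERS) to peel the
// prefix off a column name
byte[][] two = Separator.VALUES.split(source, 2);
// two[0] = "id3", two[1] = "id4=id5"

// unbounded split (new overload): as many segments as separators allow
byte[][] all = Separator.VALUES.split(source);
// all = { "id3", "id4", "id5" }
```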

+ 6 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java

@@ -33,6 +33,9 @@ public class TimelineWriterUtils {
   /** empty bytes */
   public static final byte[] EMPTY_BYTES = new byte[0];
 
+  /** indicator for no limits for splitting */
+  public static final int NO_LIMIT_SPLIT = -1;
+
   /**
    * Splits the source array into multiple array segments using the given
    * separator, up to a maximum of count items. This will naturally produce
@@ -45,7 +48,7 @@ public class TimelineWriterUtils {
    * @return byte[] array after splitting the source
    */
   public static byte[][] split(byte[] source, byte[] separator) {
-    return split(source, separator, -1);
+    return split(source, separator, NO_LIMIT_SPLIT);
   }
 
   /**
@@ -57,7 +60,7 @@ public class TimelineWriterUtils {
    *
    * @param source
    * @param separator
-   * @param limit a negative value indicates no limit on number of segments.
+   * @param limit a non-positive value indicates no limit on number of segments.
    * @return byte[][] after splitting the input source
    */
   public static byte[][] split(byte[] source, byte[] separator, int limit) {
@@ -81,7 +84,7 @@ public class TimelineWriterUtils {
    * separator byte array.
    */
   public static List<Range> splitRanges(byte[] source, byte[] separator) {
-    return splitRanges(source, separator, -1);
+    return splitRanges(source, separator, NO_LIMIT_SPLIT);
   }
 
   /**
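
For completeness, invert() is what makes the timestamp part of the qualifier sort newest-first under HBase's ascending byte order. A sketch assuming the common Long.MAX_VALUE - ts definition (the function is its own inverse):

```java
long ts = 1436512802000L;
long stored = TimelineWriterUtils.invert(ts);    // assumed: Long.MAX_VALUE - ts
long restored = TimelineWriterUtils.invert(stored);
// the mapping round-trips, and later timestamps yield smaller stored
// values, so the most recent event sorts first in the table
assert restored == ts;
assert TimelineWriterUtils.invert(ts + 1) < stored;
```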

+ 40 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java

@@ -126,6 +126,31 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
   }
 
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<EntityTable> tableMutator, byte[] qualifier,
+      Long timestamp, Object inputValue) throws IOException {
+
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier =
+        ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
+  }
+
   /*
    * (non-Javadoc)
    *
@@ -150,6 +175,21 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
     return column.readResults(result, columnPrefixBytes);
   }
 
+  /**
+   * @param result from which to read columns
+   * @return the latest values of columns in the column family. The column
+   *         qualifier is returned as a list of parts, each part a byte[]. This
+   *         is to facilitate returning byte arrays of values that were not
+   *         Strings. If they can be treated as Strings, you should use
+   *         {@link #readResults(Result)} instead.
+   * @throws IOException
+   */
+  public Map<?, Object> readResultsHavingCompoundColumnQualifiers(Result result)
+          throws IOException {
+    return column.readResultsHavingCompoundColumnQualifiers(result,
+        columnPrefixBytes);
+  }
+
   /*
    * (non-Javadoc)
    *

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java

@@ -58,12 +58,12 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBas
  * |            | infoValue                    |              |              |
  * |            |                              |              |              |
  * |            | r!relatesToKey:              |              |              |
- * |            | id3?id4?id5                  |              |              |
+ * |            | id3=id4=id5                  |              |              |
  * |            |                              |              |              |
  * |            | s!isRelatedToKey             |              |              |
- * |            | id7?id9?id6                  |              |              |
+ * |            | id7=id9=id6                  |              |              |
  * |            |                              |              |              |
- * |            | e!eventId?timestamp?infoKey: |              |              |
+ * |            | e!eventId=timestamp=infoKey: |              |              |
  * |            | eventInfoValue               |              |              |
  * |            |                              |              |              |
  * |            | flowVersion:                 |              |              |

+ 126 - 81
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java

@@ -27,8 +27,8 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.NavigableMap;
+import java.util.NavigableSet;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
@@ -75,7 +76,7 @@ import org.junit.Test;
  * even if other records exist in the table. Use a different cluster name if
  * you add a new test.
  */
-public class TestHBaseTimelineWriterImpl {
+public class TestHBaseTimelineStorage {
 
   private static HBaseTestingUtility util;
 
@@ -101,8 +102,8 @@ public class TestHBaseTimelineWriterImpl {
     ApplicationEntity entity = new ApplicationEntity();
     String id = "hello";
     entity.setId(id);
-    Long cTime = 1425016501000L;
-    Long mTime = 1425026901000L;
+    long cTime = 1425016501000L;
+    long mTime = 1425026901000L;
     entity.setCreatedTime(cTime);
     entity.setModifiedTime(mTime);
 
@@ -197,19 +198,16 @@ public class TestHBaseTimelineWriterImpl {
 
       Number val =
           (Number) ApplicationColumn.CREATED_TIME.readResult(result);
-      Long cTime1 = val.longValue();
+      long cTime1 = val.longValue();
       assertEquals(cTime1, cTime);
 
       val = (Number) ApplicationColumn.MODIFIED_TIME.readResult(result);
-      Long mTime1 = val.longValue();
+      long mTime1 = val.longValue();
       assertEquals(mTime1, mTime);
 
       Map<String, Object> infoColumns =
           ApplicationColumnPrefix.INFO.readResults(result);
-      assertEquals(infoMap.size(), infoColumns.size());
-      for (String infoItem : infoMap.keySet()) {
-        assertEquals(infoMap.get(infoItem), infoColumns.get(infoItem));
-      }
+      assertEquals(infoMap, infoColumns);
 
       // Remember isRelatedTo is of type Map<String, Set<String>>
       for (String isRelatedToKey : isRelatedTo.keySet()) {
@@ -245,27 +243,15 @@ public class TestHBaseTimelineWriterImpl {
       // Configuration
       Map<String, Object> configColumns =
           ApplicationColumnPrefix.CONFIG.readResults(result);
-      assertEquals(conf.size(), configColumns.size());
-      for (String configItem : conf.keySet()) {
-        assertEquals(conf.get(configItem), configColumns.get(configItem));
-      }
+      assertEquals(conf, configColumns);
 
       NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
           ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
 
       NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
-      // We got metrics back
-      assertNotNull(metricMap);
-      // Same number of metrics as we wrote
-      assertEquals(metricValues.entrySet().size(), metricMap.entrySet().size());
-
-      // Iterate over original metrics and confirm that they are present
-      // here.
-      for (Entry<Long, Number> metricEntry : metricValues.entrySet()) {
-        assertEquals(metricEntry.getValue(),
-            metricMap.get(metricEntry.getKey()));
-      }
+      assertEquals(metricValues, metricMap);
 
+      // read the timeline entity using the reader this time
       TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, id,
           entity.getType(), entity.getId(),
           EnumSet.of(TimelineReader.Field.ALL));
@@ -274,6 +260,31 @@ public class TestHBaseTimelineWriterImpl {
           null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
       assertNotNull(e1);
       assertEquals(1, es1.size());
+
+      // verify attributes
+      assertEquals(id, e1.getId());
+      assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
+          e1.getType());
+      assertEquals(cTime, e1.getCreatedTime());
+      assertEquals(mTime, e1.getModifiedTime());
+      Map<String, Object> infoMap2 = e1.getInfo();
+      assertEquals(infoMap, infoMap2);
+
+      Map<String, Set<String>> isRelatedTo2 = e1.getIsRelatedToEntities();
+      assertEquals(isRelatedTo, isRelatedTo2);
+
+      Map<String, Set<String>> relatesTo2 = e1.getRelatesToEntities();
+      assertEquals(relatesTo, relatesTo2);
+
+      Map<String, String> conf2 = e1.getConfigs();
+      assertEquals(conf, conf2);
+
+      Set<TimelineMetric> metrics2 = e1.getMetrics();
+      assertEquals(metrics, metrics2);
+      for (TimelineMetric metric2 : metrics2) {
+        Map<Long, Number> metricValues2 = metric2.getValues();
+        assertEquals(metricValues, metricValues2);
+      }
     } finally {
       if (hbi != null) {
         hbi.stop();
@@ -294,8 +305,8 @@ public class TestHBaseTimelineWriterImpl {
     String type = "world";
     entity.setId(id);
     entity.setType(type);
-    Long cTime = 1425016501000L;
-    Long mTime = 1425026901000L;
+    long cTime = 1425016501000L;
+    long mTime = 1425026901000L;
     entity.setCreatedTime(cTime);
     entity.setModifiedTime(mTime);
 
@@ -396,20 +407,16 @@ public class TestHBaseTimelineWriterImpl {
           assertEquals(type, type1);
 
           Number val = (Number) EntityColumn.CREATED_TIME.readResult(result);
-          Long cTime1 = val.longValue();
+          long cTime1 = val.longValue();
           assertEquals(cTime1, cTime);
 
           val = (Number) EntityColumn.MODIFIED_TIME.readResult(result);
-          Long mTime1 = val.longValue();
+          long mTime1 = val.longValue();
           assertEquals(mTime1, mTime);
 
           Map<String, Object> infoColumns =
               EntityColumnPrefix.INFO.readResults(result);
-          assertEquals(infoMap.size(), infoColumns.size());
-          for (String infoItem : infoMap.keySet()) {
-            assertEquals(infoMap.get(infoItem),
-                infoColumns.get(infoItem));
-          }
+          assertEquals(infoMap, infoColumns);
 
           // Remember isRelatedTo is of type Map<String, Set<String>>
           for (String isRelatedToKey : isRelatedTo.keySet()) {
@@ -447,32 +454,19 @@ public class TestHBaseTimelineWriterImpl {
           // Configuration
           Map<String, Object> configColumns =
               EntityColumnPrefix.CONFIG.readResults(result);
-          assertEquals(conf.size(), configColumns.size());
-          for (String configItem : conf.keySet()) {
-            assertEquals(conf.get(configItem), configColumns.get(configItem));
-          }
+          assertEquals(conf, configColumns);
 
           NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
               EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
 
           NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
-          // We got metrics back
-          assertNotNull(metricMap);
-          // Same number of metrics as we wrote
-          assertEquals(metricValues.entrySet().size(), metricMap.entrySet()
-              .size());
-
-          // Iterate over original metrics and confirm that they are present
-          // here.
-          for (Entry<Long, Number> metricEntry : metricValues.entrySet()) {
-            assertEquals(metricEntry.getValue(),
-                metricMap.get(metricEntry.getKey()));
-          }
+          assertEquals(metricValues, metricMap);
         }
       }
       assertEquals(1, rowCount);
       assertEquals(17, colCount);
 
+      // read the timeline entity using the reader this time
       TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
           entity.getType(), entity.getId(),
           EnumSet.of(TimelineReader.Field.ALL));
@@ -481,6 +475,30 @@ public class TestHBaseTimelineWriterImpl {
           null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
       assertNotNull(e1);
       assertEquals(1, es1.size());
+
+      // verify attributes
+      assertEquals(id, e1.getId());
+      assertEquals(type, e1.getType());
+      assertEquals(cTime, e1.getCreatedTime());
+      assertEquals(mTime, e1.getModifiedTime());
+      Map<String, Object> infoMap2 = e1.getInfo();
+      assertEquals(infoMap, infoMap2);
+
+      Map<String, Set<String>> isRelatedTo2 = e1.getIsRelatedToEntities();
+      assertEquals(isRelatedTo, isRelatedTo2);
+
+      Map<String, Set<String>> relatesTo2 = e1.getRelatesToEntities();
+      assertEquals(relatesTo, relatesTo2);
+
+      Map<String, String> conf2 = e1.getConfigs();
+      assertEquals(conf, conf2);
+
+      Set<TimelineMetric> metrics2 = e1.getMetrics();
+      assertEquals(metrics, metrics2);
+      for (TimelineMetric metric2 : metrics2) {
+        Map<Long, Number> metricValues2 = metric2.getValues();
+        assertEquals(metricValues, metricValues2);
+      }
     } finally {
       if (hbi != null) {
         hbi.stop();
@@ -494,9 +512,9 @@ public class TestHBaseTimelineWriterImpl {
   }
 
   private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
-      String flow, Long runid, String appName, TimelineEntity te) {
+      String flow, long runid, String appName, TimelineEntity te) {
 
-    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
 
     assertTrue(rowKeyComponents.length == 7);
     assertEquals(user, Bytes.toString(rowKeyComponents[0]));
@@ -511,9 +529,9 @@ public class TestHBaseTimelineWriterImpl {
   }
 
   private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster,
-      String user, String flow, Long runid, String appName) {
+      String user, String flow, long runid, String appName) {
 
-    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
 
     assertTrue(rowKeyComponents.length == 5);
     assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
@@ -530,7 +548,7 @@ public class TestHBaseTimelineWriterImpl {
     TimelineEvent event = new TimelineEvent();
     String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE;
     event.setId(eventId);
-    Long expTs = 1436512802000L;
+    long expTs = 1436512802000L;
     event.setTimestamp(expTs);
     String expKey = "foo_event";
     Object expVal = "test";
@@ -577,24 +595,25 @@ public class TestHBaseTimelineWriterImpl {
       assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
           appName));
 
-      Map<String, Object> eventsResult =
-          ApplicationColumnPrefix.EVENT.readResults(result);
+      Map<?, Object> eventsResult =
+          ApplicationColumnPrefix.EVENT.
+              readResultsHavingCompoundColumnQualifiers(result);
       // there should be only one event
       assertEquals(1, eventsResult.size());
-      // key name for the event
-      byte[] compoundColumnQualifierBytes =
-          Separator.VALUES.join(Bytes.toBytes(eventId),
-              Bytes.toBytes(TimelineWriterUtils.invert(expTs)),
-              Bytes.toBytes(expKey));
-      String valueKey = Bytes.toString(compoundColumnQualifierBytes);
-      for (Map.Entry<String, Object> e : eventsResult.entrySet()) {
-        // the value key must match
-        assertEquals(valueKey, e.getKey());
+      for (Map.Entry<?, Object> e : eventsResult.entrySet()) {
+        // the qualifier is a compound key
+        // hence match individual values
+        byte[][] karr = (byte[][])e.getKey();
+        assertEquals(3, karr.length);
+        assertEquals(eventId, Bytes.toString(karr[0]));
+        assertEquals(TimelineWriterUtils.invert(expTs), Bytes.toLong(karr[1]));
+        assertEquals(expKey, Bytes.toString(karr[2]));
         Object value = e.getValue();
         // there should be only one timestamp and value
         assertEquals(expVal, value.toString());
       }
 
+      // read the timeline entity using the reader this time
       TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
           entity.getType(), entity.getId(),
           EnumSet.of(TimelineReader.Field.ALL));
@@ -613,6 +632,21 @@ public class TestHBaseTimelineWriterImpl {
       assertEquals(1, es1.size());
       assertEquals(1, es2.size());
       assertEquals(es1, es2);
+
+      // check the events
+      NavigableSet<TimelineEvent> events = e1.getEvents();
+      // there should be only one event
+      assertEquals(1, events.size());
+      for (TimelineEvent e : events) {
+        assertEquals(eventId, e.getId());
+        assertEquals(expTs, e.getTimestamp());
+        Map<String,Object> info = e.getInfo();
+        assertEquals(1, info.size());
+        for (Map.Entry<String, Object> infoEntry : info.entrySet()) {
+          assertEquals(expKey, infoEntry.getKey());
+          assertEquals(expVal, infoEntry.getValue());
+        }
+      }
     } finally {
       if (hbi != null) {
         hbi.stop();
@@ -630,7 +664,7 @@ public class TestHBaseTimelineWriterImpl {
     TimelineEvent event = new TimelineEvent();
     String eventId = "foo_event_id";
     event.setId(eventId);
-    Long expTs = 1436512802000L;
+    long expTs = 1436512802000L;
     event.setTimestamp(expTs);
 
     final TimelineEntity entity = new TimelineEntity();
@@ -678,22 +712,21 @@ public class TestHBaseTimelineWriterImpl {
           assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
               entity));
 
-          Map<String, Object> eventsResult =
-              EntityColumnPrefix.EVENT.readResults(result);
+          Map<?, Object> eventsResult =
+              EntityColumnPrefix.EVENT.
+                  readResultsHavingCompoundColumnQualifiers(result);
           // there should be only one event
           assertEquals(1, eventsResult.size());
-          // key name for the event
-          byte[] compoundColumnQualifierWithTsBytes =
-              Separator.VALUES.join(Bytes.toBytes(eventId),
-                  Bytes.toBytes(TimelineWriterUtils.invert(expTs)));
-          byte[] compoundColumnQualifierBytes =
-              Separator.VALUES.join(compoundColumnQualifierWithTsBytes,
-                  null);
-          String valueKey = Bytes.toString(compoundColumnQualifierBytes);
-          for (Map.Entry<String, Object> e :
-              eventsResult.entrySet()) {
-            // the column qualifier key must match
-            assertEquals(valueKey, e.getKey());
+          for (Map.Entry<?, Object> e : eventsResult.entrySet()) {
+            // the qualifier is a compound key
+            // hence match individual values
+            byte[][] karr = (byte[][])e.getKey();
+            assertEquals(3, karr.length);
+            assertEquals(eventId, Bytes.toString(karr[0]));
+            assertEquals(TimelineWriterUtils.invert(expTs),
+                Bytes.toLong(karr[1]));
+            // key must be empty
+            assertEquals(0, karr[2].length);
             Object value = e.getValue();
             // value should be empty
             assertEquals("", value.toString());
@@ -702,6 +735,7 @@ public class TestHBaseTimelineWriterImpl {
       }
       assertEquals(1, rowCount);
 
+      // read the timeline entity using the reader this time
       TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
           entity.getType(), entity.getId(),
           EnumSet.of(TimelineReader.Field.ALL));
@@ -710,6 +744,17 @@ public class TestHBaseTimelineWriterImpl {
           null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
       assertNotNull(e1);
       assertEquals(1, es1.size());
+
+      // check the events
+      NavigableSet<TimelineEvent> events = e1.getEvents();
+      // there should be only one event
+      assertEquals(1, events.size());
+      for (TimelineEvent e : events) {
+        assertEquals(eventId, e.getId());
+        assertEquals(expTs, e.getTimestamp());
+        Map<String,Object> info = e.getInfo();
+        assertTrue(info == null || info.isEmpty());
+      }
     } finally {
       hbi.stop();
       hbi.close();