
YARN-3706. Generalize native HBase writer for additional tables (Joep Rottinghuis via sjlee)

(cherry picked from commit 9137aeae0dec83f9eff40d12cae712dfd508c0c5)
Sangjin Lee committed 10 years ago
parent commit: aba1fe7f02
29 changed files with 2243 additions and 904 deletions
  1. + 3 - 0
      hadoop-yarn-project/CHANGES.txt
  2. + 0 - 110
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java
  3. + 0 - 95
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java
  4. + 62 - 52
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
  5. + 0 - 71
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java
  6. + 17 - 117
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
  7. + 1 - 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
  8. + 0 - 344
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java
  9. + 118 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
  10. + 73 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
  11. + 59 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
  12. + 34 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnFamily.java
  13. + 247 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
  14. + 83 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
  15. + 1 - 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Range.java
  16. + 303 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
  17. + 68 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java
  18. + 127 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
  19. + 28 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java
  20. + 24 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
  21. + 141 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
  22. + 65 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java
  23. + 212 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
  24. + 93 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
  25. + 161 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
  26. + 25 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java
  27. + 140 - 112
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
  28. + 129 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java
  29. + 29 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineWriterUtils.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -93,6 +93,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via
     zjshen)
 
+    YARN-3706. Generalize native HBase writer for additional tables (Joep
+    Rottinghuis via sjlee)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 0 - 110
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java

@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Contains the Info Column Family details like Column names, types and byte
- * representations for
- * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
- * object that is stored in hbase Also has utility functions for storing each of
- * these to the backend
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-enum EntityColumnDetails {
-  ID(EntityColumnFamily.INFO, "id"),
-  TYPE(EntityColumnFamily.INFO, "type"),
-  CREATED_TIME(EntityColumnFamily.INFO, "created_time"),
-  MODIFIED_TIME(EntityColumnFamily.INFO, "modified_time"),
-  FLOW_VERSION(EntityColumnFamily.INFO, "flow_version"),
-  PREFIX_IS_RELATED_TO(EntityColumnFamily.INFO, "r"),
-  PREFIX_RELATES_TO(EntityColumnFamily.INFO, "s"),
-  PREFIX_EVENTS(EntityColumnFamily.INFO, "e");
-
-  private final EntityColumnFamily columnFamily;
-  private final String value;
-  private final byte[] inBytes;
-
-  private EntityColumnDetails(EntityColumnFamily columnFamily, 
-      String value) {
-    this.columnFamily = columnFamily;
-    this.value = value;
-    this.inBytes = Bytes.toBytes(this.value.toLowerCase());
-  }
-
-  public String getValue() {
-    return value;
-  }
-
-  byte[] getInBytes() {
-    return inBytes;
-  }
-
-  void store(byte[] rowKey, BufferedMutator entityTable, Object inputValue)
-      throws IOException {
-    TimelineWriterUtils.store(rowKey, entityTable,
-        this.columnFamily.getInBytes(), null, this.getInBytes(), inputValue,
-        null);
-  }
-
-  /**
-   * stores events data with column prefix
-   */
-  void store(byte[] rowKey, BufferedMutator entityTable, byte[] idBytes,
-      String key, Object inputValue) throws IOException {
-    TimelineWriterUtils.store(rowKey, entityTable,
-        this.columnFamily.getInBytes(),
-        // column prefix
-        TimelineWriterUtils.join(
-            TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES,
-            this.getInBytes(), idBytes),
-        // column qualifier
-        Bytes.toBytes(key),
-        inputValue, null);
-  }
-
-  /**
-   * stores relation entities with a column prefix
-   */
-  void store(byte[] rowKey, BufferedMutator entityTable, String key,
-      Set<String> inputValue) throws IOException {
-    TimelineWriterUtils.store(rowKey, entityTable,
-        this.columnFamily.getInBytes(),
-        // column prefix
-        this.getInBytes(),
-        // column qualifier
-        Bytes.toBytes(key),
-        // value
-        TimelineWriterUtils.getValueAsString(
-            TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR, inputValue),
-        // cell timestamp
-        null);
-  }
-
-  // TODO add a method that accepts a byte array,
-  // iterates over the enum and returns an enum from those bytes
-
-}

+ 0 - 95
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java

@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Contains the Column family names and byte representations for
- * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
- * object that is stored in hbase
- * Also has utility functions for storing each of these to the backend
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-enum EntityColumnFamily {
-  INFO("i"),
-  CONFIG("c"),
-  METRICS("m");
-
-  private final String value;
-  private final byte[] inBytes;
-
-  private EntityColumnFamily(String value) {
-    this.value = value;
-    this.inBytes = Bytes.toBytes(this.value.toLowerCase());
-  }
-
-  byte[] getInBytes() {
-    return inBytes;
-  }
-
-  public String getValue() {
-    return value;
-  }
-
-  /**
-   * stores the key as column and value as hbase column value in the given
-   * column family in the entity table
-   *
-   * @param rowKey
-   * @param entityTable
-   * @param inputValue
-   * @throws IOException
-   */
-  public void store(byte[] rowKey, BufferedMutator entityTable, String key,
-      String inputValue) throws IOException {
-    if (key == null) {
-      return;
-    }
-    TimelineWriterUtils.store(rowKey, entityTable, this.getInBytes(), null,
-        Bytes.toBytes(key), inputValue, null);
-  }
-
-  /**
-   * stores the values along with cell timestamp
-   *
-   * @param rowKey
-   * @param entityTable
-   * @param key
-   * @param timestamp
-   * @param inputValue
-   * @throws IOException
-   */
-  public void store(byte[] rowKey, BufferedMutator entityTable, String key,
-      Long timestamp, Number inputValue) throws IOException {
-    if (key == null) {
-      return;
-    }
-    TimelineWriterUtils.store(rowKey, entityTable, this.getInBytes(), null,
-        Bytes.toBytes(key), inputValue, timestamp);
-  }
-
-  // TODO add a method that accepts a byte array,
-  // iterates over the enum and returns an enum from those bytes
-}

+ 62 - 52
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java

@@ -26,19 +26,22 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineEntitySchemaConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 
 /**
  * This implements a hbase based backend for storing application timeline entity
@@ -50,7 +53,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     TimelineWriter {
 
   private Connection conn;
-  private BufferedMutator entityTable;
+  private TypedBufferedMutator<EntityTable> entityTable;
 
   private static final Log LOG = LogFactory
       .getLog(HBaseTimelineWriterImpl.class);
@@ -72,10 +75,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     super.serviceInit(conf);
     Configuration hbaseConf = HBaseConfiguration.create(conf);
     conn = ConnectionFactory.createConnection(hbaseConf);
-    TableName entityTableName = TableName.valueOf(hbaseConf.get(
-        TimelineEntitySchemaConstants.ENTITY_TABLE_NAME,
-        TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME));
-    entityTable = conn.getBufferedMutator(entityTableName);
+    entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
   }
 
   /**
@@ -86,9 +86,6 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       String flowName, String flowVersion, long flowRunId, String appId,
       TimelineEntities data) throws IOException {
 
-    byte[] rowKeyPrefix = TimelineWriterUtils.getRowKeyPrefix(clusterId,
-        userId, flowName, flowRunId, appId);
-
     TimelineWriteResponse putStatus = new TimelineWriteResponse();
     for (TimelineEntity te : data.getEntities()) {
 
@@ -96,19 +93,19 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       if (te == null) {
         continue;
       }
-      // get row key
-      byte[] row = TimelineWriterUtils.join(
-          TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, rowKeyPrefix,
-          Bytes.toBytes(te.getType()), Bytes.toBytes(te.getId()));
-
-      storeInfo(row, te, flowVersion);
-      storeEvents(row, te.getEvents());
-      storeConfig(row, te.getConfigs());
-      storeMetrics(row, te.getMetrics());
-      storeRelations(row, te.getIsRelatedToEntities(),
-          EntityColumnDetails.PREFIX_IS_RELATED_TO);
-      storeRelations(row, te.getRelatesToEntities(),
-          EntityColumnDetails.PREFIX_RELATES_TO);
+
+      byte[] rowKey =
+          EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
+              te);
+
+      storeInfo(rowKey, te, flowVersion);
+      storeEvents(rowKey, te.getEvents());
+      storeConfig(rowKey, te.getConfigs());
+      storeMetrics(rowKey, te.getMetrics());
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          EntityColumnPrefix.IS_RELATED_TO);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          EntityColumnPrefix.RELATES_TO);
     }
 
     return putStatus;
@@ -119,10 +116,15 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
    */
   private void storeRelations(byte[] rowKey,
       Map<String, Set<String>> connectedEntities,
-      EntityColumnDetails columnNamePrefix) throws IOException {
-    for (Map.Entry<String, Set<String>> entry : connectedEntities.entrySet()) {
-      columnNamePrefix.store(rowKey, entityTable, entry.getKey(),
-          entry.getValue());
+      EntityColumnPrefix entityColumnPrefix) throws IOException {
+    for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities
+        .entrySet()) {
+      // id3?id4?id5
+      String compoundValue =
+          Separator.VALUES.joinEncoded(connectedEntity.getValue());
+
+      entityColumnPrefix.store(rowKey, entityTable, connectedEntity.getKey(),
+          null, compoundValue);
     }
   }
 
@@ -132,13 +134,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion)
       throws IOException {
 
-    EntityColumnDetails.ID.store(rowKey, entityTable, te.getId());
-    EntityColumnDetails.TYPE.store(rowKey, entityTable, te.getType());
-    EntityColumnDetails.CREATED_TIME.store(rowKey, entityTable,
+    EntityColumn.ID.store(rowKey, entityTable, null, te.getId());
+    EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType());
+    EntityColumn.CREATED_TIME.store(rowKey, entityTable, null,
         te.getCreatedTime());
-    EntityColumnDetails.MODIFIED_TIME.store(rowKey, entityTable,
+    EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null,
         te.getModifiedTime());
-    EntityColumnDetails.FLOW_VERSION.store(rowKey, entityTable, flowVersion);
+    EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
   }
 
   /**
@@ -150,8 +152,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       return;
     }
     for (Map.Entry<String, String> entry : config.entrySet()) {
-      EntityColumnFamily.CONFIG.store(rowKey, entityTable,
-          entry.getKey(), entry.getValue());
+      EntityColumnPrefix.CONFIG.store(rowKey, entityTable, entry.getKey(),
+          null, entry.getValue());
     }
   }
 
@@ -163,11 +165,12 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       throws IOException {
     if (metrics != null) {
       for (TimelineMetric metric : metrics) {
-        String key = metric.getId();
+        String metricColumnQualifier = metric.getId();
         Map<Long, Number> timeseries = metric.getValues();
-        for (Map.Entry<Long, Number> entry : timeseries.entrySet()) {
-          EntityColumnFamily.METRICS.store(rowKey, entityTable, key,
-              entry.getKey(), entry.getValue());
+        for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
+          Long timestamp = timeseriesEntry.getKey();
+          EntityColumnPrefix.METRIC.store(rowKey, entityTable,
+              metricColumnQualifier, timestamp, timeseriesEntry.getValue());
         }
       }
     }
@@ -181,19 +184,27 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     if (events != null) {
       for (TimelineEvent event : events) {
         if (event != null) {
-          String id = event.getId();
-          if (id != null) {
-            byte[] idBytes = Bytes.toBytes(id);
+          String eventId = event.getId();
+          if (eventId != null) {
             Map<String, Object> eventInfo = event.getInfo();
             if (eventInfo != null) {
               for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
-                EntityColumnDetails.PREFIX_EVENTS.store(rowKey,
-                    entityTable, idBytes, info.getKey(), info.getValue());
-              }
+                // eventId?infoKey
+                byte[] columnQualifierFirst =
+                    Bytes.toBytes(Separator.VALUES.encode(eventId));
+                byte[] compoundColumnQualifierBytes =
+                    Separator.VALUES.join(columnQualifierFirst,
+                        Bytes.toBytes(info.getKey()));
+                // convert back to string to avoid additional API on store.
+                String compoundColumnQualifier =
+                    Bytes.toString(compoundColumnQualifierBytes);
+                EntityColumnPrefix.METRIC.store(rowKey, entityTable,
+                    compoundColumnQualifier, null, info.getValue());
+              } // for info: eventInfo
             }
           }
         }
-      }
+      } // event : events
     }
   }
 
@@ -204,8 +215,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   }
 
   /**
-   * close the hbase connections
-   * The close APIs perform flushing and release any
+   * close the hbase connections The close APIs perform flushing and release any
    * resources held
    */
   @Override

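The "id3?id4?id5" comment in the hunk above shows how the rewritten writer composes multi-valued cells through Separator.VALUES. A minimal sketch of that call, assuming "?" is the encoded delimiter the comment implies:

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.Set;
    import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;

    Set<String> relatedIds =
        new LinkedHashSet<String>(Arrays.asList("id3", "id4", "id5"));
    // Each id is encoded before joining, so a literal "?" inside an id cannot
    // be mistaken for the delimiter when the compound value is split on read.
    String compoundValue = Separator.VALUES.joinEncoded(relatedIds);
    // compoundValue: "id3?id4?id5"
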
+ 0 - 71
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java

@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-/**
- * contains the constants used in the context of schema accesses for
- * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
- * information
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class TimelineEntitySchemaConstants {
-
-  /** entity prefix */
-  public static final String ENTITY_PREFIX =
-      YarnConfiguration.TIMELINE_SERVICE_PREFIX
-      + ".entity";
-
-  /** config param name that specifies the entity table name */
-  public static final String ENTITY_TABLE_NAME = ENTITY_PREFIX
-      + ".table.name";
-
-  /**
-   * config param name that specifies the TTL for metrics column family in
-   * entity table
-   */
-  public static final String ENTITY_TABLE_METRICS_TTL = ENTITY_PREFIX
-      + ".table.metrics.ttl";
-
-  /** default value for entity table name */
-  public static final String DEFAULT_ENTITY_TABLE_NAME = "timelineservice.entity";
-
-  /** in bytes default value for entity table name */
-  static final byte[] DEFAULT_ENTITY_TABLE_NAME_BYTES = Bytes
-      .toBytes(DEFAULT_ENTITY_TABLE_NAME);
-
-  /** separator in row key */
-  public static final String ROW_KEY_SEPARATOR = "!";
-
-  /** byte representation of the separator in row key */
-  static final byte[] ROW_KEY_SEPARATOR_BYTES = Bytes
-      .toBytes(ROW_KEY_SEPARATOR);
-
-  public static final byte ZERO_BYTES = 0;
-
-  /** default TTL is 30 days for metrics timeseries */
-  public static final int ENTITY_TABLE_METRICS_TTL_DEFAULT = 2592000;
-
-  /** default max number of versions */
-  public static final int ENTITY_TABLE_METRICS_MAX_VERSIONS_DEFAULT = 1000;
-}

+ 17 - 117
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java

@@ -19,21 +19,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
 
 import java.io.IOException;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -41,7 +26,18 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 
 /**
  * This creates the schema for a hbase based backend for storing application
@@ -53,18 +49,6 @@ public class TimelineSchemaCreator {
 
   final static String NAME = TimelineSchemaCreator.class.getSimpleName();
   private static final Log LOG = LogFactory.getLog(TimelineSchemaCreator.class);
-  final static byte[][] splits = { Bytes.toBytes("a"), Bytes.toBytes("ad"),
-      Bytes.toBytes("an"), Bytes.toBytes("b"), Bytes.toBytes("ca"),
-      Bytes.toBytes("cl"), Bytes.toBytes("d"), Bytes.toBytes("e"),
-      Bytes.toBytes("f"), Bytes.toBytes("g"), Bytes.toBytes("h"),
-      Bytes.toBytes("i"), Bytes.toBytes("j"), Bytes.toBytes("k"),
-      Bytes.toBytes("l"), Bytes.toBytes("m"), Bytes.toBytes("n"),
-      Bytes.toBytes("o"), Bytes.toBytes("q"), Bytes.toBytes("r"),
-      Bytes.toBytes("s"), Bytes.toBytes("se"), Bytes.toBytes("t"),
-      Bytes.toBytes("u"), Bytes.toBytes("v"), Bytes.toBytes("w"),
-      Bytes.toBytes("x"), Bytes.toBytes("y"), Bytes.toBytes("z") };
-
-  public static final String SPLIT_KEY_PREFIX_LENGTH = "4";
 
   public static void main(String[] args) throws Exception {
 
@@ -79,13 +63,12 @@ public class TimelineSchemaCreator {
     // Grab the entityTableName argument
     String entityTableName = commandLine.getOptionValue("e");
     if (StringUtils.isNotBlank(entityTableName)) {
-      hbaseConf.set(TimelineEntitySchemaConstants.ENTITY_TABLE_NAME,
-          entityTableName);
+      hbaseConf.set(EntityTable.TABLE_NAME_CONF_NAME, entityTableName);
     }
-    String entityTable_TTL_Metrics = commandLine.getOptionValue("m");
-    if (StringUtils.isNotBlank(entityTable_TTL_Metrics)) {
-      hbaseConf.set(TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL,
-          entityTable_TTL_Metrics);
+    String entityTableTTLMetrics = commandLine.getOptionValue("m");
+    if (StringUtils.isNotBlank(entityTableTTLMetrics)) {
+      int metricsTTL = Integer.parseInt(entityTableTTLMetrics);
+      new EntityTable().setMetricsTTL(metricsTTL, hbaseConf);
     }
     createAllTables(hbaseConf);
   }
@@ -136,7 +119,7 @@ public class TimelineSchemaCreator {
       if (admin == null) {
         throw new IOException("Cannot create table since admin is null");
       }
-      createTimelineEntityTable(admin, hbaseConf);
+      new EntityTable().createTable(admin, hbaseConf);
     } finally {
       if (conn != null) {
         conn.close();
@@ -144,88 +127,5 @@ public class TimelineSchemaCreator {
     }
   }
 
-  /**
-   * Creates a table with column families info, config and metrics
-   * info stores information about a timeline entity object
-   * config stores configuration data of a timeline entity object
-   * metrics stores the metrics of a timeline entity object
-   *
-   * Example entity table record:
-   * <pre>
-   *|------------------------------------------------------------|
-   *|  Row       | Column Family  | Column Family | Column Family|
-   *|  key       | info           | metrics       | config       |
-   *|------------------------------------------------------------|
-   *| userName!  | id:entityId    | metricName1:  | configKey1:  |
-   *| clusterId! |                | metricValue1  | configValue1 |
-   *| flowId!    | type:entityType| @timestamp1   |              |
-   *| flowRunId! |                |               | configKey2:  |
-   *| AppId!     | created_time:  | metricName1:  | configValue2 |
-   *| entityType!| 1392993084018  | metricValue2  |              |
-   *| entityId   |                | @timestamp2   |              |
-   *|            | modified_time: |               |              |
-   *|            | 1392995081012  | metricName2:  |              |
-   *|            |                | metricValue1  |              |
-   *|            | r!relatesToKey:| @timestamp2   |              |
-   *|            | id3!id4!id5    |               |              |
-   *|            |                |               |              |
-   *|            | s!isRelatedToKey|              |              |
-   *|            | id7!id9!id5    |               |              |
-   *|            |                |               |              |
-   *|            | e!eventKey:    |               |              |
-   *|            | eventValue     |               |              |
-   *|            |                |               |              |
-   *|            | flowVersion:   |               |              |
-   *|            | versionValue   |               |              |
-   *|------------------------------------------------------------|
-   *</pre>
-   * @param admin
-   * @param hbaseConf
-   * @throws IOException
-   */
-  public static void createTimelineEntityTable(Admin admin,
-      Configuration hbaseConf) throws IOException {
-
-    TableName table = TableName.valueOf(hbaseConf.get(
-        TimelineEntitySchemaConstants.ENTITY_TABLE_NAME,
-        TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME));
-    if (admin.tableExists(table)) {
-      // do not disable / delete existing table
-      // similar to the approach taken by map-reduce jobs when
-      // output directory exists
-      throw new IOException("Table " + table.getNameAsString()
-          + " already exists.");
-    }
-
-    HTableDescriptor entityTableDescp = new HTableDescriptor(table);
-    HColumnDescriptor cf1 = new HColumnDescriptor(
-        EntityColumnFamily.INFO.getInBytes());
-    cf1.setBloomFilterType(BloomType.ROWCOL);
-    entityTableDescp.addFamily(cf1);
-
-    HColumnDescriptor cf2 = new HColumnDescriptor(
-        EntityColumnFamily.CONFIG.getInBytes());
-    cf2.setBloomFilterType(BloomType.ROWCOL);
-    cf2.setBlockCacheEnabled(true);
-    entityTableDescp.addFamily(cf2);
-
-    HColumnDescriptor cf3 = new HColumnDescriptor(
-        EntityColumnFamily.METRICS.getInBytes());
-    entityTableDescp.addFamily(cf3);
-    cf3.setBlockCacheEnabled(true);
-    // always keep 1 version (the latest)
-    cf3.setMinVersions(1);
-    cf3.setMaxVersions(TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_MAX_VERSIONS_DEFAULT);
-    cf3.setTimeToLive(hbaseConf.getInt(
-        TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL,
-        TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL_DEFAULT));
-    entityTableDescp
-        .setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
-    entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
-        SPLIT_KEY_PREFIX_LENGTH);
-    admin.createTable(entityTableDescp, splits);
-    LOG.info("Status of table creation for " + table.getNameAsString() + "="
-        + admin.tableExists(table));
 
-  }
 }

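With the table DDL moved into EntityTable, the "-e" (table name) and "-m" (metrics TTL) options now reduce to the two calls seen in the hunk above. A sketch of the equivalent programmatic setup, using only identifiers that appear in this patch; the TTL value mirrors the removed 30-day default:

    Configuration hbaseConf = HBaseConfiguration.create();
    hbaseConf.set(EntityTable.TABLE_NAME_CONF_NAME, "timelineservice.entity");
    new EntityTable().setMetricsTTL(2592000, hbaseConf); // 30 days
    // createAllTables(hbaseConf) then invokes
    // new EntityTable().createTable(admin, hbaseConf)
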
+ 1 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java

@@ -15,17 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.yarn.server.timelineservice.storage;
 
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.service.Service;
 
 /**
  * This interface is for storing application timeline information.

+ 0 - 344
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java

@@ -1,344 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.Range;
-
-/**
- * bunch of utility functions used across TimelineWriter classes
- */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class TimelineWriterUtils {
-
-  /** empty bytes */
-  public static final byte[] EMPTY_BYTES = new byte[0];
-  private static final String SPACE = " ";
-  private static final String UNDERSCORE = "_";
-  private static final String EMPTY_STRING = "";
-
-  /**
-   * Returns a single byte array containing all of the individual component
-   * arrays separated by the separator array.
-   *
-   * @param separator
-   * @param components
-   * @return byte array after joining the components
-   */
-  public static byte[] join(byte[] separator, byte[]... components) {
-    if (components == null || components.length == 0) {
-      return EMPTY_BYTES;
-    }
-
-    int finalSize = 0;
-    if (separator != null) {
-      finalSize = separator.length * (components.length - 1);
-    }
-    for (byte[] comp : components) {
-      if (comp != null) {
-        finalSize += comp.length;
-      }
-    }
-
-    byte[] buf = new byte[finalSize];
-    int offset = 0;
-    for (int i = 0; i < components.length; i++) {
-      if (components[i] != null) {
-        System.arraycopy(components[i], 0, buf, offset, components[i].length);
-        offset += components[i].length;
-        if (i < (components.length - 1) && separator != null
-            && separator.length > 0) {
-          System.arraycopy(separator, 0, buf, offset, separator.length);
-          offset += separator.length;
-        }
-      }
-    }
-    return buf;
-  }
-
-  /**
-   * Splits the source array into multiple array segments using the given
-   * separator, up to a maximum of count items. This will naturally produce
-   * copied byte arrays for each of the split segments. To identify the split
-   * ranges without the array copies, see
-   * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
-   *
-   * @param source
-   * @param separator
-   * @return byte[] array after splitting the source
-   */
-  public static byte[][] split(byte[] source, byte[] separator) {
-    return split(source, separator, -1);
-  }
-
-  /**
-   * Splits the source array into multiple array segments using the given
-   * separator, up to a maximum of count items. This will naturally produce
-   * copied byte arrays for each of the split segments. To identify the split
-   * ranges without the array copies, see
-   * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
-   *
-   * @param source
-   * @param separator
-   * @param limit
-   * @return byte[][] after splitting the input source
-   */
-  public static byte[][] split(byte[] source, byte[] separator, int limit) {
-    List<Range> segments = splitRanges(source, separator, limit);
-
-    byte[][] splits = new byte[segments.size()][];
-    for (int i = 0; i < segments.size(); i++) {
-      Range r = segments.get(i);
-      byte[] tmp = new byte[r.length()];
-      if (tmp.length > 0) {
-        System.arraycopy(source, r.start(), tmp, 0, r.length());
-      }
-      splits[i] = tmp;
-    }
-    return splits;
-  }
-
-  /**
-   * Returns a list of ranges identifying [start, end) -- closed, open --
-   * positions within the source byte array that would be split using the
-   * separator byte array.
-   */
-  public static List<Range> splitRanges(byte[] source, byte[] separator) {
-    return splitRanges(source, separator, -1);
-  }
-
-  /**
-   * Returns a list of ranges identifying [start, end) -- closed, open --
-   * positions within the source byte array that would be split using the
-   * separator byte array.
-   * @param source the source data
-   * @param separator the separator pattern to look for
-   * @param limit the maximum number of splits to identify in the source
-   */
-  public static List<Range> splitRanges(byte[] source, byte[] separator, int limit) {
-    List<Range> segments = new ArrayList<Range>();
-    if ((source == null) || (separator == null)) {
-      return segments;
-    }
-    int start = 0;
-    itersource: for (int i = 0; i < source.length; i++) {
-      for (int j = 0; j < separator.length; j++) {
-        if (source[i + j] != separator[j]) {
-          continue itersource;
-        }
-      }
-      // all separator elements matched
-      if (limit > 0 && segments.size() >= (limit-1)) {
-        // everything else goes in one final segment
-        break;
-      }
-
-      segments.add(new Range(start, i));
-      start = i + separator.length;
-      // i will be incremented again in outer for loop
-      i += separator.length-1;
-    }
-    // add in remaining to a final range
-    if (start <= source.length) {
-      segments.add(new Range(start, source.length));
-    }
-    return segments;
-  }
-
-  /**
-   * converts run id into it's inverse timestamp
-   * @param flowRunId
-   * @return inverted long
-   */
-  public static long encodeRunId(Long flowRunId) {
-    return Long.MAX_VALUE - flowRunId;
-  }
-
-  /**
-   * return a value from the Map as a String
-   * @param key
-   * @param values
-   * @return value as a String or ""
-   * @throws IOException 
-   */
-  public static String getValueAsString(final byte[] key,
-      final Map<byte[], byte[]> values) throws IOException {
-    if( values == null ) {
-      return EMPTY_STRING;
-    }
-    byte[] value = values.get(key);
-    if (value != null) {
-      return GenericObjectMapper.read(value).toString();
-    } else {
-      return EMPTY_STRING;
-    }
-  }
-
-  /**
-   * return a value from the Map as a long
-   * @param key
-   * @param values
-   * @return value as Long or 0L
-   * @throws IOException 
-   */
-  public static long getValueAsLong(final byte[] key,
-      final Map<byte[], byte[]> values) throws IOException {
-    if (values == null) {
-      return 0;
-    }
-    byte[] value = values.get(key);
-    if (value != null) {
-      Number val = (Number) GenericObjectMapper.read(value);
-      return val.longValue();
-    } else {
-      return 0L;
-    }
-  }
-
-  /**
-   * concates the values from a Set<Strings> to return a single delimited string value
-   * @param rowKeySeparator
-   * @param values
-   * @return Value from the set of strings as a string
-   */
-  public static String getValueAsString(String rowKeySeparator,
-      Set<String> values) {
-
-    if (values == null) {
-      return EMPTY_STRING;
-    }
-    StringBuilder concatStrings = new StringBuilder();
-    for (String value : values) {
-      concatStrings.append(value);
-      concatStrings.append(rowKeySeparator);
-    }
-    // remove the last separator
-    if(concatStrings.length() > 1) {
-      concatStrings.deleteCharAt(concatStrings.lastIndexOf(rowKeySeparator));
-    }
-    return concatStrings.toString();
-  }
-  /**
-   * Constructs a row key prefix for the entity table
-   * @param clusterId
-   * @param userId
-   * @param flowId
-   * @param flowRunId
-   * @param appId
-   * @return byte array with the row key prefix
-   */
-  static byte[] getRowKeyPrefix(String clusterId, String userId, String flowId,
-      Long flowRunId, String appId) {
-    return TimelineWriterUtils.join(
-        TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES,
-        Bytes.toBytes(cleanse(userId)), Bytes.toBytes(cleanse(clusterId)),
-        Bytes.toBytes(cleanse(flowId)),
-        Bytes.toBytes(TimelineWriterUtils.encodeRunId(flowRunId)),
-        Bytes.toBytes(cleanse(appId)));
- }
-
-  /**
-   * Takes a string token to be used as a key or qualifier and
-   * cleanses out reserved tokens.
-   * This operation is not symmetrical.
-   * Logic is to replace all spaces and separator chars in input with
-   * underscores.
-   *
-   * @param token token to cleanse.
-   * @return String with no spaces and no separator chars
-   */
-  public static String cleanse(String token) {
-    if (token == null || token.length() == 0) {
-      return token;
-    }
-
-    String cleansed = token.replaceAll(SPACE, UNDERSCORE);
-    cleansed = cleansed.replaceAll(
-        TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR, UNDERSCORE);
-
-    return cleansed;
-  }
-
-  /**
-   * stores the info to the table in hbase
-   * 
-   * @param rowKey
-   * @param table
-   * @param columnFamily
-   * @param columnPrefix
-   * @param columnQualifier
-   * @param inputValue
-   * @param cellTimeStamp
-   * @throws IOException
-   */
-  public static void store(byte[] rowKey, BufferedMutator table, byte[] columnFamily,
-      byte[] columnPrefix, byte[] columnQualifier, Object inputValue,
-      Long cellTimeStamp) throws IOException {
-    if ((rowKey == null) || (table == null) || (columnFamily == null)
-        || (columnQualifier == null) || (inputValue == null)) {
-      return;
-    }
-
-    Put p = null;
-    if (cellTimeStamp == null) {
-      if (columnPrefix != null) {
-        // store with prefix
-        p = new Put(rowKey);
-        p.addColumn(
-            columnFamily,
-            join(TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES,
-                columnPrefix, columnQualifier), GenericObjectMapper
-                .write(inputValue));
-      } else {
-        // store without prefix
-        p = new Put(rowKey);
-        p.addColumn(columnFamily, columnQualifier,
-            GenericObjectMapper.write(inputValue));
-      }
-    } else {
-      // store with cell timestamp
-      Cell cell = CellUtil.createCell(rowKey, columnFamily, columnQualifier,
-          // set the cell timestamp
-          cellTimeStamp,
-          // KeyValue Type minimum
-          TimelineEntitySchemaConstants.ZERO_BYTES,
-          GenericObjectMapper.write(inputValue));
-      p = new Put(rowKey);
-      p.add(cell);
-    }
-    if (p != null) {
-      table.mutate(p);
-    }
-
-  }
-
-}

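For reference, the deleted class was the round-trip point for row keys. A short sketch exercising only the methods removed above:

    byte[] sep = Bytes.toBytes("!"); // the old ROW_KEY_SEPARATOR
    byte[] joined = TimelineWriterUtils.join(sep,
        Bytes.toBytes("user1"), Bytes.toBytes("cluster1"));
    // joined is the byte form of "user1!cluster1"
    byte[][] parts = TimelineWriterUtils.split(joined, sep);
    // parts[0] -> "user1", parts[1] -> "cluster1"

    // encodeRunId inverts the run id (Long.MAX_VALUE - flowRunId) so that
    // more recent runs sort first in HBase's ascending row-key order.
    long inverted = TimelineWriterUtils.encodeRunId(42L);
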
+ 118 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java

@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+
+/**
+ * Implements behavior common to tables used in the timeline service storage.
+ *
+ * @param <T> reference to the table instance class itself for type safety.
+ */
+public abstract class BaseTable<T> {
+
+  /**
+   * Name of config variable that is used to point to this table
+   */
+  private final String tableNameConfName;
+
+  /**
+   * Unless the configuration overrides, this will be the default name for the
+   * table when it is created.
+   */
+  private final String defaultTableName;
+
+  /**
+   * @param tableNameConfName name of config variable that is used to point to
+   *          this table.
+   */
+  protected BaseTable(String tableNameConfName, String defaultTableName) {
+    this.tableNameConfName = tableNameConfName;
+    this.defaultTableName = defaultTableName;
+  }
+
+  /**
+   * Used to create a type-safe mutator for this table.
+   *
+   * @param hbaseConf used to read table name
+   * @param conn used to create a table from.
+   * @return a type safe {@link BufferedMutator} for the entity table.
+   * @throws IOException
+   */
+  public TypedBufferedMutator<T> getTableMutator(Configuration hbaseConf,
+      Connection conn) throws IOException {
+
+    TableName tableName = this.getTableName(hbaseConf);
+
+    // Plain buffered mutator
+    BufferedMutator bufferedMutator = conn.getBufferedMutator(tableName);
+
+    // Now make this thing type safe.
+    // This is how service initialization should hang on to this variable, with
+    // the proper type
+    TypedBufferedMutator<T> table =
+        new BufferedMutatorDelegator<T>(bufferedMutator);
+
+    return table;
+  }
+
+  /**
+   * @param hbaseConf used to read settings that override defaults
+   * @param conn used to create table from
+   * @param scan that specifies what you want to read from this table.
+   * @return scanner for the table.
+   * @throws IOException
+   */
+  public ResultScanner getResultScanner(Configuration hbaseConf,
+      Connection conn, Scan scan) throws IOException {
+    Table table = conn.getTable(getTableName(hbaseConf));
+    return table.getScanner(scan);
+  }
+
+  /**
+   * Get the table name for this table.
+   *
+   * @param hbaseConf
+   */
+  public TableName getTableName(Configuration hbaseConf) {
+    TableName table =
+        TableName.valueOf(hbaseConf.get(tableNameConfName, defaultTableName));
+    return table;
+
+  }
+
+  /**
+   * Used to create the table in HBase. Should be called only once (per HBase
+   * instance).
+   *
+   * @param admin
+   * @param hbaseConf
+   */
+  public abstract void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException;
+
+}

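A hypothetical subclass showing how the BaseTable contract is meant to be used; EntityTable in this patch follows the same shape, but the names below are illustrative only:

    public class ExampleTable extends BaseTable<ExampleTable> {
      /** Hypothetical config key; not part of this patch. */
      public static final String TABLE_NAME_CONF_NAME =
          "yarn.timeline-service.example.table.name";

      public ExampleTable() {
        super(TABLE_NAME_CONF_NAME, "timelineservice.example");
      }

      @Override
      public void createTable(Admin admin, Configuration hbaseConf)
          throws IOException {
        // define column families, bloom filters, TTLs, split policy, etc.
      }
    }

Callers then obtain a type-safe mutator via new ExampleTable().getTableMutator(hbaseConf, conn), exactly as HBaseTimelineWriterImpl does for EntityTable above.
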
+ 73 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java

@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Mutation;
+
+/**
+ * To be used to wrap an actual {@link BufferedMutator} in a type safe manner
+ *
+ * @param <T> The class referring to the table to be written to.
+ */
+class BufferedMutatorDelegator<T> implements TypedBufferedMutator<T> {
+
+  private final BufferedMutator bufferedMutator;
+
+  /**
+   * @param bufferedMutator the mutator to be wrapped for delegation. Shall not
+   *          be null.
+   */
+  public BufferedMutatorDelegator(BufferedMutator bufferedMutator) {
+    this.bufferedMutator = bufferedMutator;
+  }
+
+  public TableName getName() {
+    return bufferedMutator.getName();
+  }
+
+  public Configuration getConfiguration() {
+    return bufferedMutator.getConfiguration();
+  }
+
+  public void mutate(Mutation mutation) throws IOException {
+    bufferedMutator.mutate(mutation);
+  }
+
+  public void mutate(List<? extends Mutation> mutations) throws IOException {
+    bufferedMutator.mutate(mutations);
+  }
+
+  public void close() throws IOException {
+    bufferedMutator.close();
+  }
+
+  public void flush() throws IOException {
+    bufferedMutator.flush();
+  }
+
+  public long getWriteBufferSize() {
+    return bufferedMutator.getWriteBufferSize();
+  }
+
+}

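Since every call is forwarded unchanged, the type parameter exists only at compile time. What it buys, sketched against the entity table:

    TypedBufferedMutator<EntityTable> entityMutator =
        new EntityTable().getTableMutator(hbaseConf, conn);
    // A mutator typed to a different table can no longer be passed to a
    // store() method expecting an EntityTable mutator; the mismatch is a
    // compile error, where a plain BufferedMutator would have been accepted.
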
+ 59 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java

@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * A Column represents the way to store a fully qualified column in a specific
+ * table.
+ */
+public interface Column<T> {
+
+  /**
+   * Sends a Mutation to the table. The mutations will be buffered and sent over
+   * the wire as part of a batch.
+   *
+   * @param rowKey identifying the row to write. Nothing gets written when null.
+   * @param tableMutator used to modify the underlying HBase table. Caller is
+   *          responsible to pass a mutator for the table that actually has this
+   *          column.
+   * @param timestamp version timestamp. When null the server timestamp will be
+   *          used.
+   * @param inputValue the value to write to the rowKey and column qualifier.
+   *          Nothing gets written when null.
+   * @throws IOException
+   */
+  public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
+      Long timestamp, Object inputValue) throws IOException;
+
+  /**
+   * Get the latest version of this specified column. Note: this call clones the
+   * value content of the hosting {@link Cell}.
+   *
+   * @param result Cannot be null
+   * @return result object (can be cast to whatever object was written to), or
+   *         null when result doesn't contain this column.
+   * @throws IOException
+   */
+  public Object readResult(Result result) throws IOException;
+
+}

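Per the file list, each table pairs this interface with an enum (EntityColumn here). A compact sketch of that pattern with illustrative names; the delegation to ColumnHelper is an assumption based on its role below, and the exact ColumnHelper method signatures are not visible in this excerpt:

    public enum ExampleColumn implements Column<ExampleTable> {
      ID(ExampleColumnFamily.INFO, "id");

      private final ColumnHelper<ExampleTable> column;
      private final byte[] columnQualifierBytes;

      ExampleColumn(ColumnFamily<ExampleTable> family, String qualifier) {
        this.column = new ColumnHelper<ExampleTable>(family);
        this.columnQualifierBytes = Bytes.toBytes(qualifier);
      }

      @Override
      public void store(byte[] rowKey,
          TypedBufferedMutator<ExampleTable> tableMutator, Long timestamp,
          Object inputValue) throws IOException {
        // Assumed ColumnHelper call shape.
        column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
            inputValue);
      }

      @Override
      public Object readResult(Result result) throws IOException {
        // Assumed ColumnHelper call shape.
        return column.readResult(result, columnQualifierBytes);
      }
    }
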
+ 34 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnFamily.java

@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * Type safe column family.
+ *
+ * @param <T> refers to the table for which this column family is used.
+ */
+public interface ColumnFamily<T> {
+
+  /**
+   * Keep a local copy if you need to avoid overhead of repeated cloning.
+   *
+   * @return a clone of the byte representation of the column family.
+   */
+  public byte[] getBytes();
+
+}

+ 247 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java

@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+
+/**
+ * This class is meant to be used only by explicit Columns; it is not meant to
+ * be used directly by clients for writes.
+ *
+ * @param <T> refers to the table.
+ */
+public class ColumnHelper<T> {
+
+  private final ColumnFamily<T> columnFamily;
+
+  /**
+   * Local copy of bytes representation of columnFamily so that we can avoid
+   * cloning a new copy over and over.
+   */
+  private final byte[] columnFamilyBytes;
+
+  public ColumnHelper(ColumnFamily<T> columnFamily) {
+    this.columnFamily = columnFamily;
+    columnFamilyBytes = columnFamily.getBytes();
+  }
+
+  /**
+   * Sends a Mutation to the table. The mutations will be buffered and sent over
+   * the wire as part of a batch.
+   *
+   * @param rowKey identifying the row to write. Nothing gets written when null.
+   * @param tableMutator used to modify the underlying HBase table
+   * @param columnQualifier column qualifier. Nothing gets written when null.
+   * @param timestamp version timestamp. When null the server timestamp will be
+   *          used.
+   * @param inputValue the value to write to the rowKey and column qualifier.
+   *          Nothing gets written when null.
+   * @throws IOException
+   */
+  public void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator,
+      byte[] columnQualifier, Long timestamp, Object inputValue)
+      throws IOException {
+    if ((rowKey == null) || (columnQualifier == null) || (inputValue == null)) {
+      return;
+    }
+    Put p = new Put(rowKey);
+
+    if (timestamp == null) {
+      p.addColumn(columnFamilyBytes, columnQualifier,
+          GenericObjectMapper.write(inputValue));
+    } else {
+      p.addColumn(columnFamilyBytes, columnQualifier, timestamp,
+          GenericObjectMapper.write(inputValue));
+    }
+    tableMutator.mutate(p);
+  }
+
+  /**
+   * @return the column family for this column implementation.
+   */
+  public ColumnFamily<T> getColumnFamily() {
+    return columnFamily;
+  }
+
+  /**
+   * Get the latest version of this specified column. Note: this call clones the
+   * value content of the hosting {@link Cell}.
+   *
+   * @param result from which to read the value. Cannot be null
+   * @param columnQualifierBytes referring to the column to be read.
+   * @return latest version of the specified column of whichever object was
+   *         written.
+   * @throws IOException
+   */
+  public Object readResult(Result result, byte[] columnQualifierBytes)
+      throws IOException {
+    if (result == null || columnQualifierBytes == null) {
+      return null;
+    }
+
+    // Would have preferred to be able to use getValueAsByteBuffer and get a
+    // ByteBuffer to avoid copy, but GenericObjectMapper doesn't seem to like
+    // that.
+    byte[] value = result.getValue(columnFamilyBytes, columnQualifierBytes);
+    return GenericObjectMapper.read(value);
+  }
+
+  /**
+   * @param result from which to read timeseries data
+   * @param columnPrefixBytes optional prefix to limit columns. If null all
+   *          columns are returned.
+   * @return the cell values at each respective time in the form
+   *         {idA={timestamp1->value1, timestamp2->value2},
+   *         idB={timestamp3->value3}, idC={timestamp1->value4}}
+   * @throws IOException
+   */
+  public NavigableMap<String, NavigableMap<Long, Number>> readTimeseriesResults(
+      Result result, byte[] columnPrefixBytes) throws IOException {
+
+    NavigableMap<String, NavigableMap<Long, Number>> results =
+        new TreeMap<String, NavigableMap<Long, Number>>();
+
+    if (result != null) {
+      NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> resultMap =
+          result.getMap();
+
+      // Result#getMap returns null for an empty result.
+      NavigableMap<byte[], NavigableMap<Long, byte[]>> columnCellMap =
+          (resultMap == null) ? null : resultMap.get(columnFamilyBytes);
+
+      // could be that there is no such column family.
+      if (columnCellMap != null) {
+        for (Entry<byte[], NavigableMap<Long, byte[]>> entry : columnCellMap
+            .entrySet()) {
+          String columnName = null;
+          if (columnPrefixBytes == null) {
+            // Decode the spaces we encoded in the column name.
+            columnName = Separator.decode(entry.getKey(), Separator.SPACE);
+          } else {
+            // A non-null prefix means columns are actually of the form
+            // prefix!columnNameRemainder
+            byte[][] columnNameParts =
+                Separator.QUALIFIERS.split(entry.getKey(), 2);
+            byte[] actualColumnPrefixBytes = columnNameParts[0];
+            if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
+                && columnNameParts.length == 2) {
+              // This is the prefix that we want
+              columnName = Separator.decode(columnNameParts[1]);
+            }
+          }
+
+          // If this column has the prefix we want
+          if (columnName != null) {
+            NavigableMap<Long, Number> cellResults =
+                new TreeMap<Long, Number>();
+            NavigableMap<Long, byte[]> cells = entry.getValue();
+            if (cells != null) {
+              for (Entry<Long, byte[]> cell : cells.entrySet()) {
+                Number value =
+                    (Number) GenericObjectMapper.read(cell.getValue());
+                cellResults.put(cell.getKey(), value);
+              }
+            }
+            results.put(columnName, cellResults);
+          }
+        } // for entry : columnCellMap
+      } // if columnCellMap != null
+    } // if result != null
+    return results;
+  }
+
+  /**
+   * @param result from which to read columns
+   * @param columnPrefixBytes optional prefix to limit columns. If null all
+   *          columns are returned.
+   * @return the latest values of columns in the column family.
+   * @throws IOException
+   */
+  public Map<String, Object> readResults(Result result, byte[] columnPrefixBytes)
+      throws IOException {
+    Map<String, Object> results = new HashMap<String, Object>();
+
+    if (result != null) {
+      Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes);
+      // Result#getFamilyMap returns null for an empty result.
+      if (columns == null) {
+        return results;
+      }
+      for (Entry<byte[], byte[]> entry : columns.entrySet()) {
+        if (entry.getKey() != null && entry.getKey().length > 0) {
+
+          String columnName = null;
+          if (columnPrefixBytes == null) {
+            // Decode the spaces we encoded in the column name.
+            columnName = Separator.decode(entry.getKey(), Separator.SPACE);
+          } else {
+            // A non-null prefix means columns are actually of the form
+            // prefix!columnNameRemainder
+            byte[][] columnNameParts =
+                Separator.QUALIFIERS.split(entry.getKey(), 2);
+            byte[] actualColumnPrefixBytes = columnNameParts[0];
+            if (Bytes.equals(columnPrefixBytes, actualColumnPrefixBytes)
+                && columnNameParts.length == 2) {
+              // This is the prefix that we want
+              columnName = Separator.decode(columnNameParts[1]);
+            }
+          }
+
+          // If this column has the prefix we want
+          if (columnName != null) {
+            Object value = GenericObjectMapper.read(entry.getValue());
+            results.put(columnName, value);
+          }
+        }
+      } // for entry
+    }
+    return results;
+  }
+
+  /**
+   * @param columnPrefixBytes The byte representation for the column prefix.
+   *          Should not contain {@link Separator#QUALIFIERS}.
+   * @param qualifier for the remainder of the column. Any
+   *          {@link Separator#SPACE} occurrences will be encoded in the
+   *          qualifier.
+   * @return fully sanitized column qualifier that is a combination of prefix
+   *         and qualifier. If prefix is null, the result is simply the encoded
+   *         qualifier without any separator.
+   */
+  public static byte[] getColumnQualifier(byte[] columnPrefixBytes,
+      String qualifier) {
+
+    // We don't want column names to have spaces
+    byte[] encodedQualifier = Bytes.toBytes(Separator.SPACE.encode(qualifier));
+    if (columnPrefixBytes == null) {
+      return encodedQualifier;
+    }
+
+    // Tag the column prefix onto the already-encoded qualifier, using the
+    // qualifier separator.
+    byte[] columnQualifier =
+        Separator.QUALIFIERS.join(columnPrefixBytes, encodedQualifier);
+    return columnQualifier;
+  }
+
+}
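
A quick illustration of how getColumnQualifier composes qualifiers; the values here are illustrative:

    byte[] prefix = Bytes.toBytes("e");
    // Spaces in the remainder are encoded, then the prefix is joined with "!".
    byte[] qualified = ColumnHelper.getColumnQualifier(prefix, "shuffle time");
    // qualified represents "e!shuffle%2$time"
    byte[] bare = ColumnHelper.getColumnQualifier(null, "shuffle time");
    // bare represents "shuffle%2$time" -- no separator when the prefix is null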

+ 83 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java

@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * Used to represent a partially qualified column, where the actual column name
+ * will be composed of a prefix and the remainder of the column qualifier. The
+ * prefix can be null, in which case the column qualifier will be completely
+ * determined when the values are stored.
+ */
+public interface ColumnPrefix<T> {
+
+  /**
+   * Sends a Mutation to the table. The mutations will be buffered and sent over
+   * the wire as part of a batch.
+   *
+   * @param rowKey identifying the row to write. Nothing gets written when null.
+   * @param tableMutator used to modify the underlying HBase table. Caller is
+   *          responsible to pass a mutator for the table that actually has this
+   *          column.
+   * @param qualifier column qualifier. Nothing gets written when null.
+   * @param timestamp version timestamp. When null the server timestamp will be
+   *          used.
+   * @param inputValue the value to write to the rowKey and column qualifier.
+   *          Nothing gets written when null.
+   * @throws IOException
+   */
+  public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
+      String qualifier, Long timestamp, Object inputValue) throws IOException;
+
+  /**
+   * Get the latest version of this specified column. Note: this call clones the
+   * value content of the hosting {@link Cell}.
+   *
+   * @param result Cannot be null
+   * @param qualifier column qualifier. Nothing gets read when null.
+   * @return result object (can be cast to whatever object was written to) or
+   *         null when specified column qualifier for this prefix doesn't exist
+   *         in the result.
+   * @throws IOException
+   */
+  public Object readResult(Result result, String qualifier) throws IOException;
+
+  /**
+   * @param result from which to read columns
+   * @return the latest values of columns in the column family with this prefix
+   *         (or all of them if the prefix value is null).
+   * @throws IOException
+   */
+  public Map<String, Object> readResults(Result result) throws IOException;
+
+  /**
+   * @param result from which to read timeseries data
+   * @return the cell values at each respective time in the form
+   *         {idA={timestamp1->value1, timestamp2->value2},
+   *         idB={timestamp3->value3}, idC={timestamp1->value4}}
+   * @throws IOException
+   */
+  public NavigableMap<String, NavigableMap<Long, Number>> readTimeseriesResults(
+      Result result) throws IOException;
+}
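
The extra qualifier argument is what distinguishes this contract from Column; a hedged sketch using the EntityColumnPrefix implementation added later in this change (the config key and value are illustrative):

    // CONFIG has a null prefix, so the qualifier is the entire column name.
    EntityColumnPrefix.CONFIG.store(rowKey, entityTableMutator,
        "mapreduce.map.memory.mb", null, "2048");
    Map<String, Object> configs = EntityColumnPrefix.CONFIG.readResults(result);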

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Range.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;

+ 303 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java

@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Used to separate row qualifiers, column qualifiers and compound fields.
+ */
+public enum Separator {
+
+  /**
+   * separator in key or column qualifier fields
+   */
+  QUALIFIERS("!", "%0$"),
+
+  /**
+   * separator in values, and/or compound key/column qualifier fields.
+   */
+  VALUES("?", "%1$"),
+
+  /**
+   * separator in values, often used to avoid having spaces in qualifiers and
+   * names. Note that if we used HTML form encoding through URLEncoder, we
+   * would end up getting a + for a space, which may already occur in strings,
+   * so we don't want that.
+   */
+  SPACE(" ", "%2$");
+
+  /**
+   * The string value of this separator.
+   */
+  private final String value;
+
+  /**
+   * The URLEncoded version of this separator
+   */
+  private final String encodedValue;
+
+  /**
+   * The byte representation of value.
+   */
+  private final byte[] bytes;
+
+  /**
+   * The value quoted so that it can be used as a safe regex
+   */
+  private final String quotedValue;
+
+  private static final byte[] EMPTY_BYTES = new byte[0];
+
+  /**
+   * @param value of the separator to use. Cannot be null or empty string.
+   * @param encodedValue choose something that isn't likely to occur in the data
+   *          itself. Cannot be null or empty string.
+   */
+  private Separator(String value, String encodedValue) {
+    this.value = value;
+    this.encodedValue = encodedValue;
+
+    // validation
+    if (value == null || value.length() == 0 || encodedValue == null
+        || encodedValue.length() == 0) {
+      throw new IllegalArgumentException(
+          "Cannot create separator from null or empty string.");
+    }
+
+    this.bytes = Bytes.toBytes(value);
+    this.quotedValue = Pattern.quote(value);
+  }
+
+  /**
+   * Used to make token safe to be used with this separator without collisions.
+   *
+   * @param token
+   * @return the token with any occurrences of this separator URLEncoded.
+   */
+  public String encode(String token) {
+    if (token == null || token.length() == 0) {
+      // Nothing to replace
+      return token;
+    }
+    return token.replace(value, encodedValue);
+  }
+
+  /**
+   * @param token
+   * @return the token with any occurrences of the encoded separator replaced by
+   *         the separator itself.
+   */
+  public String decode(String token) {
+    if (token == null || token.length() == 0) {
+      // Nothing to replace
+      return token;
+    }
+    return token.replace(encodedValue, value);
+  }
+
+  /**
+   * Encode the given separators in the token with their encoding equivalent.
+   * This means that when encoding is already present in the token itself, this
+   * is not a reversible process. See also {@link #decode(String, Separator...)}
+   *
+   * @param token containing possible separators that need to be encoded.
+   * @param separators to be encoded in the token with their URLEncoding
+   *          equivalent.
+   * @return non-null byte representation of the token with occurrences of the
+   *         separators encoded.
+   */
+  public static byte[] encode(String token, Separator... separators) {
+    if (token == null) {
+      return EMPTY_BYTES;
+    }
+    String result = token;
+    for (Separator separator : separators) {
+      if (separator != null) {
+        result = separator.encode(result);
+      }
+    }
+    return Bytes.toBytes(result);
+  }
+
+  /**
+   * Decode the given separators in the token with their decoding equivalent.
+   * This means that when encoding is already present in the token itself, this
+   * is not a reversible process.
+   *
+   * @param token containing possible separators that need to be encoded.
+   * @param separators to be encoded in the token with their URLEncoding
+   *          equivalent.
+   * @return String representation of the token with occurrences of the URL
+   *         encoded separators decoded.
+   */
+  public static String decode(byte[] token, Separator... separators) {
+    if (token == null) {
+      return null;
+    }
+    return decode(Bytes.toString(token), separators);
+  }
+
+  /**
+   * Decode the given separators in the token with their decoding equivalent.
+   * This means that when encoding is already present in the token itself, this
+   * is not a reversible process.
+   *
+   * @param token containing possible separators that need to be encoded.
+   * @param separators to be encoded in the token with their URLEncoding
+   *          equivalent.
+   * @return String representation of the token with occurrences of the URL
+   *         encoded separators decoded.
+   */
+  public static String decode(String token, Separator... separators) {
+    if (token == null) {
+      return null;
+    }
+    String result = token;
+    for (Separator separator : separators) {
+      if (separator != null) {
+        result = separator.decode(result);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Returns a single byte array containing all of the individual arrays
+   * components separated by this separator.
+   *
+   * @param components
+   * @return byte array after joining the components
+   */
+  public byte[] join(byte[]... components) {
+    if (components == null || components.length == 0) {
+      return EMPTY_BYTES;
+    }
+
+    int finalSize = this.value.length() * (components.length - 1);
+    for (byte[] comp : components) {
+      if (comp != null) {
+        finalSize += comp.length;
+      }
+    }
+
+    byte[] buf = new byte[finalSize];
+    int offset = 0;
+    for (int i = 0; i < components.length; i++) {
+      if (components[i] != null) {
+        System.arraycopy(components[i], 0, buf, offset, components[i].length);
+        offset += components[i].length;
+      }
+      if (i < (components.length - 1)) {
+        System.arraycopy(this.bytes, 0, buf, offset, this.value.length());
+        offset += this.value.length();
+      }
+
+    }
+    return buf;
+  }
+
+  /**
+   * Concatenates items (as String), using this separator.
+   *
+   * @param items Items to join; {@code toString()} will be called on each item.
+   *          Any occurrence of the separator in the individual strings will be
+   *          first encoded. Cannot be null.
+   * @return non-null joined result. Any occurrences of the separator in an
+   *         item are encoded first, so the result can be taken apart again
+   *         through {@link #splitEncoded(String)}.
+   */
+  public String joinEncoded(String... items) {
+    if (items == null || items.length == 0) {
+      return "";
+    }
+
+    StringBuilder sb = new StringBuilder(encode(items[0].toString()));
+    // Start at 1, we've already grabbed the first value at index 0
+    for (int i = 1; i < items.length; i++) {
+      sb.append(this.value);
+      sb.append(encode(items[i].toString()));
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Concatenates items (as String), using this separator.
+   *
+   * @param items Items to join; {@code toString()} will be called on each item.
+   *          Any occurrence of the separator in the individual strings will be
+   *          first encoded. Cannot be null.
+   * @return non-null joined result. Any occurrences of the separator in an
+   *         item are encoded first, so the result can be taken apart again
+   *         through {@link #splitEncoded(String)}.
+   */
+  public String joinEncoded(Iterable<?> items) {
+    if (items == null) {
+      return "";
+    }
+    Iterator<?> i = items.iterator();
+    if (!i.hasNext()) {
+      return "";
+    }
+
+    StringBuilder sb = new StringBuilder(encode(i.next().toString()));
+    while (i.hasNext()) {
+      sb.append(this.value);
+      sb.append(encode(i.next().toString()));
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * @param compoundValue containing individual values separated by this
+   *          separator, which have that separator encoded.
+   * @return non-null collection of values from the compoundValue with the
+   *         separator decoded.
+   */
+  public Collection<String> splitEncoded(String compoundValue) {
+    List<String> result = new ArrayList<String>();
+    if (compoundValue != null) {
+      for (String value : compoundValue.split(quotedValue)) {
+        result.add(decode(value));
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Splits the source array into multiple array segments using this separator,
+   * up to a maximum of limit items. This will naturally produce copied byte
+   * arrays for each of the split segments.
+   * @param source to be split
+   * @param limit on how many segments are supposed to be returned. Negative
+   *          value indicates no limit on number of segments.
+   * @return source split by this separator.
+   */
+  public byte[][] split(byte[] source, int limit) {
+    return TimelineWriterUtils.split(source, this.bytes, limit);
+  }
+
+}
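
A round trip through the encoding, with illustrative values:

    // VALUES uses "?" on the wire and "%1$" as its escape.
    String joined = Separator.VALUES.joinEncoded("id3", "id?4");
    // joined == "id3?id%1$4": the literal "?" inside a token is escaped,
    // so the join can be reversed safely.
    Collection<String> parts = Separator.VALUES.splitEncoded(joined);
    // parts == ["id3", "id?4"]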

+ 68 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java

@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Contains the constants used in the context of schema accesses for
+ * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+ * information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TimelineEntitySchemaConstants {
+
+  /**
+   * Used to create a pre-split for tables starting with a username in the
+   * prefix. TODO: this may have to become a config variable (string with
+   * separators) so that different installations can presplit based on their own
+   * commonly occurring names.
+   */
+  private final static byte[][] USERNAME_SPLITS = { Bytes.toBytes("a"),
+      Bytes.toBytes("ad"), Bytes.toBytes("an"), Bytes.toBytes("b"),
+      Bytes.toBytes("ca"), Bytes.toBytes("cl"), Bytes.toBytes("d"),
+      Bytes.toBytes("e"), Bytes.toBytes("f"), Bytes.toBytes("g"),
+      Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"),
+      Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"),
+      Bytes.toBytes("n"), Bytes.toBytes("o"), Bytes.toBytes("q"),
+      Bytes.toBytes("r"), Bytes.toBytes("s"), Bytes.toBytes("se"),
+      Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"),
+      Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"),
+      Bytes.toBytes("z") };
+
+  /**
+   * The username prefix length at which keys auto-split.
+   */
+  public static final String USERNAME_SPLIT_KEY_PREFIX_LENGTH = "4";
+
+  /**
+   * @return splits for tables where a username is the row key prefix.
+   */
+  public final static byte[][] getUsernameSplits() {
+    byte[][] kloon = USERNAME_SPLITS.clone();
+    // Deep copy.
+    for (int row = 0; row < USERNAME_SPLITS.length; row++) {
+      kloon[row] = Bytes.copy(USERNAME_SPLITS[row]);
+    }
+    return kloon;
+  }
+
+}
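
A sketch of how the splits are meant to be consumed when creating a table; the Admin handle and descriptor here are assumed to exist on the caller's side:

    HTableDescriptor desc =
        new HTableDescriptor(TableName.valueOf("timelineservice.entity"));
    // Pre-split on the common username prefixes above.
    admin.createTable(desc, TimelineEntitySchemaConstants.getUsernameSplits());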

+ 127 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java

@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Utility functions used across TimelineWriter classes.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TimelineWriterUtils {
+
+  /** empty bytes */
+  public static final byte[] EMPTY_BYTES = new byte[0];
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator, with no limit on the number of segments. This will naturally
+   * produce copied byte arrays for each of the split segments. To identify the
+   * split ranges without the array copies, see
+   * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
+   *
+   * @param source
+   * @param separator
+   * @return byte[] array after splitting the source
+   */
+  public static byte[][] split(byte[] source, byte[] separator) {
+    return split(source, separator, -1);
+  }
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator, up to a maximum of limit items. This will naturally produce
+   * copied byte arrays for each of the split segments. To identify the split
+   * ranges without the array copies, see
+   * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
+   *
+   * @param source
+   * @param separator
+   * @param limit a negative value indicates no limit on number of segments.
+   * @return byte[][] after splitting the input source
+   */
+  public static byte[][] split(byte[] source, byte[] separator, int limit) {
+    List<Range> segments = splitRanges(source, separator, limit);
+
+    byte[][] splits = new byte[segments.size()][];
+    for (int i = 0; i < segments.size(); i++) {
+      Range r = segments.get(i);
+      byte[] tmp = new byte[r.length()];
+      if (tmp.length > 0) {
+        System.arraycopy(source, r.start(), tmp, 0, r.length());
+      }
+      splits[i] = tmp;
+    }
+    return splits;
+  }
+
+  /**
+   * Returns a list of ranges identifying [start, end) -- closed, open --
+   * positions within the source byte array that would be split using the
+   * separator byte array.
+   */
+  public static List<Range> splitRanges(byte[] source, byte[] separator) {
+    return splitRanges(source, separator, -1);
+  }
+
+  /**
+   * Returns a list of ranges identifying [start, end) -- closed, open --
+   * positions within the source byte array that would be split using the
+   * separator byte array.
+   *
+   * @param source the source data
+   * @param separator the separator pattern to look for
+   * @param limit the maximum number of splits to identify in the source
+   */
+  public static List<Range> splitRanges(byte[] source, byte[] separator,
+      int limit) {
+    List<Range> segments = new ArrayList<Range>();
+    if ((source == null) || (separator == null)) {
+      return segments;
+    }
+    int start = 0;
+    itersource: for (int i = 0; i < source.length; i++) {
+      for (int j = 0; j < separator.length; j++) {
+        // Guard against reading past the end of source when the separator
+        // is longer than the remaining input.
+        if (i + j >= source.length || source[i + j] != separator[j]) {
+          continue itersource;
+        }
+      }
+      // all separator elements matched
+      if (limit > 0 && segments.size() >= (limit - 1)) {
+        // everything else goes in one final segment
+        break;
+      }
+
+      segments.add(new Range(start, i));
+      start = i + separator.length;
+      // i will be incremented again in outer for loop
+      i += separator.length - 1;
+    }
+    // add in remaining to a final range
+    if (start <= source.length) {
+      segments.add(new Range(start, source.length));
+    }
+    return segments;
+  }
+
+}
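
Split semantics, including the limit behavior; note that empty segments between adjacent separators are preserved:

    byte[] source = Bytes.toBytes("a!b!!c");
    byte[][] all = TimelineWriterUtils.split(source, Bytes.toBytes("!"));
    // all == ["a", "b", "", "c"]
    byte[][] two = TimelineWriterUtils.split(source, Bytes.toBytes("!"), 2);
    // two == ["a", "b!!c"] -- once the limit is reached, everything
    // remaining lands in one final segment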

+ 28 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TypedBufferedMutator.java

@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.hbase.client.BufferedMutator;
+
+/**
+ * Just a typed wrapper around {@link BufferedMutator} used to ensure that
+ * columns can write only to the table mutator for the right table.
+ */
+public interface TypedBufferedMutator<T> extends BufferedMutator {
+  // This class is intentionally left (almost) blank
+}

+ 24 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java

@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

+ 141 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java

@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+/**
+ * Identifies fully qualified columns for the {@link EntityTable}.
+ */
+public enum EntityColumn implements Column<EntityTable> {
+
+  /**
+   * Identifier for the entity.
+   */
+  ID(EntityColumnFamily.INFO, "id"),
+
+  /**
+   * The type of entity
+   */
+  TYPE(EntityColumnFamily.INFO, "type"),
+
+  /**
+   * When the entity was created.
+   */
+  CREATED_TIME(EntityColumnFamily.INFO, "created_time"),
+
+  /**
+   * When the entity was modified.
+   */
+  MODIFIED_TIME(EntityColumnFamily.INFO, "modified_time"),
+
+  /**
+   * The version of the flow that this entity belongs to.
+   */
+  FLOW_VERSION(EntityColumnFamily.INFO, "flow_version");
+
+  private final ColumnHelper<EntityTable> column;
+  private final ColumnFamily<EntityTable> columnFamily;
+  private final String columnQualifier;
+  private final byte[] columnQualifierBytes;
+
+  private EntityColumn(ColumnFamily<EntityTable> columnFamily,
+      String columnQualifier) {
+    this.columnFamily = columnFamily;
+    this.columnQualifier = columnQualifier;
+    // Future-proof by ensuring the right column qualifier hygiene.
+    this.columnQualifierBytes =
+        Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
+    this.column = new ColumnHelper<EntityTable>(columnFamily);
+  }
+
+  /**
+   * @return the column name value
+   */
+  private String getColumnQualifier() {
+    return columnQualifier;
+  }
+
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<EntityTable> tableMutator, Long timestamp,
+      Object inputValue) throws IOException {
+    column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
+        inputValue);
+  }
+
+  public Object readResult(Result result) throws IOException {
+    return column.readResult(result, columnQualifierBytes);
+  }
+
+  /**
+   * Retrieve an {@link EntityColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+   * and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnQualifier Name of the column to retrieve
+   * @return the corresponding {@link EntityColumn} or null
+   */
+  public static final EntityColumn columnFor(String columnQualifier) {
+
+    // Match column based on value, assume column family matches.
+    for (EntityColumn ec : EntityColumn.values()) {
+      // Find a match based only on name.
+      if (ec.getColumnQualifier().equals(columnQualifier)) {
+        return ec;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve an {@link EntityColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
+   * if and only if {@code a.equals(b) & x.equals(y)} or
+   * {@code (x == y == null)}
+   *
+   * @param columnFamily The columnFamily for which to retrieve the column.
+   * @param name Name of the column to retrieve
+   * @return the corresponding {@link EntityColumn} or null if both arguments
+   *         don't match.
+   */
+  public static final EntityColumn columnFor(EntityColumnFamily columnFamily,
+      String name) {
+
+    for (EntityColumn ec : EntityColumn.values()) {
+      // Find a match based column family and on name.
+      if (ec.columnFamily.equals(columnFamily)
+          && ec.getColumnQualifier().equals(name)) {
+        return ec;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+}
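
Lookup by qualifier name, as an illustration of the columnFor contract:

    EntityColumn created = EntityColumn.columnFor("created_time");
    // created == EntityColumn.CREATED_TIME
    EntityColumn none = EntityColumn.columnFor("no_such_column");
    // none == null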

+ 65 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java

@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the entity table column families.
+ */
+public enum EntityColumnFamily implements ColumnFamily<EntityTable> {
+
+  /**
+   * Info column family houses known columns, specifically ones included in
+   * column family filters.
+   */
+  INFO("i"),
+
+  /**
+   * Configurations are in a separate column family for two reasons: a) the
+   * size of the config values can be very large and b) we expect that config
+   * values are often accessed separately from the metrics and info columns.
+   */
+  CONFIGS("c"),
+
+  /**
+   * Metrics have a separate column family, because they have a separate TTL.
+   */
+  METRICS("m");
+
+  /**
+   * Byte representation of this column family.
+   */
+  private final byte[] bytes;
+
+  /**
+   * @param value create a column family with this name. Must be lower case and
+   *          without spaces.
+   */
+  private EntityColumnFamily(String value) {
+    // column families should be lower case and not contain any spaces.
+    this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+  }
+
+  public byte[] getBytes() {
+    return Bytes.copy(bytes);
+  }
+
+}

+ 212 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java

@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+/**
+ * Identifies partially qualified columns for the entity table.
+ */
+public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
+
+  /**
+   * To store TimelineEntity getIsRelatedToEntities values.
+   */
+  IS_RELATED_TO(EntityColumnFamily.INFO, "s"),
+
+  /**
+   * To store TimelineEntity getRelatesToEntities values.
+   */
+  RELATES_TO(EntityColumnFamily.INFO, "r"),
+
+  /**
+   * Lifecycle events for an entity
+   */
+  EVENT(EntityColumnFamily.INFO, "e"),
+
+  /**
+   * Config column stores configuration with config key as the column name.
+   */
+  CONFIG(EntityColumnFamily.CONFIGS, null),
+
+  /**
+   * Metrics are stored with the metric name as the column name.
+   */
+  METRIC(EntityColumnFamily.METRICS, null);
+
+  private final ColumnHelper<EntityTable> column;
+  private final ColumnFamily<EntityTable> columnFamily;
+
+  /**
+   * Can be null for those cases where the provided column qualifier is the
+   * entire column name.
+   */
+  private final String columnPrefix;
+  private final byte[] columnPrefixBytes;
+
+  /**
+   * Private constructor, meant to be used by the enum definition.
+   *
+   * @param columnFamily that this column is stored in.
+   * @param columnPrefix for this column.
+   */
+  private EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
+      String columnPrefix) {
+    column = new ColumnHelper<EntityTable>(columnFamily);
+    this.columnFamily = columnFamily;
+    this.columnPrefix = columnPrefix;
+    if (columnPrefix == null) {
+      this.columnPrefixBytes = null;
+    } else {
+      // Future-proof by ensuring the right column prefix hygiene.
+      this.columnPrefixBytes =
+          Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
+    }
+  }
+
+  /**
+   * @return the column name value
+   */
+  private String getColumnPrefix() {
+    return columnPrefix;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<EntityTable> tableMutator, String qualifier,
+      Long timestamp, Object inputValue) throws IOException {
+
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier =
+        ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
+   */
+  public Object readResult(Result result, String qualifier) throws IOException {
+    byte[] columnQualifier =
+        ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+    return column.readResult(result, columnQualifier);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResults(org.apache.hadoop.hbase.client.Result)
+   */
+  public Map<String, Object> readResults(Result result) throws IOException {
+    return column.readResults(result, columnPrefixBytes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readTimeseriesResults(org.apache.hadoop.hbase.client.Result)
+   */
+  public NavigableMap<String, NavigableMap<Long, Number>> readTimeseriesResults(
+      Result result) throws IOException {
+    return column.readTimeseriesResults(result, columnPrefixBytes);
+  }
+
+  /**
+   * Retrieve an {@link EntityColumnPrefix} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+   * and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnPrefix Name of the column to retrieve
+   * @return the corresponding {@link EntityColumnPrefix} or null
+   */
+  public static final EntityColumnPrefix columnFor(String columnPrefix) {
+
+    // Match column based on value, assume column family matches.
+    for (EntityColumnPrefix ecp : EntityColumnPrefix.values()) {
+      // Find a match based only on name.
+      if (ecp.getColumnPrefix().equals(columnPrefix)) {
+        return ecp;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve an {@link EntityColumnPrefix} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
+   * if and only if {@code (x == y == null)} or
+   * {@code a.equals(b) & x.equals(y)}
+   *
+   * @param columnFamily The columnFamily for which to retrieve the column.
+   * @param columnPrefix Name of the column to retrieve
+   * @return the corresponding {@link EntityColumnPrefix} or null if both
+   *         arguments don't match.
+   */
+  public static final EntityColumnPrefix columnFor(
+      EntityColumnFamily columnFamily, String columnPrefix) {
+
+    // TODO: needs unit test to confirm and need to update javadoc to explain
+    // null prefix case.
+
+    for (EntityColumnPrefix ecp : EntityColumnPrefix.values()) {
+      // Find a match based column family and on name.
+      if (ecp.columnFamily.equals(columnFamily)
+          && (((columnPrefix == null) && (ecp.getColumnPrefix() == null)) || (ecp
+              .getColumnPrefix().equals(columnPrefix)))) {
+        return ecp;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+}
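
For metrics, the version timestamp carries the timeseries dimension; a hedged sketch with illustrative names and values:

    // Each write of the same metric name at a new timestamp becomes a new
    // cell version under the metrics column family.
    EntityColumnPrefix.METRIC.store(rowKey, entityTableMutator,
        "cpu_usage", 1392993084018L, 75L);
    NavigableMap<String, NavigableMap<Long, Number>> series =
        EntityColumnPrefix.METRIC.readTimeseriesResults(result);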

+ 93 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java

@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents a rowkey for the entity table.
+ */
+public class EntityRowKey {
+  // TODO: more methods are needed for this class.
+
+  // TODO: API needs to be cleaned up.
+
+  /**
+   * Constructs a row key prefix for the entity table as follows:
+   * {@code userName!clusterId!flowId!flowRunId!AppId}
+   *
+   * @param clusterId
+   * @param userId
+   * @param flowId
+   * @param flowRunId
+   * @param appId
+   * @return byte array with the row key prefix
+   */
+  public static byte[] getRowKeyPrefix(String clusterId, String userId,
+      String flowId, Long flowRunId, String appId) {
+    byte[] first =
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
+            flowId));
+    // Note that flowRunId is a long, so we can't encode them all at the same
+    // time.
+    byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId));
+    byte[] third = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId));
+    return Separator.QUALIFIERS.join(first, second, third);
+  }
+
+  /**
+   * Constructs a row key for the entity table as follows:
+   * {@code userName!clusterId!flowId!flowRunId!AppId!entityType!entityId}
+   *
+   * @param clusterId
+   * @param userId
+   * @param flowId
+   * @param flowRunId
+   * @param appId
+   * @param te the timeline entity whose type and id complete the row key
+   * @return byte array with the row key
+   */
+  public static byte[] getRowKey(String clusterId, String userId,
+      String flowId, Long flowRunId, String appId, TimelineEntity te) {
+    byte[] first =
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
+            flowId));
+    // Note that flowRunId is a long, so we can't encode them all at the same
+    // time.
+    byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId));
+    byte[] third =
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, te.getType(),
+            te.getId()));
+    return Separator.QUALIFIERS.join(first, second, third);
+  }
+
+  /**
+   * Converts a timestamp into its inverse timestamp to be used in (row) keys
+   * where we want to have the most recent timestamp at the top of the table
+   * (scans start at the most recent timestamp first).
+   *
+   * @param key value to be inverted so that the latest version will be first in
+   *          a scan.
+   * @return inverted long
+   */
+  public static long invert(Long key) {
+    return Long.MAX_VALUE - key;
+  }
+
+}
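
The inversion is what puts the most recent flow run first in a scan, since HBase sorts row keys in ascending byte order:

    long olderRun = 1392993084018L;
    long newerRun = 1392995081012L;
    // Inverted, the newer run id is numerically smaller and thus sorts first.
    assert EntityRowKey.invert(newerRun) < EntityRowKey.invert(olderRun);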

+ 161 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java

@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEntitySchemaConstants;
+
+/**
+ * The entity table has column families info, config and metrics. Info stores
+ * information about a timeline entity object, config stores configuration data
+ * of a timeline entity object, and metrics stores the metrics of a timeline
+ * entity object.
+ *
+ * Example entity table record:
+ *
+ * <pre>
+ * |--------------------------------------------------------------------|
+ * |  Row       | Column Family           | Column Family| Column Family|
+ * |  key       | info                    | metrics      | config       |
+ * |--------------------------------------------------------------------|
+ * | userName!  | id:entityId             | metricId1:   | configKey1:  |
+ * | clusterId! |                         | metricValue1 | configValue1 |
+ * | flowId!    | type:entityType         | @timestamp1  |              |
+ * | flowRunId! |                         |              | configKey2:  |
+ * | AppId!     | created_time:           | metricId1:   | configValue2 |
+ * | entityType!| 1392993084018           | metricValue2 |              |
+ * | entityId   |                         | @timestamp2  |              |
+ * |            | modified_time:          |              |              |
+ * |            | 1392995081012           | metricId2:   |              |
+ * |            |                         | metricValue1 |              |
+ * |            | r!relatesToKey:         | @timestamp2  |              |
+ * |            | id3?id4?id5             |              |              |
+ * |            |                         |              |              |
+ * |            | s!isRelatedToKey        |              |              |
+ * |            | id7?id9?id6             |              |              |
+ * |            |                         |              |              |
+ * |            | e!eventId?eventInfoKey: |              |              |
+ * |            | eventInfoValue          |              |              |
+ * |            |                         |              |              |
+ * |            | flowVersion:            |              |              |
+ * |            | versionValue            |              |              |
+ * |--------------------------------------------------------------------|
+ * </pre>
+ */
+public class EntityTable extends BaseTable<EntityTable> {
+  /** entity prefix */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".entity";
+
+  /** config param name that specifies the entity table name */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /**
+   * config param name that specifies the TTL for metrics column family in
+   * entity table
+   */
+  private static final String METRICS_TTL_CONF_NAME = PREFIX
+      + ".table.metrics.ttl";
+
+  /** default value for entity table name */
+  private static final String DEFAULT_TABLE_NAME = "timelineservice.entity";
+
+  /** default TTL is 30 days for metrics timeseries */
+  private static final int DEFAULT_METRICS_TTL = 2592000;
+
+  /** default max number of versions */
+  private static final int DEFAULT_METRICS_MAX_VERSIONS = 1000;
+
+  private static final Log LOG = LogFactory.getLog(EntityTable.class);
+
+  public EntityTable() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable#createTable
+   * (org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor entityTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor infoCF =
+        new HColumnDescriptor(EntityColumnFamily.INFO.getBytes());
+    infoCF.setBloomFilterType(BloomType.ROWCOL);
+    entityTableDescp.addFamily(infoCF);
+
+    HColumnDescriptor configCF =
+        new HColumnDescriptor(EntityColumnFamily.CONFIGS.getBytes());
+    configCF.setBloomFilterType(BloomType.ROWCOL);
+    configCF.setBlockCacheEnabled(true);
+    entityTableDescp.addFamily(configCF);
+
+    HColumnDescriptor metricsCF =
+        new HColumnDescriptor(EntityColumnFamily.METRICS.getBytes());
+    metricsCF.setBlockCacheEnabled(true);
+    // always keep 1 version (the latest)
+    metricsCF.setMinVersions(1);
+    metricsCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+    metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME,
+        DEFAULT_METRICS_TTL));
+    entityTableDescp.addFamily(metricsCF);
+    entityTableDescp
+        .setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
+    entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
+        TimelineEntitySchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+    admin.createTable(entityTableDescp,
+        TimelineEntitySchemaConstants.getUsernameSplits());
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+
+  /**
+   * @param metricsTTL time to live parameter for the metrics in this table.
+   * @param hbaseConf configuration in which to set the metrics TTL config
+   *          variable.
+   */
+  public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) {
+    hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL);
+  }
+
+}
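A hedged usage sketch for this class (the seven-day TTL is an arbitrary example value; HBaseConfiguration, ConnectionFactory and Admin are the stock HBase client API, matching the imports above):

    Configuration hbaseConf = HBaseConfiguration.create();
    EntityTable entityTable = new EntityTable();
    // Override the default 30-day metrics TTL before creating the table.
    entityTable.setMetricsTTL(7 * 24 * 60 * 60, hbaseConf);
    try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
         Admin admin = conn.getAdmin()) {
      entityTable.createTable(admin, hbaseConf);
    }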

+ 25 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/package-info.java

@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+

+ 140 - 112
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java

@@ -18,43 +18,41 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.storage;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Set;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNotNull;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-import org.junit.BeforeClass;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**
- * Unit test HBaseTimelineWriterImpl
- * YARN 3411
- *
- * @throws Exception
+ * Unit tests for {@link HBaseTimelineWriterImpl}.
  */
 public class TestHBaseTimelineWriterImpl {
@@ -69,12 +67,8 @@ public class TestHBaseTimelineWriterImpl {
   }
 
   private static void createSchema() throws IOException {
-    byte[][] families = new byte[3][];
-    families[0] = EntityColumnFamily.INFO.getInBytes();
-    families[1] = EntityColumnFamily.CONFIG.getInBytes();
-    families[2] = EntityColumnFamily.METRICS.getInBytes();
-    TimelineSchemaCreator.createTimelineEntityTable(util.getHBaseAdmin(),
-        util.getConfiguration());
+    new EntityTable()
+        .createTable(util.getHBaseAdmin(), util.getConfiguration());
   }
 
   @Test
@@ -151,18 +145,15 @@ public class TestHBaseTimelineWriterImpl {
 
       // scan the table and see that entity exists
       Scan s = new Scan();
-      byte[] startRow = TimelineWriterUtils.getRowKeyPrefix(cluster, user, flow,
-          runid, appName);
+      byte[] startRow =
+          EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
       s.setStartRow(startRow);
       s.setMaxVersions(Integer.MAX_VALUE);
-      ResultScanner scanner = null;
-      TableName entityTableName = TableName
-          .valueOf(TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME);
       Connection conn = ConnectionFactory.createConnection(c1);
-      Table entityTable = conn.getTable(entityTableName);
+      ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
+
       int rowCount = 0;
       int colCount = 0;
-      scanner = entityTable.getScanner(s);
       for (Result result : scanner) {
         if (result != null && !result.isEmpty()) {
           rowCount++;
@@ -172,37 +163,77 @@ public class TestHBaseTimelineWriterImpl {
               entity));
 
           // check info column family
-          NavigableMap<byte[], byte[]> infoValues = result
-              .getFamilyMap(EntityColumnFamily.INFO.getInBytes());
-          String id1 = TimelineWriterUtils.getValueAsString(
-              EntityColumnDetails.ID.getInBytes(), infoValues);
+          String id1 = EntityColumn.ID.readResult(result).toString();
           assertEquals(id, id1);
-          String type1 = TimelineWriterUtils.getValueAsString(
-              EntityColumnDetails.TYPE.getInBytes(), infoValues);
+
+          String type1 = EntityColumn.TYPE.readResult(result).toString();
           assertEquals(type, type1);
-          Long cTime1 = TimelineWriterUtils.getValueAsLong(
-              EntityColumnDetails.CREATED_TIME.getInBytes(), infoValues);
+
+          Number val = (Number) EntityColumn.CREATED_TIME.readResult(result);
+          Long cTime1 = val.longValue();
           assertEquals(cTime1, cTime);
-          Long mTime1 = TimelineWriterUtils.getValueAsLong(
-              EntityColumnDetails.MODIFIED_TIME.getInBytes(), infoValues);
+
+          val = (Number) EntityColumn.MODIFIED_TIME.readResult(result);
+          Long mTime1 = val.longValue();
           assertEquals(mTime1, mTime);
-          checkRelatedEntities(isRelatedTo, infoValues,
-              EntityColumnDetails.PREFIX_IS_RELATED_TO.getInBytes());
-          checkRelatedEntities(relatesTo, infoValues,
-              EntityColumnDetails.PREFIX_RELATES_TO.getInBytes());
-
-          // check config column family
-          NavigableMap<byte[], byte[]> configValuesResult = result
-              .getFamilyMap(EntityColumnFamily.CONFIG.getInBytes());
-          checkConfigs(configValuesResult, conf);
-
-          NavigableMap<byte[], byte[]> metricsResult = result
-              .getFamilyMap(EntityColumnFamily.METRICS.getInBytes());
-          checkMetricsSizeAndKey(metricsResult, metrics);
-          List<Cell> metricCells = result.getColumnCells(
-              EntityColumnFamily.METRICS.getInBytes(),
-              Bytes.toBytes(m1.getId()));
-          checkMetricsTimeseries(metricCells, m1);
+
+          // Remember isRelatedTo is of type Map<String, Set<String>>
+          for (String isRelatedToKey : isRelatedTo.keySet()) {
+            Object isRelatedToValue =
+                EntityColumnPrefix.IS_RELATED_TO.readResult(result,
+                    isRelatedToKey);
+            String compoundValue = isRelatedToValue.toString();
+            // id7?id9?id6
+            Set<String> isRelatedToValues =
+                new HashSet<String>(
+                    Separator.VALUES.splitEncoded(compoundValue));
+            assertEquals(isRelatedTo.get(isRelatedToKey).size(),
+                isRelatedToValues.size());
+            for (String v : isRelatedTo.get(isRelatedToKey)) {
+              assertTrue(isRelatedToValues.contains(v));
+            }
+          }
+
+          // RelatesTo
+          for (String relatesToKey : relatesTo.keySet()) {
+            String compoundValue =
+                EntityColumnPrefix.RELATES_TO.readResult(result, relatesToKey)
+                    .toString();
+            // id3?id4?id5
+            Set<String> relatesToValues =
+                new HashSet<String>(
+                    Separator.VALUES.splitEncoded(compoundValue));
+            assertEquals(relatesTo.get(relatesToKey).size(),
+                relatesToValues.size());
+            for (String v : relatesTo.get(relatesToKey)) {
+              assertTrue(relatesToValues.contains(v));
+            }
+          }
+
+          // Configuration
+          Map<String, Object> configColumns =
+              EntityColumnPrefix.CONFIG.readResults(result);
+          assertEquals(conf.size(), configColumns.size());
+          for (String configItem : conf.keySet()) {
+            assertEquals(conf.get(configItem), configColumns.get(configItem));
+          }
+
+          NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+              EntityColumnPrefix.METRIC.readTimeseriesResults(result);
+
+          NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
+          // We got metrics back
+          assertNotNull(metricMap);
+          // Same number of metrics as we wrote
+          assertEquals(metricValues.entrySet().size(), metricMap.entrySet()
+              .size());
+
+          // Iterate over original metrics and confirm that they are present
+          // here.
+          for (Entry<Long, Number> metricEntry : metricValues.entrySet()) {
+            assertEquals(metricEntry.getValue(),
+                metricMap.get(metricEntry.getKey()));
+          }
         }
       }
       assertEquals(1, rowCount);
@@ -212,80 +243,77 @@ public class TestHBaseTimelineWriterImpl {
       hbi.stop();
       hbi.close();
     }
-  }
-
-  private void checkMetricsTimeseries(List<Cell> metricCells,
-      TimelineMetric m1) throws IOException {
-    Map<Long, Number> timeseries = m1.getValues();
-    assertEquals(timeseries.size(), metricCells.size());
-    for (Cell c1 : metricCells) {
-      assertTrue(timeseries.containsKey(c1.getTimestamp()));
-      assertEquals(GenericObjectMapper.read(CellUtil.cloneValue(c1)),
-          timeseries.get(c1.getTimestamp()));
-    }
-  }
-
-  private void checkMetricsSizeAndKey(
-      NavigableMap<byte[], byte[]> metricsResult, Set<TimelineMetric> metrics) {
-    assertEquals(metrics.size(), metricsResult.size());
-    for (TimelineMetric m1 : metrics) {
-      byte[] key = Bytes.toBytes(m1.getId());
-      assertTrue(metricsResult.containsKey(key));
-    }
-  }
-
-  private void checkConfigs(NavigableMap<byte[], byte[]> configValuesResult,
-      Map<String, String> conf) throws IOException {
-
-    assertEquals(conf.size(), configValuesResult.size());
-    byte[] columnName;
-    for (String key : conf.keySet()) {
-      columnName = Bytes.toBytes(key);
-      assertTrue(configValuesResult.containsKey(columnName));
-      byte[] value = configValuesResult.get(columnName);
-      assertNotNull(value);
-      assertEquals(conf.get(key), GenericObjectMapper.read(value));
-    }
-  }
 
-  private void checkRelatedEntities(Map<String, Set<String>> isRelatedTo,
-      NavigableMap<byte[], byte[]> infoValues, byte[] columnPrefix)
-      throws IOException {
-
-    for (String key : isRelatedTo.keySet()) {
-      byte[] columnName = TimelineWriterUtils.join(
-          TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, columnPrefix,
-          Bytes.toBytes(key));
-
-      byte[] value = infoValues.get(columnName);
-      assertNotNull(value);
-      String isRelatedToEntities = GenericObjectMapper.read(value).toString();
-      assertNotNull(isRelatedToEntities);
-      assertEquals(
-          TimelineWriterUtils.getValueAsString(
-              TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR,
-              isRelatedTo.get(key)), isRelatedToEntities);
-    }
+    // Somewhat of a hack: this is not a separate test, so that we don't have
+    // to deal with test case execution order.
+    testAdditionalEntity();
   }
 
   private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
       String flow, Long runid, String appName, TimelineEntity te) {
 
-    byte[][] rowKeyComponents = TimelineWriterUtils.split(rowKey,
-        TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES);
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
 
     assertTrue(rowKeyComponents.length == 7);
     assertEquals(user, Bytes.toString(rowKeyComponents[0]));
     assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
     assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
-    assertEquals(TimelineWriterUtils.encodeRunId(runid),
-        Bytes.toLong(rowKeyComponents[3]));
-    assertEquals(TimelineWriterUtils.cleanse(appName), Bytes.toString(rowKeyComponents[4]));
+    assertEquals(EntityRowKey.invert(runid), Bytes.toLong(rowKeyComponents[3]));
+    assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
     assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
     assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
     return true;
   }
 
+  private void testAdditionalEntity() throws IOException {
+    TimelineEvent event = new TimelineEvent();
+    event.setId("foo_event_id");
+    event.setTimestamp(System.currentTimeMillis());
+    event.addInfo("foo_event", "test");
+
+    final TimelineEntity entity = new TimelineEntity();
+    entity.setId("attempt_1329348432655_0001_m_000008_18");
+    entity.setType("FOO_ATTEMPT");
+
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entity);
+
+    HBaseTimelineWriterImpl hbi = null;
+    try {
+      Configuration c1 = util.getConfiguration();
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String cluster = "cluster2";
+      String user = "user2";
+      String flow = "other_flow_name";
+      String flowVersion = "1111F01C2287BA";
+      long runid = 1009876543218L;
+      String appName = "some app name";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
+      hbi.stop();
+      // scan the table and see that entity exists
+      Scan s = new Scan();
+      byte[] startRow =
+          EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
+      s.setStartRow(startRow);
+      s.setMaxVersions(Integer.MAX_VALUE);
+      Connection conn = ConnectionFactory.createConnection(c1);
+      ResultScanner scanner = new EntityTable().getResultScanner(c1, conn, s);
+
+      int rowCount = 0;
+      for (Result result : scanner) {
+        if (result != null && !result.isEmpty()) {
+          rowCount++;
+        }
+      }
+      assertEquals(1, rowCount);
+
+    } finally {
+      if (hbi != null) {
+        hbi.stop();
+        hbi.close();
+      }
+    }
+  }
+
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     util.shutdownMiniCluster();
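Note that both scans in this test set only a start row, so they are unbounded and rely on the row-count assertions; since both test methods write to the same table, a prefix-bounded scan would be more robust. A sketch of one way to bound it, assuming the stock HBase Scan API:

    Scan s = new Scan();
    // Visit only rows that actually begin with the prefix, instead of
    // scanning from the prefix to the end of the table.
    s.setRowPrefixFilter(startRow);
    s.setMaxVersions(Integer.MAX_VALUE);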

+ 129 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java

@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+public class TestSeparator {
+
+  private static String villain = "Dr. Heinz Doofenshmirtz";
+  private static String special =
+      ".   *   |   ?   +   (   )   [   ]   {   }   ^   $  \\ \"";
+
+  /**
+   * Tests that encoding and decoding round-trips for every separator, over a
+   * variety of tokens including regex special characters.
+   */
+  @Test
+  public void testEncodeDecodeString() {
+
+    for (Separator separator : Separator.values()) {
+      testEncodeDecode(separator, "");
+      testEncodeDecode(separator, " ");
+      testEncodeDecode(separator, "!");
+      testEncodeDecode(separator, "?");
+      testEncodeDecode(separator, "&");
+      testEncodeDecode(separator, "+");
+      testEncodeDecode(separator, "Dr.");
+      testEncodeDecode(separator, "Heinz");
+      testEncodeDecode(separator, "Doofenshmirtz");
+      testEncodeDecode(separator, villain);
+      testEncodeDecode(separator, special);
+
+      assertNull(separator.encode(null));
+
+    }
+  }
+
+  private void testEncodeDecode(Separator separator, String token) {
+    String encoded = separator.encode(token);
+    String decoded = separator.decode(encoded);
+    String msg = "token:" + token + " separator:" + separator + ".";
+    assertEquals(msg, token, decoded);
+  }
+
+  @Test
+  public void testEncodeDecode() {
+    testEncodeDecode("Dr.", Separator.QUALIFIERS);
+    testEncodeDecode("Heinz", Separator.QUALIFIERS, Separator.QUALIFIERS);
+    testEncodeDecode("Doofenshmirtz", Separator.QUALIFIERS, null,
+        Separator.QUALIFIERS);
+    testEncodeDecode("&Perry", Separator.QUALIFIERS, Separator.VALUES, null);
+    testEncodeDecode("the ", Separator.QUALIFIERS, Separator.SPACE);
+    testEncodeDecode("Platypus...", (Separator) null);
+    testEncodeDecode("The what now ?!?", Separator.QUALIFIERS,
+        Separator.VALUES, Separator.SPACE);
+
+  }
+
+  /**
+   * Simple test to encode and decode using the same separators and confirm that
+   * we end up with the same as what we started with.
+   *
+   * @param token the string to encode and then decode
+   * @param separators the separators to encode and decode with, in order
+   */
+  private static void testEncodeDecode(String token, Separator... separators) {
+    byte[] encoded = Separator.encode(token, separators);
+    String decoded = Separator.decode(encoded, separators);
+    assertEquals(token, decoded);
+  }
+
+  @Test
+  public void testJoinStripped() {
+    List<String> stringList = new ArrayList<String>(0);
+    stringList.add("nothing");
+
+    String joined = Separator.VALUES.joinEncoded(stringList);
+    Iterable<String> split = Separator.VALUES.splitEncoded(joined);
+    assertTrue(Iterables.elementsEqual(stringList, split));
+
+    stringList = new ArrayList<String>(3);
+    stringList.add("a");
+    stringList.add("b?");
+    stringList.add("c");
+
+    joined = Separator.VALUES.joinEncoded(stringList);
+    split = Separator.VALUES.splitEncoded(joined);
+    assertTrue(Iterables.elementsEqual(stringList, split));
+
+    String[] stringArray1 = { "else" };
+    joined = Separator.VALUES.joinEncoded(stringArray1);
+    split = Separator.VALUES.splitEncoded(joined);
+    assertTrue(Iterables.elementsEqual(Arrays.asList(stringArray1), split));
+
+    String[] stringArray2 = { "d", "e?", "f" };
+    joined = Separator.VALUES.joinEncoded(stringArray2);
+    split = Separator.VALUES.splitEncoded(joined);
+    assertTrue(Iterables.elementsEqual(Arrays.asList(stringArray2), split));
+
+    List<String> empty = new ArrayList<String>(0);
+    split = Separator.VALUES.splitEncoded(null);
+    assertTrue(Iterables.elementsEqual(empty, split));
+
+  }
+
+}
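The "b?" and "e?" tokens above are the interesting cases: '?' is presumably the VALUES separator character itself (compare the id3?id4?id5 example in the EntityTable javadoc), so the round trip only works because joinEncoded escapes each token before joining. A minimal sketch of that property, assuming joinEncoded accepts varargs as the array overload above suggests:

    String joined = Separator.VALUES.joinEncoded("a", "b?", "c");
    Iterable<String> parts = Separator.VALUES.splitEncoded(joined);
    // parts is ["a", "b?", "c"] again, not ["a", "b", "", "c"].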

+ 29 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineWriterUtils.java

@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.junit.Test;
+
+public class TestTimelineWriterUtils {
+
+  @Test
+  public void test() {
+    // TODO: implement a test for the remaining method in TimelineWriterUtils.
+  }
+
+}