
YARN-3904. Refactor timelineservice.storage to add support to online and offline aggregation writers (Li Lu via sjlee)

Sangjin Lee · 10 years ago
commit a87a00ee68
10 changed files with 659 additions and 666 deletions
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 3 4
      hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
  3. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  4. 66 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java
  5. 356 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java
  6. 0 530
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
  7. 44 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
  8. 110 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/OfflineAggregationInfo.java
  9. 67 57
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java
  10. 0 74
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -88,6 +88,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3906. Split the application table from the entity table. (Sangjin Lee 
     via junping_du)
 
+    YARN-3904. Refactor timelineservice.storage to add support to online and
+    offline aggregation writers (Li Lu via sjlee)
+
   IMPROVEMENTS
 
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via

+ 3 - 4
hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml

@@ -499,13 +499,12 @@
   <!-- Ignore SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING warnings for Timeline Phoenix storage. -->
   <!-- Since we're using dynamic columns, we have to generate SQL statements dynamically -->
   <Match>
-    <Class name="org.apache.hadoop.yarn.server.timelineservice.storage.PhoenixTimelineWriterImpl" />
+    <Class name="org.apache.hadoop.yarn.server.timelineservice.storage.PhoenixOfflineAggregationWriterImpl" />
     <Or>
       <Method name="storeEntityVariableLengthFields" />
-      <Method name="storeEvents" />
-      <Method name="storeMetrics" />
-      <Method name="write" />
+      <Method name="writeAggregatedEntity" />
     </Or>
+    <Bug pattern="SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING" />
   </Match>
   
   <!-- Following fields are used in ErrorsAndWarningsBlock, which is not a part of analysis of findbugs -->

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -2005,6 +2005,16 @@ public class YarnConfiguration extends Configuration {
   public static final long    DEFAULT_TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME =
       7*24*60*60*1000; // 7 days
 
+  // Timeline service v2 offline aggregation related keys
+  public static final String TIMELINE_OFFLINE_AGGREGATION_PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "aggregation.offline.";
+  public static final String PHOENIX_OFFLINE_STORAGE_CONN_STR
+      = TIMELINE_OFFLINE_AGGREGATION_PREFIX
+          + "phoenix.connectionString";
+
+  public static final String PHOENIX_OFFLINE_STORAGE_CONN_STR_DEFAULT
+      = "jdbc:phoenix:localhost:2181:/hbase";
+
   // ///////////////////////////////
   // Shared Cache Configs
   // ///////////////////////////////
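
The new keys follow the existing yarn.timeline-service.* naming. A minimal sketch of overriding the Phoenix connection string from client code; the ZooKeeper quorum address below is illustrative, not part of this patch:

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class OfflineAggregationConfExample {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    // Point the offline aggregation storage at a specific Phoenix/ZooKeeper
    // quorum instead of the default localhost one.
    conf.set(YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
        "jdbc:phoenix:zk1.example.com:2181:/hbase");
    // Readers fall back to the default connection string when the key is unset.
    System.out.println(conf.get(
        YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
        YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR_DEFAULT));
  }
}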

+ 66 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/OfflineAggregationWriter.java

@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
+
+import java.io.IOException;
+
+/**
+ * YARN timeline service v2 offline aggregation storage interface
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class OfflineAggregationWriter extends AbstractService {
+
+  /**
+   * Construct the offline writer.
+   *
+   * @param name service name
+   */
+  public OfflineAggregationWriter(String name) {
+    super(name);
+  }
+
+  /**
+   * Persist aggregated timeline entities to the offline store based on which
+   * track these entities are to be rolled up to. The tracks along which
+   * aggregations are to be done are given by {@link OfflineAggregationInfo}.
+   *
+   * @param context a {@link TimelineCollectorContext} object that describes the
+   *                context information of the aggregated data. Depending on
+   *                the type of the aggregation, some fields of this context
+   *                may be empty or null.
+   * @param entities {@link TimelineEntities} to be persisted.
+   * @param info an {@link OfflineAggregationInfo} object that describes the
+   *             details of the aggregation. The currently supported option is
+   *             {@link OfflineAggregationInfo#FLOW_AGGREGATION}.
+   * @return a {@link TimelineWriteResponse} object.
+   * @throws IOException
+   */
+  abstract TimelineWriteResponse writeAggregatedEntity(
+      TimelineCollectorContext context,
+      TimelineEntities entities, OfflineAggregationInfo info) throws IOException;
+}
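
Since OfflineAggregationWriter is an AbstractService with a single abstract write method, an alternative backend only needs to subclass it. A minimal sketch of a hypothetical implementation, placed in the same package because writeAggregatedEntity is package-private, that only logs what it would persist; it is not part of this patch:

package org.apache.hadoop.yarn.server.timelineservice.storage;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;

/** Hypothetical writer used only for illustration. */
public class LoggingOfflineAggregationWriter extends OfflineAggregationWriter {

  private static final Log LOG =
      LogFactory.getLog(LoggingOfflineAggregationWriter.class);

  public LoggingOfflineAggregationWriter() {
    super(LoggingOfflineAggregationWriter.class.getName());
  }

  @Override
  TimelineWriteResponse writeAggregatedEntity(TimelineCollectorContext context,
      TimelineEntities entities, OfflineAggregationInfo info)
      throws IOException {
    for (TimelineEntity entity : entities.getEntities()) {
      // A real writer would persist the entity; here we only record the target.
      LOG.info("Would aggregate entity " + entity.getId() + " into table "
          + info.getTableName());
    }
    return new TimelineWriteResponse();
  }
}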

+ 356 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java

@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.phoenix.util.PropertiesUtil;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Offline aggregation Phoenix storage. This storage currently consists of two
+ * aggregation tables, one for flow level aggregation and one for user level
+ * aggregation.
+ *
+ * Example table record:
+ *
+ * <pre>
+ * |---------------------------|
+ * |  Primary   | Column Family|
+ * |  key       | metrics      |
+ * |---------------------------|
+ * | row_key    | metricId1:   |
+ * |            | metricValue1 |
+ * |            | @timestamp1  |
+ * |            |              |
+ * |            | metricId1:   |
+ * |            | metricValue2 |
+ * |            | @timestamp2  |
+ * |            |              |
+ * |            | metricId2:   |
+ * |            | metricValue1 |
+ * |            | @timestamp2  |
+ * |---------------------------|
+ * </pre>
+ *
+ * For the flow aggregation table, the primary key contains user, cluster, and
+ * flow name. For the user aggregation table, the primary key contains user and
+ * cluster.
+ *
+ * Metrics column family stores all aggregated metrics for each record.
+ */
+@Private
+@Unstable
+public class PhoenixOfflineAggregationWriterImpl
+    extends OfflineAggregationWriter {
+
+  private static final Log LOG
+      = LogFactory.getLog(PhoenixOfflineAggregationWriterImpl.class);
+  private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER
+      = "timeline_cf_placeholder";
+
+  /** Default Phoenix JDBC driver name */
+  private static final String DRIVER_CLASS_NAME
+      = "org.apache.phoenix.jdbc.PhoenixDriver";
+
+  /** Default Phoenix timeline metric column family */
+  private static final String METRIC_COLUMN_FAMILY = "m.";
+  /** Default Phoenix timeline info column family */
+  private static final String INFO_COLUMN_FAMILY = "i.";
+  /** Default separator for Phoenix storage */
+  private static final String AGGREGATION_STORAGE_SEPARATOR = ";";
+
+  /** Connection string to the deployed Phoenix cluster */
+  private String connString = null;
+  private Properties connProperties = new Properties();
+
+  public PhoenixOfflineAggregationWriterImpl(Properties prop) {
+    super(PhoenixOfflineAggregationWriterImpl.class.getName());
+    connProperties = PropertiesUtil.deepCopy(prop);
+  }
+
+  public PhoenixOfflineAggregationWriterImpl() {
+    super(PhoenixOfflineAggregationWriterImpl.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    Class.forName(DRIVER_CLASS_NAME);
+    // Read the Phoenix connection string from the configuration, falling back
+    // to the default if it has not been overridden.
+    connString =
+        conf.get(YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
+            YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR_DEFAULT);
+    super.init(conf);
+  }
+
+  @Override
+  public TimelineWriteResponse writeAggregatedEntity(
+      TimelineCollectorContext context, TimelineEntities entities,
+      OfflineAggregationInfo info) throws IOException {
+    TimelineWriteResponse response = new TimelineWriteResponse();
+    String sql = "UPSERT INTO " + info.getTableName()
+        + " (" + StringUtils.join(info.getPrimaryKeyList(), ",")
+        + ", created_time, modified_time, metric_names) "
+        + "VALUES ("
+        + StringUtils.repeat("?,", info.getPrimaryKeyList().length)
+        + "?, ?, ?)";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("TimelineEntity write SQL: " + sql);
+    }
+
+    try (Connection conn = getConnection();
+        PreparedStatement ps = conn.prepareStatement(sql)) {
+      for (TimelineEntity entity : entities.getEntities()) {
+        HashMap<String, TimelineMetric> formattedMetrics = new HashMap<>();
+        if (entity.getMetrics() != null) {
+          for (TimelineMetric m : entity.getMetrics()) {
+            formattedMetrics.put(m.getId(), m);
+          }
+        }
+        int idx = info.setStringsForPrimaryKey(ps, context, null, 1);
+        ps.setLong(idx++, entity.getCreatedTime());
+        ps.setLong(idx++, entity.getModifiedTime());
+        ps.setString(idx++, StringUtils.join(formattedMetrics.keySet().toArray(),
+            AGGREGATION_STORAGE_SEPARATOR));
+        ps.execute();
+
+        storeEntityVariableLengthFields(entity, formattedMetrics, context, conn,
+            info);
+
+        conn.commit();
+      }
+    } catch (SQLException se) {
+      LOG.error("Failed to add entity to Phoenix " + se.getMessage());
+      throw new IOException(se);
+    } catch (Exception e) {
+      LOG.error("Exception on getting connection: " + e.getMessage());
+      throw new IOException(e);
+    }
+    return response;
+  }
+
+  /**
+   * Create Phoenix tables for offline aggregation storage if the tables do not
+   * exist.
+   *
+   * @throws IOException
+   */
+  public void createPhoenixTables() throws IOException {
+    // Create tables if necessary
+    try (Connection conn = getConnection();
+        Statement stmt = conn.createStatement()) {
+      // Table schema defined as in YARN-3817.
+      String sql = "CREATE TABLE IF NOT EXISTS "
+          + OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME
+          + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
+          + "flow_name VARCHAR NOT NULL, "
+          + "created_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, "
+          + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, "
+          + "metric_names VARCHAR, info_keys VARCHAR "
+          + "CONSTRAINT pk PRIMARY KEY("
+          + "user, cluster, flow_name))";
+      stmt.executeUpdate(sql);
+      sql = "CREATE TABLE IF NOT EXISTS "
+          + OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME
+          + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
+          + "created_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, "
+          + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, "
+          + "metric_names VARCHAR, info_keys VARCHAR "
+          + "CONSTRAINT pk PRIMARY KEY(user, cluster))";
+      stmt.executeUpdate(sql);
+      conn.commit();
+    } catch (SQLException se) {
+      LOG.error("Failed in init data " + se.getLocalizedMessage());
+      throw new IOException(se);
+    }
+    return;
+  }
+
+  // Utility functions
+  @Private
+  @VisibleForTesting
+  Connection getConnection() throws IOException {
+    Connection conn;
+    try {
+      conn = DriverManager.getConnection(connString, connProperties);
+      conn.setAutoCommit(false);
+    } catch (SQLException se) {
+      LOG.error("Failed to connect to phoenix server! "
+          + se.getLocalizedMessage());
+      throw new IOException(se);
+    }
+    return conn;
+  }
+
+  // WARNING: This method will permanently drop a table!
+  @Private
+  @VisibleForTesting
+  void dropTable(String tableName) throws Exception {
+    try (Connection conn = getConnection();
+         Statement stmt = conn.createStatement()) {
+      String sql = "DROP TABLE " + tableName;
+      stmt.executeUpdate(sql);
+    } catch (SQLException se) {
+      LOG.error("Failed in dropping entity table " + se.getLocalizedMessage());
+      throw se;
+    }
+  }
+
+  private static class DynamicColumns<K> {
+    static final String COLUMN_FAMILY_TYPE_BYTES = " VARBINARY";
+    static final String COLUMN_FAMILY_TYPE_STRING = " VARCHAR";
+    String columnFamilyPrefix;
+    String type;
+    Set<K> columns;
+
+    public DynamicColumns(String columnFamilyPrefix, String type,
+        Set<K> keyValues) {
+      this.columnFamilyPrefix = columnFamilyPrefix;
+      this.columns = keyValues;
+      this.type = type;
+    }
+  }
+
+  private static <K> StringBuilder appendColumnsSQL(
+      StringBuilder colNames, DynamicColumns<K> cfInfo) {
+    // Prepare the sql template by iterating through all keys
+    for (K key : cfInfo.columns) {
+      colNames.append(",").append(cfInfo.columnFamilyPrefix)
+          .append(key.toString()).append(cfInfo.type);
+    }
+    return colNames;
+  }
+
+  private static <K, V> int setValuesForColumnFamily(
+      PreparedStatement ps, Map<K, V> keyValues, int startPos,
+      boolean converToBytes) throws SQLException {
+    int idx = startPos;
+    for (Map.Entry<K, V> entry : keyValues.entrySet()) {
+      V value = entry.getValue();
+      if (value instanceof Collection) {
+        ps.setString(idx++, StringUtils.join(
+            (Collection) value, AGGREGATION_STORAGE_SEPARATOR));
+      } else {
+        if (converToBytes) {
+          try {
+            ps.setBytes(idx++, GenericObjectMapper.write(entry.getValue()));
+          } catch (IOException ie) {
+            LOG.error("Exception in converting values into bytes "
+                + ie.getMessage());
+            throw new SQLException(ie);
+          }
+        } else {
+          ps.setString(idx++, value.toString());
+        }
+      }
+    }
+    return idx;
+  }
+
+  private static <K, V> int setBytesForColumnFamily(
+      PreparedStatement ps, Map<K, V> keyValues, int startPos)
+      throws SQLException {
+    return setValuesForColumnFamily(ps, keyValues, startPos, true);
+  }
+
+  private static <K, V> int setStringsForColumnFamily(
+      PreparedStatement ps, Map<K, V> keyValues, int startPos)
+      throws SQLException {
+    return setValuesForColumnFamily(ps, keyValues, startPos, false);
+  }
+
+  private static void storeEntityVariableLengthFields(TimelineEntity entity,
+      Map<String, TimelineMetric> formattedMetrics,
+      TimelineCollectorContext context, Connection conn,
+      OfflineAggregationInfo aggregationInfo) throws SQLException {
+    int numPlaceholders = 0;
+    StringBuilder columnDefs = new StringBuilder(
+        StringUtils.join(aggregationInfo.getPrimaryKeyList(), ","));
+    if (formattedMetrics != null && formattedMetrics.size() > 0) {
+      appendColumnsSQL(columnDefs, new DynamicColumns<>(
+          METRIC_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES,
+          formattedMetrics.keySet()));
+      numPlaceholders += formattedMetrics.keySet().size();
+    }
+    if (numPlaceholders == 0) {
+      return;
+    }
+    StringBuilder placeholders = new StringBuilder();
+    placeholders.append(
+        StringUtils.repeat("?,", aggregationInfo.getPrimaryKeyList().length));
+    // numPlaceholders >= 1 now
+    placeholders.append("?")
+        .append(StringUtils.repeat(",?", numPlaceholders - 1));
+    String sqlVariableLengthFields = new StringBuilder("UPSERT INTO ")
+        .append(aggregationInfo.getTableName()).append(" (").append(columnDefs)
+        .append(") VALUES(").append(placeholders).append(")").toString();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SQL statement for variable length fields: "
+          + sqlVariableLengthFields);
+    }
+    // Use try with resource statement for the prepared statement
+    try (PreparedStatement psVariableLengthFields =
+        conn.prepareStatement(sqlVariableLengthFields)) {
+      int idx = aggregationInfo.setStringsForPrimaryKey(
+          psVariableLengthFields, context, null, 1);
+      if (formattedMetrics != null && formattedMetrics.size() > 0) {
+        idx = setBytesForColumnFamily(
+            psVariableLengthFields, formattedMetrics, idx);
+      }
+      psVariableLengthFields.execute();
+    }
+  }
+}
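
For reference, a minimal sketch of driving the new writer end to end: create the tables, then push one aggregated entity into the flow-level table. It assumes a Phoenix cluster reachable at the configured connection string; the entity, metric, and context values are illustrative and mirror the unit test further below:

import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.PhoenixOfflineAggregationWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;

public class FlowAggregationExample {
  public static void main(String[] args) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    // Uses PHOENIX_OFFLINE_STORAGE_CONN_STR(_DEFAULT) added by this patch.
    PhoenixOfflineAggregationWriterImpl writer =
        new PhoenixOfflineAggregationWriterImpl();
    writer.init(conf);
    writer.start();
    writer.createPhoenixTables();

    // Build a single aggregated entity carrying one metric value.
    TimelineEntity entity = new TimelineEntity();
    entity.setId("flow_1");                  // illustrative id
    entity.setType("testAggregationType");   // illustrative type
    entity.setCreatedTime(System.currentTimeMillis());
    entity.setModifiedTime(System.currentTimeMillis());
    TimelineMetric metric = new TimelineMetric();
    metric.setId("HDFS_BYTES_READ");
    metric.addValue(System.currentTimeMillis(), 8000);
    entity.addMetric(metric);
    TimelineEntities entities = new TimelineEntities();
    entities.addEntity(entity);

    // For flow-level aggregation the context needs user, cluster and flow name;
    // flow version, run id and app id may be left empty.
    TimelineCollectorContext context = new TimelineCollectorContext(
        "cluster_1", "user1", "testFlow", null, 0L, null);
    writer.writeAggregatedEntity(context, entities,
        OfflineAggregationInfo.FLOW_AGGREGATION);
    writer.stop();
  }
}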

+ 0 - 530
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java

@@ -1,530 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-@Private
-@Unstable
-public class PhoenixTimelineWriterImpl extends AbstractService
-    implements TimelineWriter {
-
-  public static final String TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR
-      = YarnConfiguration.TIMELINE_SERVICE_PREFIX
-          + "writer.phoenix.connectionString";
-
-  public static final String TIMELINE_SERVICE_PHEONIX_STORAGE_CONN_STR_DEFAULT
-      = "jdbc:phoenix:localhost:2181:/hbase";
-
-  private static final Log LOG
-      = LogFactory.getLog(PhoenixTimelineWriterImpl.class);
-  private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER
-      = "timeline_cf_placeholder";
-  // These lists are not taking effects in table creations.
-  private static final String[] PHOENIX_STORAGE_PK_LIST
-      = {"cluster", "user", "flow_name", "flow_version", "flow_run", "app_id",
-         "type", "entity_id"};
-  private static final String[] TIMELINE_EVENT_EXTRA_PK_LIST =
-      {"timestamp", "event_id"};
-  private static final String[] TIMELINE_METRIC_EXTRA_PK_LIST =
-      {"metric_id"};
-  /** Default Phoenix JDBC driver name */
-  private static final String DRIVER_CLASS_NAME
-      = "org.apache.phoenix.jdbc.PhoenixDriver";
-
-  /** Default Phoenix timeline entity table name */
-  @VisibleForTesting
-  static final String ENTITY_TABLE_NAME = "timeline_entity";
-  /** Default Phoenix event table name */
-  @VisibleForTesting
-  static final String EVENT_TABLE_NAME = "timeline_event";
-  /** Default Phoenix metric table name */
-  @VisibleForTesting
-  static final String METRIC_TABLE_NAME = "metric_singledata";
-
-  /** Default Phoenix timeline config column family */
-  private static final String CONFIG_COLUMN_FAMILY = "c.";
-  /** Default Phoenix timeline info column family */
-  private static final String INFO_COLUMN_FAMILY = "i.";
-  /** Default Phoenix event info column family */
-  private static final String EVENT_INFO_COLUMN_FAMILY = "ei.";
-  /** Default Phoenix isRelatedTo column family */
-  private static final String IS_RELATED_TO_FAMILY = "ir.";
-  /** Default Phoenix relatesTo column family */
-  private static final String RELATES_TO_FAMILY = "rt.";
-  /** Default separator for Phoenix storage */
-  private static final String PHOENIX_STORAGE_SEPARATOR = ";";
-
-  /** Connection string to the deployed Phoenix cluster */
-  @VisibleForTesting
-  String connString = null;
-  @VisibleForTesting
-  Properties connProperties = new Properties();
-
-  PhoenixTimelineWriterImpl() {
-    super((PhoenixTimelineWriterImpl.class.getName()));
-  }
-
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    // so check it here and only read in the config if it's not overridden.
-    connString =
-        conf.get(TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR,
-        TIMELINE_SERVICE_PHEONIX_STORAGE_CONN_STR_DEFAULT);
-    createTables();
-    super.init(conf);
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    super.serviceStop();
-  }
-
-  @Override
-  public TimelineWriteResponse write(String clusterId, String userId,
-      String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntities entities) throws IOException {
-    TimelineWriteResponse response = new TimelineWriteResponse();
-    TimelineCollectorContext currContext = new TimelineCollectorContext(
-        clusterId, userId, flowName, flowVersion, flowRunId, appId);
-    String sql = "UPSERT INTO " + ENTITY_TABLE_NAME
-        + " (" + StringUtils.join(PHOENIX_STORAGE_PK_LIST, ",")
-        + ", creation_time, modified_time, configs) "
-        + "VALUES (" + StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length)
-        + "?, ?, ?)";
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("TimelineEntity write SQL: " + sql);
-    }
-
-    try (Connection conn = getConnection();
-        PreparedStatement ps = conn.prepareStatement(sql)) {
-      for (TimelineEntity entity : entities.getEntities()) {
-        int idx = setStringsForPrimaryKey(ps, currContext, entity, 1);
-        ps.setLong(idx++, entity.getCreatedTime());
-        ps.setLong(idx++, entity.getModifiedTime());
-        String configKeys = StringUtils.join(
-            entity.getConfigs().keySet(), PHOENIX_STORAGE_SEPARATOR);
-        ps.setString(idx++, configKeys);
-        ps.execute();
-
-        storeEntityVariableLengthFields(entity, currContext, conn);
-        storeEvents(entity, currContext, conn);
-        storeMetrics(entity, currContext, conn);
-
-        conn.commit();
-      }
-    } catch (SQLException se) {
-      LOG.error("Failed to add entity to Phoenix " + se.getMessage());
-      throw new IOException(se);
-    } catch (Exception e) {
-      LOG.error("Exception on getting connection: " + e.getMessage());
-      throw new IOException(e);
-    }
-    return response;
-  }
-
-  /**
-   * Aggregates the entity information to the timeline store based on which
-   * track this entity is to be rolled up to The tracks along which aggregations
-   * are to be done are given by {@link TimelineAggregationTrack}
-   *
-   * Any errors occurring for individual write request objects will be reported
-   * in the response.
-   *
-   * @param data
-   *          a {@link TimelineEntity} object
-   *          a {@link TimelineAggregationTrack} enum value
-   * @return a {@link TimelineWriteResponse} object.
-   * @throws IOException
-   */
-  @Override
-  public TimelineWriteResponse aggregate(TimelineEntity data,
-      TimelineAggregationTrack track) throws IOException {
-    return null;
-
-  }
-
-  @Override
-  public void flush() throws IOException {
-    // currently no-op
-  }
-
-  // Utility functions
-  @Private
-  @VisibleForTesting
-  Connection getConnection() throws IOException {
-    Connection conn;
-    try {
-      Class.forName(DRIVER_CLASS_NAME);
-      conn = DriverManager.getConnection(connString, connProperties);
-      conn.setAutoCommit(false);
-    } catch (SQLException se) {
-      LOG.error("Failed to connect to phoenix server! "
-          + se.getLocalizedMessage());
-      throw new IOException(se);
-    } catch (ClassNotFoundException e) {
-      LOG.error("Class not found! " + e.getLocalizedMessage());
-      throw new IOException(e);
-    }
-    return conn;
-  }
-
-  private void createTables() throws Exception {
-    // Create tables if necessary
-    try (Connection conn = getConnection();
-        Statement stmt = conn.createStatement()) {
-      // Table schema defined as in YARN-3134.
-      String sql = "CREATE TABLE IF NOT EXISTS " + ENTITY_TABLE_NAME
-          + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
-          + "flow_name VARCHAR NOT NULL, flow_version VARCHAR NOT NULL, "
-          + "flow_run UNSIGNED_LONG NOT NULL, "
-          + "app_id VARCHAR NOT NULL, type VARCHAR NOT NULL, "
-          + "entity_id VARCHAR NOT NULL, "
-          + "creation_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, "
-          + "configs VARCHAR, "
-          + CONFIG_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR, "
-          + INFO_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, "
-          + IS_RELATED_TO_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR, "
-          + RELATES_TO_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR "
-          + "CONSTRAINT pk PRIMARY KEY("
-          + "user, cluster, flow_name, flow_version, flow_run DESC, app_id, "
-          + "type, entity_id))";
-      stmt.executeUpdate(sql);
-      sql = "CREATE TABLE IF NOT EXISTS " + EVENT_TABLE_NAME
-          + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
-          + "flow_name VARCHAR NOT NULL, flow_version VARCHAR NOT NULL, "
-          + "flow_run UNSIGNED_LONG NOT NULL, "
-          + "app_id VARCHAR NOT NULL, type VARCHAR NOT NULL, "
-          + "entity_id VARCHAR NOT NULL, "
-          + "timestamp UNSIGNED_LONG NOT NULL, event_id VARCHAR NOT NULL, "
-          + EVENT_INFO_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY "
-          + "CONSTRAINT pk PRIMARY KEY("
-          + "user, cluster, flow_name, flow_version, flow_run DESC, app_id, "
-          + "type, entity_id, timestamp DESC, event_id))";
-      stmt.executeUpdate(sql);
-      sql = "CREATE TABLE IF NOT EXISTS " + METRIC_TABLE_NAME
-          + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
-          + "flow_name VARCHAR NOT NULL, flow_version VARCHAR NOT NULL, "
-          + "flow_run UNSIGNED_LONG NOT NULL, "
-          + "app_id VARCHAR NOT NULL, type VARCHAR NOT NULL, "
-          + "entity_id VARCHAR NOT NULL, "
-          + "metric_id VARCHAR NOT NULL, "
-          + "singledata VARBINARY, "
-          + "time UNSIGNED_LONG "
-          + "CONSTRAINT pk PRIMARY KEY("
-          + "user, cluster, flow_name, flow_version, flow_run DESC, app_id, "
-          + "type, entity_id, metric_id))";
-      stmt.executeUpdate(sql);
-      conn.commit();
-    } catch (SQLException se) {
-      LOG.error("Failed in init data " + se.getLocalizedMessage());
-      throw se;
-    }
-    return;
-  }
-
-  private static class DynamicColumns<K> {
-    static final String COLUMN_FAMILY_TYPE_BYTES = " VARBINARY";
-    static final String COLUMN_FAMILY_TYPE_STRING = " VARCHAR";
-    String columnFamilyPrefix;
-    String type;
-    Set<K> columns;
-
-    public DynamicColumns(String columnFamilyPrefix, String type,
-        Set<K> keyValues) {
-      this.columnFamilyPrefix = columnFamilyPrefix;
-      this.columns = keyValues;
-      this.type = type;
-    }
-  }
-
-  private static <K> StringBuilder appendColumnsSQL(
-      StringBuilder colNames, DynamicColumns<K> cfInfo) {
-    // Prepare the sql template by iterating through all keys
-    for (K key : cfInfo.columns) {
-      colNames.append(",").append(cfInfo.columnFamilyPrefix)
-          .append(key.toString()).append(cfInfo.type);
-    }
-    return colNames;
-  }
-
-  private static <K, V> int setValuesForColumnFamily(
-      PreparedStatement ps, Map<K, V> keyValues, int startPos,
-      boolean converToBytes) throws SQLException {
-    int idx = startPos;
-    for (Map.Entry<K, V> entry : keyValues.entrySet()) {
-      V value = entry.getValue();
-      if (value instanceof Collection) {
-        ps.setString(idx++, StringUtils.join(
-            (Collection) value, PHOENIX_STORAGE_SEPARATOR));
-      } else {
-        if (converToBytes) {
-          try {
-            ps.setBytes(idx++, GenericObjectMapper.write(entry.getValue()));
-          } catch (IOException ie) {
-            LOG.error("Exception in converting values into bytes "
-                + ie.getMessage());
-            throw new SQLException(ie);
-          }
-        } else {
-          ps.setString(idx++, value.toString());
-        }
-      }
-    }
-    return idx;
-  }
-
-  private static <K, V> int setBytesForColumnFamily(
-      PreparedStatement ps, Map<K, V> keyValues, int startPos)
-      throws SQLException {
-    return setValuesForColumnFamily(ps, keyValues, startPos, true);
-  }
-
-  private static <K, V> int setStringsForColumnFamily(
-      PreparedStatement ps, Map<K, V> keyValues, int startPos)
-      throws SQLException {
-    return setValuesForColumnFamily(ps, keyValues, startPos, false);
-  }
-
-  private static int setStringsForPrimaryKey(PreparedStatement ps,
-      TimelineCollectorContext context, TimelineEntity entity, int startPos)
-      throws SQLException {
-    int idx = startPos;
-    ps.setString(idx++, context.getClusterId());
-    ps.setString(idx++, context.getUserId());
-    ps.setString(idx++,
-        context.getFlowName());
-    ps.setString(idx++, context.getFlowVersion());
-    ps.setLong(idx++, context.getFlowRunId());
-    ps.setString(idx++, context.getAppId());
-    ps.setString(idx++, entity.getType());
-    ps.setString(idx++, entity.getId());
-    return idx;
-  }
-
-  private static void storeEntityVariableLengthFields(TimelineEntity entity,
-      TimelineCollectorContext context, Connection conn) throws SQLException {
-    int numPlaceholders = 0;
-    StringBuilder columnDefs = new StringBuilder(
-        StringUtils.join(PHOENIX_STORAGE_PK_LIST, ","));
-    if (entity.getConfigs() != null) {
-      Set<String> keySet = entity.getConfigs().keySet();
-      appendColumnsSQL(columnDefs, new DynamicColumns<>(
-          CONFIG_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_STRING,
-          keySet));
-      numPlaceholders += keySet.size();
-    }
-    if (entity.getInfo() != null) {
-      Set<String> keySet = entity.getInfo().keySet();
-      appendColumnsSQL(columnDefs, new DynamicColumns<>(
-          INFO_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES,
-          keySet));
-      numPlaceholders += keySet.size();
-    }
-    if (entity.getIsRelatedToEntities() != null) {
-      Set<String> keySet = entity.getIsRelatedToEntities().keySet();
-      appendColumnsSQL(columnDefs, new DynamicColumns<>(
-          IS_RELATED_TO_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_STRING,
-          keySet));
-      numPlaceholders += keySet.size();
-    }
-    if (entity.getRelatesToEntities() != null) {
-      Set<String> keySet = entity.getRelatesToEntities().keySet();
-      appendColumnsSQL(columnDefs, new DynamicColumns<>(
-          RELATES_TO_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_STRING,
-          keySet));
-      numPlaceholders += keySet.size();
-    }
-    if (numPlaceholders == 0) {
-      return;
-    }
-    StringBuilder placeholders = new StringBuilder();
-    placeholders.append(
-        StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length));
-    // numPlaceholders >= 1 now
-    placeholders.append("?")
-        .append(StringUtils.repeat(",?", numPlaceholders - 1));
-    String sqlVariableLengthFields = new StringBuilder("UPSERT INTO ")
-        .append(ENTITY_TABLE_NAME).append(" (").append(columnDefs)
-        .append(") VALUES(").append(placeholders).append(")").toString();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("SQL statement for variable length fields: "
-          + sqlVariableLengthFields);
-    }
-    // Use try with resource statement for the prepared statement
-    try (PreparedStatement psVariableLengthFields =
-        conn.prepareStatement(sqlVariableLengthFields)) {
-      int idx = setStringsForPrimaryKey(
-          psVariableLengthFields, context, entity, 1);
-      if (entity.getConfigs() != null) {
-        idx = setStringsForColumnFamily(
-            psVariableLengthFields, entity.getConfigs(), idx);
-      }
-      if (entity.getInfo() != null) {
-        idx = setBytesForColumnFamily(
-            psVariableLengthFields, entity.getInfo(), idx);
-      }
-      if (entity.getIsRelatedToEntities() != null) {
-        idx = setStringsForColumnFamily(
-            psVariableLengthFields, entity.getIsRelatedToEntities(), idx);
-      }
-      if (entity.getRelatesToEntities() != null) {
-        idx = setStringsForColumnFamily(
-            psVariableLengthFields, entity.getRelatesToEntities(), idx);
-      }
-      psVariableLengthFields.execute();
-    }
-  }
-
-  private static void storeMetrics(TimelineEntity entity,
-      TimelineCollectorContext context, Connection conn) throws SQLException {
-    if (entity.getMetrics() == null) {
-      return;
-    }
-    Set<TimelineMetric> metrics = entity.getMetrics();
-    for (TimelineMetric metric : metrics) {
-      StringBuilder sqlColumns = new StringBuilder(
-          StringUtils.join(PHOENIX_STORAGE_PK_LIST, ","));
-      sqlColumns.append(",")
-          .append(StringUtils.join(TIMELINE_METRIC_EXTRA_PK_LIST, ","));
-      sqlColumns.append(",").append("singledata, time");
-      StringBuilder placeholders = new StringBuilder();
-      placeholders.append(
-          StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length))
-          .append(StringUtils.repeat("?,", TIMELINE_METRIC_EXTRA_PK_LIST.length));
-      placeholders.append("?, ?");
-      String sqlMetric = new StringBuilder("UPSERT INTO ")
-          .append(METRIC_TABLE_NAME).append(" (").append(sqlColumns)
-          .append(") VALUES(").append(placeholders).append(")").toString();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("SQL statement for metric: " + sqlMetric);
-      }
-      try (PreparedStatement psMetrics = conn.prepareStatement(sqlMetric)) {
-        if (metric.getType().equals(TimelineMetric.Type.TIME_SERIES)) {
-          LOG.warn("The incoming timeline metric contains time series data, "
-              + "which is currently not supported by Phoenix storage. "
-              + "Time series will be truncated. ");
-        }
-        int idx = setStringsForPrimaryKey(psMetrics, context, entity, 1);
-        psMetrics.setString(idx++, metric.getId());
-        Iterator<Map.Entry<Long, Number>> currNumIter =
-            metric.getValues().entrySet().iterator();
-        if (currNumIter.hasNext()) {
-          // TODO: support time series storage
-          Map.Entry<Long, Number> currEntry = currNumIter.next();
-          psMetrics.setBytes(idx++,
-              GenericObjectMapper.write(currEntry.getValue()));
-          psMetrics.setLong(idx++, currEntry.getKey());
-        } else {
-          psMetrics.setBytes(idx++, GenericObjectMapper.write(null));
-          LOG.warn("The incoming metric contains an empty value set. ");
-        }
-        psMetrics.execute();
-      } catch (IOException ie) {
-        LOG.error("Exception on converting single data to bytes: "
-            + ie.getMessage());
-        throw new SQLException(ie);
-      }
-    }
-  }
-
-  private static void storeEvents(TimelineEntity entity,
-      TimelineCollectorContext context, Connection conn) throws SQLException {
-    if (entity.getEvents() == null) {
-      return;
-    }
-    Set<TimelineEvent> events = entity.getEvents();
-    for (TimelineEvent event : events) {
-      // We need this number to check if the incoming event's info field is empty
-      int numPlaceholders = 0;
-      StringBuilder sqlColumns = new StringBuilder(
-          StringUtils.join(PHOENIX_STORAGE_PK_LIST, ","));
-      sqlColumns.append(",")
-          .append(StringUtils.join(TIMELINE_EVENT_EXTRA_PK_LIST, ","));
-      appendColumnsSQL(sqlColumns, new DynamicColumns<>(
-          EVENT_INFO_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES,
-          event.getInfo().keySet()));
-      numPlaceholders += event.getInfo().keySet().size();
-      if (numPlaceholders == 0) {
-        continue;
-      }
-      StringBuilder placeholders = new StringBuilder();
-      placeholders.append(
-          StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length))
-          .append(StringUtils.repeat("?,", TIMELINE_EVENT_EXTRA_PK_LIST.length));
-      // numPlaceholders >= 1 now
-      placeholders.append("?")
-            .append(StringUtils.repeat(",?", numPlaceholders - 1));
-      String sqlEvents = new StringBuilder("UPSERT INTO ")
-          .append(EVENT_TABLE_NAME).append(" (").append(sqlColumns)
-          .append(") VALUES(").append(placeholders).append(")").toString();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("SQL statement for events: " + sqlEvents);
-      }
-      try (PreparedStatement psEvent = conn.prepareStatement(sqlEvents)) {
-        int idx = setStringsForPrimaryKey(psEvent, context, entity, 1);
-        psEvent.setLong(idx++, event.getTimestamp());
-        psEvent.setString(idx++, event.getId());
-        setBytesForColumnFamily(psEvent, event.getInfo(), idx);
-        psEvent.execute();
-      }
-    }
-  }
-
-  // WARNING: This method will permanently drop a table!
-  @Private
-  @VisibleForTesting
-  void dropTable(String tableName) throws Exception {
-    try (Connection conn = getConnection();
-         Statement stmt = conn.createStatement()) {
-      String sql = "DROP TABLE " + tableName;
-      stmt.executeUpdate(sql);
-    } catch (SQLException se) {
-      LOG.error("Failed in dropping entity table " + se.getLocalizedMessage());
-      throw se;
-    }
-  }
-}

+ 44 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java

@@ -18,6 +18,8 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -51,6 +53,7 @@ public class TimelineSchemaCreator {
 
   final static String NAME = TimelineSchemaCreator.class.getSimpleName();
   private static final Log LOG = LogFactory.getLog(TimelineSchemaCreator.class);
+  private static final String PHOENIX_OPTION_SHORT = "p";
 
   public static void main(String[] args) throws Exception {
 
@@ -83,7 +86,41 @@ public class TimelineSchemaCreator {
       hbaseConf.set(ApplicationTable.TABLE_NAME_CONF_NAME,
           applicationTableName);
     }
-    createAllTables(hbaseConf);
+
+    List<Exception> exceptions = new ArrayList<>();
+    try {
+      createAllTables(hbaseConf);
+      LOG.info("Successfully created HBase schema. ");
+    } catch (IOException e) {
+      LOG.error("Error in creating hbase tables: " + e.getMessage());
+      exceptions.add(e);
+    }
+
+    // Create Phoenix data schema if needed
+    if (commandLine.hasOption(PHOENIX_OPTION_SHORT)) {
+      Configuration phoenixConf = new Configuration();
+      try {
+        PhoenixOfflineAggregationWriterImpl phoenixWriter =
+            new PhoenixOfflineAggregationWriterImpl();
+        phoenixWriter.init(phoenixConf);
+        phoenixWriter.start();
+        phoenixWriter.createPhoenixTables();
+        phoenixWriter.stop();
+        LOG.info("Successfully created Phoenix offline aggregation schema. ");
+      } catch (IOException e) {
+        LOG.error("Error in creating phoenix tables: " + e.getMessage());
+        exceptions.add(e);
+      }
+    }
+    if (exceptions.size() > 0) {
+      LOG.warn("Schema creation finished with the following exceptions");
+      for (Exception e : exceptions) {
+        LOG.warn(e.getMessage());
+      }
+      System.exit(-1);
+    } else {
+      LOG.info("Schema creation finished successfully");
+    }
   }
 
   /**
@@ -115,6 +152,12 @@ public class TimelineSchemaCreator {
     o.setRequired(false);
     options.addOption(o);
 
+    o = new Option(PHOENIX_OPTION_SHORT, "usePhoenix", false,
+        "create Phoenix offline aggregation tables");
+    // No need to set arg name since we do not need an argument here
+    o.setRequired(false);
+    options.addOption(o);
+
     CommandLineParser parser = new PosixParser();
     CommandLine commandLine = null;
     try {
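
The new flag is optional; without it the tool behaves as before and only creates the HBase schema. A minimal sketch of invoking the creator programmatically with the flag, assuming reachable HBase and Phoenix clusters; the wrapper class is hypothetical and normally this is run from the command line:

public class CreatePhoenixSchemaExample {
  public static void main(String[] args) throws Exception {
    // "-p" (long form "usePhoenix") additionally creates the Phoenix offline
    // aggregation tables after the HBase schema.
    org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator
        .main(new String[] {"-p"});
  }
}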

+ 110 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/OfflineAggregationInfo.java

@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Class to carry the offline aggregation information for storage level
+ * implementations. There are currently two predefined aggregation info
+ * instances that represent flow and user level offline aggregations. Depending
+ * on its implementation, a storage class may use an OfflineAggregationInfo
+ * object to decide its behavior dynamically.
+ */
+public final class OfflineAggregationInfo {
+  /**
+   * Default flow level aggregation table name
+   */
+  @VisibleForTesting
+  public static final String FLOW_AGGREGATION_TABLE_NAME
+      = "yarn_timeline_flow_aggregation";
+  /**
+   * Default user level aggregation table name
+   */
+  public static final String USER_AGGREGATION_TABLE_NAME
+      = "yarn_timeline_user_aggregation";
+
+  // These lists do not take effect in table creation.
+  private static final String[] FLOW_AGGREGATION_PK_LIST =
+      { "user", "cluster", "flow_name" };
+  private static final String[] USER_AGGREGATION_PK_LIST = { "user", "cluster"};
+
+  private final String tableName;
+  private final String[] primaryKeyList;
+  private final PrimaryKeyStringSetter primaryKeyStringSetter;
+
+  private OfflineAggregationInfo(String table, String[] pkList,
+      PrimaryKeyStringSetter formatter) {
+    tableName = table;
+    primaryKeyList = pkList;
+    primaryKeyStringSetter = formatter;
+  }
+
+  private interface PrimaryKeyStringSetter {
+    int setValues(PreparedStatement ps, TimelineCollectorContext context,
+        String[] extraInfo, int startPos) throws SQLException;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public String[] getPrimaryKeyList() {
+    return primaryKeyList.clone();
+  }
+
+  public int setStringsForPrimaryKey(PreparedStatement ps,
+      TimelineCollectorContext context, String[] extraInfo, int startPos)
+      throws SQLException {
+    return primaryKeyStringSetter.setValues(ps, context, extraInfo, startPos);
+  }
+
+  public static final OfflineAggregationInfo FLOW_AGGREGATION =
+      new OfflineAggregationInfo(FLOW_AGGREGATION_TABLE_NAME,
+          FLOW_AGGREGATION_PK_LIST, new PrimaryKeyStringSetter() {
+        @Override
+        public int setValues(PreparedStatement ps,
+            TimelineCollectorContext context, String[] extraInfo, int startPos)
+            throws SQLException {
+          int idx = startPos;
+          ps.setString(idx++, context.getUserId());
+          ps.setString(idx++, context.getClusterId());
+          ps.setString(idx++, context.getFlowName());
+          return idx;
+        }
+      });
+
+  public static final OfflineAggregationInfo USER_AGGREGATION =
+      new OfflineAggregationInfo(USER_AGGREGATION_TABLE_NAME,
+          USER_AGGREGATION_PK_LIST, new PrimaryKeyStringSetter() {
+        @Override
+        public int setValues(PreparedStatement ps,
+            TimelineCollectorContext context, String[] extraInfo, int startPos)
+            throws SQLException {
+          int idx = startPos;
+          ps.setString(idx++, context.getUserId());
+          ps.setString(idx++, context.getClusterId());
+          return idx;
+        }
+      });
+}
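
A minimal sketch of how the two predefined descriptors let a writer derive its SQL instead of hard-coding table names and key columns; the metric_names column comes from the table definitions above, and the snippet only prints the generated statements:

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;

public class OfflineAggregationInfoExample {
  public static void main(String[] args) {
    OfflineAggregationInfo[] infos = {
        OfflineAggregationInfo.FLOW_AGGREGATION,
        OfflineAggregationInfo.USER_AGGREGATION };
    for (OfflineAggregationInfo info : infos) {
      // The table name and primary key list differ per aggregation level.
      String upsert = "UPSERT INTO " + info.getTableName()
          + " (" + StringUtils.join(info.getPrimaryKeyList(), ",")
          + ", metric_names) VALUES ("
          + StringUtils.repeat("?,", info.getPrimaryKeyList().length) + "?)";
      System.out.println(upsert);
    }
  }
}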

+ 67 - 57
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixOfflineAggregationWriterImpl.java

@@ -25,14 +25,17 @@ import static org.junit.Assert.fail;
 
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 
 import java.sql.ResultSet;
@@ -43,72 +46,36 @@ import java.util.Map;
 
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 
-public class TestPhoenixTimelineWriterImpl extends BaseTest {
-  private static PhoenixTimelineWriterImpl writer;
+public class TestPhoenixOfflineAggregationWriterImpl extends BaseTest {
+  private static PhoenixOfflineAggregationWriterImpl storage;
   private static final int BATCH_SIZE = 3;
 
   @BeforeClass
   public static void setup() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
-    writer = setupPhoenixClusterAndWriterForTest(conf);
+    storage = setupPhoenixClusterAndWriterForTest(conf);
   }
 
   @Test(timeout = 90000)
-  public void testPhoenixWriterBasic() throws Exception {
-    // Set up a list of timeline entities and write them back to Phoenix
-    int numEntity = 12;
-    TimelineEntities te =
-        TestTimelineWriterImpl.getStandardTestTimelineEntities(numEntity);
-    writer.write("cluster_1", "user1", "testFlow", "version1", 1l, "app_test_1", te);
-    // Verify if we're storing all entities
-    String sql = "SELECT COUNT(entity_id) FROM "
-        + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME;
-    verifySQLWithCount(sql, numEntity, "Number of entities should be ");
-    // Check config (half of all entities)
-    sql = "SELECT COUNT(c.config) FROM "
-        + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME + "(c.config VARCHAR) ";
-    verifySQLWithCount(sql, (numEntity / 2),
-        "Number of entities with config should be ");
-    // Check info (half of all entities)
-    sql = "SELECT COUNT(i.info1) FROM "
-        + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME + "(i.info1 VARBINARY) ";
-    verifySQLWithCount(sql, (numEntity / 2),
-        "Number of entities with info should be ");
-    // Check config and info (a quarter of all entities)
-    sql = "SELECT COUNT(entity_id) FROM "
-        + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME
-        + "(c.config VARCHAR, i.info1 VARBINARY) "
-        + "WHERE c.config IS NOT NULL AND i.info1 IS NOT NULL";
-    verifySQLWithCount(sql, (numEntity / 4),
-        "Number of entities with both config and info should be ");
-    // Check relatesToEntities and isRelatedToEntities
-    sql = "SELECT COUNT(entity_id) FROM "
-        + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME
-        + "(rt.testType VARCHAR, ir.testType VARCHAR) "
-        + "WHERE rt.testType IS NOT NULL AND ir.testType IS NOT NULL";
-    verifySQLWithCount(sql, numEntity - 2,
-        "Number of entities with both relatesTo and isRelatedTo should be ");
-    // Check event
-    sql = "SELECT COUNT(entity_id) FROM "
-        + PhoenixTimelineWriterImpl.EVENT_TABLE_NAME;
-    verifySQLWithCount(sql, (numEntity / 4), "Number of events should be ");
-    // Check metrics
-    sql = "SELECT COUNT(entity_id) FROM "
-        + PhoenixTimelineWriterImpl.METRIC_TABLE_NAME;
-    verifySQLWithCount(sql, (numEntity / 4), "Number of events should be ");
+  public void testFlowLevelAggregationStorage() throws Exception {
+    testAggregator(OfflineAggregationInfo.FLOW_AGGREGATION);
+  }
+
+  @Test(timeout = 90000)
+  public void testUserLevelAggregationStorage() throws Exception {
+    testAggregator(OfflineAggregationInfo.USER_AGGREGATION);
   }
 
   @AfterClass
   public static void cleanup() throws Exception {
-    writer.dropTable(PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME);
-    writer.dropTable(PhoenixTimelineWriterImpl.EVENT_TABLE_NAME);
-    writer.dropTable(PhoenixTimelineWriterImpl.METRIC_TABLE_NAME);
-    writer.serviceStop();
+    storage.dropTable(OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME);
+    storage.dropTable(OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME);
     tearDownMiniCluster();
   }
 
-  private static PhoenixTimelineWriterImpl setupPhoenixClusterAndWriterForTest(
-      YarnConfiguration conf) throws Exception{
+  private static PhoenixOfflineAggregationWriterImpl
+    setupPhoenixClusterAndWriterForTest(YarnConfiguration conf)
+      throws Exception{
     Map<String, String> props = new HashMap<>();
     // Must update config before starting server
     props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
@@ -125,21 +92,64 @@ public class TestPhoenixTimelineWriterImpl extends BaseTest {
     // Must update config before starting server
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
 
-    PhoenixTimelineWriterImpl myWriter = new PhoenixTimelineWriterImpl();
     // Change connection settings for test
     conf.set(
-        PhoenixTimelineWriterImpl.TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR,
+        YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
         getUrl());
-    myWriter.connProperties = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-    myWriter.serviceInit(conf);
+    PhoenixOfflineAggregationWriterImpl
+        myWriter = new PhoenixOfflineAggregationWriterImpl(TEST_PROPERTIES);
+    myWriter.init(conf);
+    myWriter.start();
+    myWriter.createPhoenixTables();
     return myWriter;
   }
 
+  private static TimelineEntity getTestAggregationTimelineEntity() {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "hello1";
+    String type = "testAggregationType";
+    entity.setId(id);
+    entity.setType(type);
+    entity.setCreatedTime(1425016501000L);
+    entity.setModifiedTime(1425016502000L);
+
+    TimelineMetric metric = new TimelineMetric();
+    metric.setId("HDFS_BYTES_READ");
+    metric.addValue(1425016501100L, 8000);
+    entity.addMetric(metric);
+
+    return entity;
+  }
+
+  private void testAggregator(OfflineAggregationInfo aggregationInfo)
+      throws Exception {
+    // Set up a list of timeline entities and write them back to Phoenix
+    int numEntity = 1;
+    TimelineEntities te = new TimelineEntities();
+    te.addEntity(getTestAggregationTimelineEntity());
+    TimelineCollectorContext context = new TimelineCollectorContext("cluster_1",
+        "user1", "testFlow", null, 0, null);
+    storage.writeAggregatedEntity(context, te,
+        aggregationInfo);
+
+    // Verify if we're storing all entities
+    String[] primaryKeyList = aggregationInfo.getPrimaryKeyList();
+    String sql = "SELECT COUNT(" + primaryKeyList[primaryKeyList.length - 1]
+        +") FROM " + aggregationInfo.getTableName();
+    verifySQLWithCount(sql, numEntity, "Number of entities should be ");
+    // Check metric
+    sql = "SELECT COUNT(m.HDFS_BYTES_READ) FROM "
+        + aggregationInfo.getTableName() + "(m.HDFS_BYTES_READ VARBINARY) ";
+    verifySQLWithCount(sql, numEntity,
+        "Number of entities with info should be ");
+  }
+
+
   private void verifySQLWithCount(String sql, int targetCount, String message)
       throws Exception {
     try (
         Statement stmt =
-          writer.getConnection().createStatement();
+          storage.getConnection().createStatement();
         ResultSet rs = stmt.executeQuery(sql)) {
       assertTrue("Result set empty on statement " + sql, rs.next());
       assertNotNull("Fail to execute query " + sql, rs);

+ 0 - 74
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java

@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-
-public class TestTimelineWriterImpl {
-  static TimelineEntities getStandardTestTimelineEntities(int listSize) {
-    TimelineEntities te = new TimelineEntities();
-    for (int i = 0; i < listSize; i++) {
-      TimelineEntity entity = new TimelineEntity();
-      String id = "hello" + i;
-      String type = "testType";
-      entity.setId(id);
-      entity.setType(type);
-      entity.setCreatedTime(1425016501000L + i);
-      entity.setModifiedTime(1425016502000L + i);
-      if (i > 0) {
-        entity.addRelatesToEntity(type, "hello" + i);
-        entity.addRelatesToEntity(type, "hello" + (i - 1));
-      }
-      if (i < listSize - 1) {
-        entity.addIsRelatedToEntity(type, "hello" + i);
-        entity.addIsRelatedToEntity(type, "hello" + (i + 1));
-      }
-      int category = i % 4;
-      switch (category) {
-      case 0:
-        entity.addConfig("config", "config" + i);
-        // Fall through deliberately
-      case 1:
-        entity.addInfo("info1", new Integer(i));
-        entity.addInfo("info2", "helloworld");
-        // Fall through deliberately
-      case 2:
-        break;
-      case 3:
-        entity.addConfig("config", "config" + i);
-        TimelineEvent event = new TimelineEvent();
-        event.setId("test event");
-        event.setTimestamp(1425016501100L + i);
-        event.addInfo("test_info", "content for " + entity.getId());
-        event.addInfo("test_info1", new Integer(i));
-        entity.addEvent(event);
-        TimelineMetric metric = new TimelineMetric();
-        metric.setId("HDFS_BYTES_READ");
-        metric.addValue(1425016501100L + i, 8000 + i);
-        entity.addMetric(metric);
-        break;
-      }
-      te.addEntity(entity);
-    }
-    return te;
-  }
-}