
AMBARI-8872. Support point in time queries.

Siddharth Wagle 10 years ago
parent commit 72881097dc
10 changed files with 835 additions and 341 deletions
  1. + 288 - 187
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
  2. + 381 - 110
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
  3. + 2 - 1
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
  4. + 2 - 1
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
  5. + 2 - 1
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
  6. + 3 - 2
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
  7. + 4 - 3
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
  8. + 20 - 1
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
  9. + 42 - 34
      ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
  10. + 91 - 1
      ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
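
For context: this patch makes the Ambari Metrics Service answer point-in-time queries, i.e. requests that name metrics but carry no startTime/endTime now return the latest stored value per metric instead of being rejected. A minimal sketch of such a request follows; the host, port, and endpoint path are illustrative assumptions, while the query parameters (metricNames, hostname, appId) are the ones AMSPropertyProvider sets in this patch.

    // Hypothetical point-in-time request to the AMS timeline service.
    // Host, port, and path are assumptions for illustration only.
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class PointInTimeQueryExample {
      public static void main(String[] args) throws Exception {
        // No startTime/endTime parameters: this is what makes the query
        // point-in-time, returning one (latest) value per metric.
        String spec = "http://ams-host:6188/ws/v1/timeline/metrics"
            + "?metricNames=cpu_user,mem_free"
            + "&hostname=h1"
            + "&appId=HOST";
        HttpURLConnection conn =
            (HttpURLConnection) new URL(spec).openConnection();
        BufferedReader reader = new BufferedReader(
            new InputStreamReader(conn.getInputStream()));
        String line;
        while ((line = reader.readLine()) != null) {
          System.out.println(line); // JSON body with the latest data points
        }
        reader.close();
      }
    }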

+ 288 - 187
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java

@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,6 +34,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -48,6 +48,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.SplitByMetricNamesCondition;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
@@ -134,7 +135,6 @@ public class PhoenixHBaseAccessor {
     }
   }
 
-
   /**
    * Get JDBC connection to HBase store. Assumption is that the hbase
    * configuration is present on the classpath and loaded by the caller into
@@ -148,13 +148,28 @@ public class PhoenixHBaseAccessor {
     return dataSource.getConnection();
   }
 
-  public static Map readMetricFromJSON(String json) throws IOException {
-    return mapper.readValue(json, metricValuesTypeRef);
+  private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs)
+    throws SQLException, IOException {
+    TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs);
+    metric.setMetricValues(readLastMetricValueFromJSON(rs.getString("METRICS")));
+
+    return metric;
   }
 
-  @SuppressWarnings("unchecked")
   static TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
     throws SQLException, IOException {
+    TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs);
+    Map<Long, Double> sortedByTimeMetrics =
+      new TreeMap<Long, Double>(readMetricFromJSON(rs.getString("METRICS")));
+    metric.setMetricValues(sortedByTimeMetrics);
+    return metric;
+  }
+
+  /**
+   * Returns common part of timeline metrics record without the values.
+   */
+  private static TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs) 
+    throws SQLException {
     TimelineMetric metric = new TimelineMetric();
     metric.setMetricName(rs.getString("METRIC_NAME"));
     metric.setAppId(rs.getString("APP_ID"));
@@ -163,12 +178,25 @@ public class PhoenixHBaseAccessor {
     metric.setTimestamp(rs.getLong("SERVER_TIME"));
     metric.setStartTime(rs.getLong("START_TIME"));
     metric.setType(rs.getString("UNITS"));
-    Map<Long, Double> sortedByTimeMetrics =
-        new TreeMap<Long, Double>((Map<Long, Double>) readMetricFromJSON(rs.getString("METRICS")));
-    metric.setMetricValues(sortedByTimeMetrics);
     return metric;
   }
 
+  private static Map<Long, Double> readLastMetricValueFromJSON(String json)
+    throws IOException {
+    Map<Long, Double> values = readMetricFromJSON(json);
+    Long lastTimeStamp = Collections.max(values.keySet());
+
+    HashMap<Long, Double> valueMap = new HashMap<Long, Double>(1);
+    valueMap.put(lastTimeStamp, values.get(lastTimeStamp));
+    return valueMap;
+  }
+
+  @SuppressWarnings("unchecked")
+  public static Map<Long, Double>  readMetricFromJSON(String json)
+    throws IOException {
+    return (Map<Long, Double>) mapper.readValue(json, metricValuesTypeRef);
+  }
+
   static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
     throws SQLException, IOException {
     TimelineMetric metric = new TimelineMetric();
@@ -317,9 +345,11 @@ public class PhoenixHBaseAccessor {
       for (TimelineMetric metric : timelineMetrics) {
         metricRecordStmt.clearParameters();
 
-        LOG.trace("host: " + metric.getHostName() + ", " +
-          "metricName = " + metric.getMetricName() + ", " +
-          "values: " + metric.getMetricValues());
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("host: " + metric.getHostName() + ", " +
+            "metricName = " + metric.getMetricName() + ", " +
+            "values: " + metric.getMetricValues());
+        }
         Aggregator agg = new Aggregator();
         double[] aggregates =  agg.calculateAggregates(
           metric.getMetricValues());
@@ -366,31 +396,32 @@ public class PhoenixHBaseAccessor {
     }
   }
 
-
   @SuppressWarnings("unchecked")
   public TimelineMetrics getMetricRecords(final Condition condition)
     throws SQLException, IOException {
 
-    if (condition.isEmpty()) {
-      throw new SQLException("No filter criteria specified.");
-    }
+    verifyCondition(condition);
 
     Connection conn = getConnection();
     PreparedStatement stmt = null;
     TimelineMetrics metrics = new TimelineMetrics();
 
     try {
-      stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
-
-      ResultSet rs = stmt.executeQuery();
+      //get latest
+      if(condition.isPointInTime()){
+        stmt = getLatestMetricRecords(condition, conn, metrics);
+      } else {
+        stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
 
-      while (rs.next()) {
-        TimelineMetric metric = getTimelineMetricFromResultSet(rs);
+        ResultSet rs = stmt.executeQuery();
+        while (rs.next()) {
+          TimelineMetric metric = getTimelineMetricFromResultSet(rs);
 
-        if (condition.isGrouped()) {
-          metrics.addOrMergeTimelineMetric(metric);
-        } else {
-          metrics.getMetrics().add(metric);
+          if (condition.isGrouped()) {
+            metrics.addOrMergeTimelineMetric(metric);
+          } else {
+            metrics.getMetrics().add(metric);
+          }
         }
       }
 
@@ -410,174 +441,221 @@ public class PhoenixHBaseAccessor {
         }
       }
     }
+
+    LOG.info("Metrics records size: " + metrics.getMetrics().size());
     return metrics;
   }
 
-  public void saveHostAggregateRecords(Map<TimelineMetric,
-    MetricHostAggregate> hostAggregateMap, String phoenixTableName)
-    throws SQLException {
+  private PreparedStatement getLatestMetricRecords(
+    Condition condition, Connection conn, TimelineMetrics metrics)
+    throws SQLException, IOException {
+    PreparedStatement stmt = null;
+    SplitByMetricNamesCondition splitCondition =
+      new SplitByMetricNamesCondition(condition);
 
-    if (hostAggregateMap != null && !hostAggregateMap.isEmpty()) {
-      Connection conn = getConnection();
-      PreparedStatement stmt = null;
+    for (String metricName: splitCondition.getOriginalMetricNames()) {
+      splitCondition.setCurrentMetric(metricName);
+      stmt = PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(conn,
+        splitCondition);
 
-      long start = System.currentTimeMillis();
-      int rowCount = 0;
+      ResultSet rs = stmt.executeQuery();
+      while (rs.next()) {
+        TimelineMetric metric = getLastTimelineMetricFromResultSet(rs);
+        metrics.getMetrics().add(metric);
+      }
+    }
 
-      try {
-        stmt = conn.prepareStatement(
-          String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName));
-
-        for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate :
-          hostAggregateMap.entrySet()) {
-
-          TimelineMetric metric = metricAggregate.getKey();
-          MetricHostAggregate hostAggregate = metricAggregate.getValue();
-
-          rowCount++;
-          stmt.clearParameters();
-          stmt.setString(1, metric.getMetricName());
-          stmt.setString(2, metric.getHostName());
-          stmt.setString(3, metric.getAppId());
-          stmt.setString(4, metric.getInstanceId());
-          stmt.setLong(5, metric.getTimestamp());
-          stmt.setString(6, metric.getType());
-          stmt.setDouble(7, hostAggregate.getSum());
-          stmt.setDouble(8, hostAggregate.getMax());
-          stmt.setDouble(9, hostAggregate.getMin());
-          stmt.setDouble(10, hostAggregate.getNumberOfSamples());
-
-          try {
-            // TODO: Why this exception is swallowed
-            stmt.executeUpdate();
-          } catch (SQLException sql) {
-            LOG.error(sql);
-          }
+    return stmt;
+  }
 
-          if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
-            conn.commit();
-            rowCount = 0;
-          }
+  /**
+   * Get metrics aggregated across hosts.
+   *
+   * @param condition @Condition
+   * @return @TimelineMetrics
+   * @throws SQLException
+   */
+  public TimelineMetrics getAggregateMetricRecords(final Condition condition)
+    throws SQLException {
 
-        }
+    verifyCondition(condition);
 
-        conn.commit();
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    TimelineMetrics metrics = new TimelineMetrics();
 
-      } finally {
-        if (stmt != null) {
-          try {
-            stmt.close();
-          } catch (SQLException e) {
-            // Ignore
+    try {
+      //get latest
+      if(condition.isPointInTime()) {
+        stmt = getLatestAggregateMetricRecords(condition, conn, metrics);
+      } else {
+        stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition);
+
+        ResultSet rs = stmt.executeQuery();
+        while (rs.next()) {
+          TimelineMetric metric = getAggregateTimelineMetricFromResultSet(rs);
+
+          if (condition.isGrouped()) {
+            metrics.addOrMergeTimelineMetric(metric);
+          } else {
+            metrics.getMetrics().add(metric);
           }
         }
-        if (conn != null) {
-          try {
-            conn.close();
-          } catch (SQLException sql) {
-            // Ignore
-          }
+      }
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
         }
       }
+    }
+
+    LOG.info("Aggregate records size: " + metrics.getMetrics().size());
+    return metrics;
+  }
 
-      long end = System.currentTimeMillis();
+  private PreparedStatement getLatestAggregateMetricRecords(
+    Condition condition, Connection conn, TimelineMetrics metrics)
+    throws SQLException {
 
-      if ((end - start) > 60000l) {
-        LOG.info("Time to save map: " + (end - start) + ", " +
-          "thread = " + Thread.currentThread().getClass());
+    PreparedStatement stmt = null;
+    SplitByMetricNamesCondition splitCondition =
+      new SplitByMetricNamesCondition(condition);
+
+    for (String metricName: splitCondition.getOriginalMetricNames()) {
+
+      splitCondition.setCurrentMetric(metricName);
+      stmt = PhoenixTransactSQL.prepareGetLatestAggregateMetricSqlStmt(conn,
+        splitCondition);
+
+      ResultSet rs = stmt.executeQuery();
+      while (rs.next()) {
+        TimelineMetric metric = getAggregateTimelineMetricFromResultSet(rs);
+        metrics.getMetrics().add(metric);
       }
     }
+
+    return stmt;
+  }
+
+  private TimelineMetric getAggregateTimelineMetricFromResultSet(
+    ResultSet rs) throws SQLException {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName(rs.getString("METRIC_NAME"));
+    metric.setAppId(rs.getString("APP_ID"));
+    metric.setInstanceId(rs.getString("INSTANCE_ID"));
+    metric.setTimestamp(rs.getLong("SERVER_TIME"));
+    metric.setStartTime(rs.getLong("SERVER_TIME"));
+    Map<Long, Double> valueMap = new HashMap<Long, Double>();
+    valueMap.put(rs.getLong("SERVER_TIME"),
+      rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT"));
+    metric.setMetricValues(valueMap);
+
+    return metric;
+  }
+
+  private void verifyCondition(Condition condition) throws SQLException {
+    if (condition.isEmpty()) {
+      throw new SQLException("No filter criteria specified.");
+    }
   }
 
-  /**
-   * Save Metric aggregate records.
-   *
-   * @throws SQLException
-   */
-  public void saveClusterAggregateRecords(
-    Map<TimelineClusterMetric, MetricClusterAggregate> records)
+  public void saveHostAggregateRecords(Map<TimelineMetric,
+    MetricHostAggregate> hostAggregateMap, String phoenixTableName)
     throws SQLException {
 
-      if (records == null || records.isEmpty()) {
-        LOG.debug("Empty aggregate records.");
-        return;
-      }
+    if (hostAggregateMap == null || hostAggregateMap.isEmpty()) {
+      LOG.debug("Empty aggregate records.");
+      return;
+    }
 
-      long start = System.currentTimeMillis();
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
 
-      Connection conn = getConnection();
-      PreparedStatement stmt = null;
-      try {
-        stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL);
-        int rowCount = 0;
+    long start = System.currentTimeMillis();
+    int rowCount = 0;
 
-        for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
-          aggregateEntry : records.entrySet()) {
-          TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
-          MetricClusterAggregate aggregate = aggregateEntry.getValue();
+    try {
+      stmt = conn.prepareStatement(
+        String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName));
 
-          LOG.trace("clusterMetric = " + clusterMetric + ", " +
-            "aggregate = " + aggregate);
+      for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate :
+        hostAggregateMap.entrySet()) {
 
-          rowCount++;
-          stmt.clearParameters();
-          stmt.setString(1, clusterMetric.getMetricName());
-          stmt.setString(2, clusterMetric.getAppId());
-          stmt.setString(3, clusterMetric.getInstanceId());
-          stmt.setLong(4, clusterMetric.getTimestamp());
-          stmt.setString(5, clusterMetric.getType());
-          stmt.setDouble(6, aggregate.getSum());
-          stmt.setInt(7, aggregate.getNumberOfHosts());
-          stmt.setDouble(8, aggregate.getMax());
-          stmt.setDouble(9, aggregate.getMin());
-
-          try {
-            stmt.executeUpdate();
-          } catch (SQLException sql) {
-            // TODO: Why this exception is swallowed
-            LOG.error(sql);
-          }
+        TimelineMetric metric = metricAggregate.getKey();
+        MetricHostAggregate hostAggregate = metricAggregate.getValue();
 
-          if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
-            conn.commit();
-            rowCount = 0;
-          }
-        }
+        rowCount++;
+        stmt.clearParameters();
+        stmt.setString(1, metric.getMetricName());
+        stmt.setString(2, metric.getHostName());
+        stmt.setString(3, metric.getAppId());
+        stmt.setString(4, metric.getInstanceId());
+        stmt.setLong(5, metric.getTimestamp());
+        stmt.setString(6, metric.getType());
+        stmt.setDouble(7, hostAggregate.getSum());
+        stmt.setDouble(8, hostAggregate.getMax());
+        stmt.setDouble(9, hostAggregate.getMin());
+        stmt.setDouble(10, hostAggregate.getNumberOfSamples());
 
-        conn.commit();
+        try {
+          stmt.executeUpdate();
+        } catch (SQLException sql) {
+          LOG.error(sql);
+        }
 
-      } finally {
-        if (stmt != null) {
-          try {
-            stmt.close();
-          } catch (SQLException e) {
-            // Ignore
-          }
+        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+          conn.commit();
+          rowCount = 0;
         }
-        if (conn != null) {
-          try {
-            conn.close();
-          } catch (SQLException sql) {
-            // Ignore
-          }
+
+      }
+
+      conn.commit();
+
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
         }
       }
-      long end = System.currentTimeMillis();
-      if ((end - start) > 60000l) {
-        LOG.info("Time to save: " + (end - start) + ", " +
-          "thread = " + Thread.currentThread().getName());
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
       }
     }
 
+    long end = System.currentTimeMillis();
+
+    if ((end - start) > 60000l) {
+      LOG.info("Time to save map: " + (end - start) + ", " +
+        "thread = " + Thread.currentThread().getClass());
+    }
+  }
+
   /**
    * Save Metric aggregate records.
    *
    * @throws SQLException
    */
-  public void saveClusterAggregateHourlyRecords(
-    Map<TimelineClusterMetric, MetricHostAggregate> records,
-    String tableName)
+  public void saveClusterAggregateRecords(
+    Map<TimelineClusterMetric, MetricClusterAggregate> records)
     throws SQLException {
+
     if (records == null || records.isEmpty()) {
       LOG.debug("Empty aggregate records.");
       return;
@@ -588,17 +666,18 @@ public class PhoenixHBaseAccessor {
     Connection conn = getConnection();
     PreparedStatement stmt = null;
     try {
-      stmt = conn.prepareStatement(String.format
-        (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName));
+      stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL);
       int rowCount = 0;
 
-      for (Map.Entry<TimelineClusterMetric, MetricHostAggregate>
+      for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
         aggregateEntry : records.entrySet()) {
         TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
-        MetricHostAggregate aggregate = aggregateEntry.getValue();
+        MetricClusterAggregate aggregate = aggregateEntry.getValue();
 
-        LOG.trace("clusterMetric = " + clusterMetric + ", " +
-          "aggregate = " + aggregate);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("clusterMetric = " + clusterMetric + ", " +
+            "aggregate = " + aggregate);
+        }
 
         rowCount++;
         stmt.clearParameters();
@@ -608,8 +687,7 @@ public class PhoenixHBaseAccessor {
         stmt.setLong(4, clusterMetric.getTimestamp());
         stmt.setString(5, clusterMetric.getType());
         stmt.setDouble(6, aggregate.getSum());
-//        stmt.setInt(7, aggregate.getNumberOfHosts());
-        stmt.setLong(7, aggregate.getNumberOfSamples());
+        stmt.setInt(7, aggregate.getNumberOfHosts());
         stmt.setDouble(8, aggregate.getMax());
         stmt.setDouble(9, aggregate.getMin());
 
@@ -651,48 +729,68 @@ public class PhoenixHBaseAccessor {
     }
   }
 
+
   /**
-   * Get metrics aggregated across hosts.
+   * Save Metric aggregate records.
    *
-   * @param condition @Condition
-   * @return @TimelineMetrics
    * @throws SQLException
    */
-  public TimelineMetrics getAggregateMetricRecords(final Condition condition)
+  public void saveClusterAggregateHourlyRecords(
+    Map<TimelineClusterMetric, MetricHostAggregate> records,
+    String tableName)
     throws SQLException {
-
-    if (condition.isEmpty()) {
-      throw new SQLException("No filter criteria specified.");
+    if (records == null || records.isEmpty()) {
+      LOG.debug("Empty aggregate records.");
+      return;
     }
 
+    long start = System.currentTimeMillis();
+
     Connection conn = getConnection();
     PreparedStatement stmt = null;
-    TimelineMetrics metrics = new TimelineMetrics();
-
     try {
-      stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition);
+      stmt = conn.prepareStatement(String.format
+        (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName));
+      int rowCount = 0;
 
-      ResultSet rs = stmt.executeQuery();
+      for (Map.Entry<TimelineClusterMetric, MetricHostAggregate>
+        aggregateEntry : records.entrySet()) {
+        TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
+        MetricHostAggregate aggregate = aggregateEntry.getValue();
 
-      while (rs.next()) {
-        TimelineMetric metric = new TimelineMetric();
-        metric.setMetricName(rs.getString("METRIC_NAME"));
-        metric.setAppId(rs.getString("APP_ID"));
-        metric.setInstanceId(rs.getString("INSTANCE_ID"));
-        metric.setTimestamp(rs.getLong("SERVER_TIME"));
-        metric.setStartTime(rs.getLong("SERVER_TIME"));
-        Map<Long, Double> valueMap = new HashMap<Long, Double>();
-        valueMap.put(rs.getLong("SERVER_TIME"),
-          rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT"));
-        metric.setMetricValues(valueMap);
-
-        if (condition.isGrouped()) {
-          metrics.addOrMergeTimelineMetric(metric);
-        } else {
-          metrics.getMetrics().add(metric);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("clusterMetric = " + clusterMetric + ", " +
+            "aggregate = " + aggregate);
+        }
+
+        rowCount++;
+        stmt.clearParameters();
+        stmt.setString(1, clusterMetric.getMetricName());
+        stmt.setString(2, clusterMetric.getAppId());
+        stmt.setString(3, clusterMetric.getInstanceId());
+        stmt.setLong(4, clusterMetric.getTimestamp());
+        stmt.setString(5, clusterMetric.getType());
+        stmt.setDouble(6, aggregate.getSum());
+//        stmt.setInt(7, aggregate.getNumberOfHosts());
+        stmt.setLong(7, aggregate.getNumberOfSamples());
+        stmt.setDouble(8, aggregate.getMax());
+        stmt.setDouble(9, aggregate.getMin());
+
+        try {
+          stmt.executeUpdate();
+        } catch (SQLException sql) {
+          // we have no way to verify it works!!!
+          LOG.error(sql);
+        }
+
+        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+          conn.commit();
+          rowCount = 0;
         }
       }
 
+      conn.commit();
+
     } finally {
       if (stmt != null) {
         try {
@@ -709,7 +807,10 @@ public class PhoenixHBaseAccessor {
         }
       }
     }
-    LOG.info("Aggregate records size: " + metrics.getMetrics().size());
-    return metrics;
+    long end = System.currentTimeMillis();
+    if ((end - start) > 60000l) {
+      LOG.info("Time to save: " + (end - start) + ", " +
+        "thread = " + Thread.currentThread().getName());
+    }
   }
 }
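
A minimal usage sketch for the new read path above, mirroring the style of the integration tests further down: hdb stands for an already-initialized PhoenixHBaseAccessor (its construction is environment-specific and elided here), and Condition/DefaultCondition are the types introduced in PhoenixTransactSQL below. Passing null for both start and end time is what makes isPointInTime() return true.

    // Sketch: fetch only the latest value of each named metric.
    Condition condition = new DefaultCondition(
        Arrays.asList("cpu_user", "mem_free"), // metric names are required
        "local",   // hostname
        null,      // appId
        null,      // instanceId
        null,      // startTime == null
        null,      // endTime == null, so the point-in-time branch is taken
        null,      // limit
        false);    // grouped

    // Internally getMetricRecords delegates to getLatestMetricRecords, which
    // runs one ORDER BY SERVER_TIME DESC ... LIMIT 1 query per metric name
    // through SplitByMetricNamesCondition.
    TimelineMetrics latest = hdb.getMetricRecords(condition);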

+ 381 - 110
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java

@@ -17,11 +17,13 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import com.sun.xml.bind.v2.util.QNameMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
@@ -32,7 +34,6 @@ import java.util.Set;
 public class PhoenixTransactSQL {
 
   static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
-  // TODO: Configurable TTL values
   /**
    * Create table to store individual metric records.
    */
@@ -206,8 +207,10 @@ public class PhoenixTransactSQL {
   public static final String DEFAULT_ENCODING = "FAST_DIFF";
   public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes
 
-  /** Filter to optimize HBase scan by using file timestamps. This prevents
+  /**
+   * Filter to optimize HBase scan by using file timestamps. This prevents
    * a full table scan of metric records.
+   *
    * @return Phoenix Hint String
    */
   public static String getNaiveTimeRangeHint(Long startTime, Long delta) {
@@ -243,33 +246,47 @@ public class PhoenixTransactSQL {
       sb.append(" LIMIT ").append(condition.getLimit());
     }
 
-    LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
+    }
     PreparedStatement stmt = connection.prepareStatement(sb.toString());
     int pos = 1;
     if (condition.getMetricNames() != null) {
       for (; pos <= condition.getMetricNames().size(); pos++) {
-        LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
+        }
         stmt.setString(pos, condition.getMetricNames().get(pos - 1));
       }
     }
     if (condition.getHostname() != null) {
-      LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname());
+      }
       stmt.setString(pos++, condition.getHostname());
     }
     if (condition.getAppId() != null) {
-      LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
+      }
       stmt.setString(pos++, condition.getAppId());
     }
     if (condition.getInstanceId() != null) {
-      LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
+      }
       stmt.setString(pos++, condition.getInstanceId());
     }
     if (condition.getStartTime() != null) {
-      LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime());
+      }
       stmt.setLong(pos++, condition.getStartTime());
     }
     if (condition.getEndTime() != null) {
-      LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime());
+      }
       stmt.setLong(pos, condition.getEndTime());
     }
     if (condition.getFetchSize() != null) {
@@ -280,6 +297,80 @@ public class PhoenixTransactSQL {
   }
 
 
+  public static PreparedStatement prepareGetLatestMetricSqlStmt(
+    Connection connection, Condition condition) throws SQLException {
+
+    if (condition.isEmpty()) {
+      throw new IllegalArgumentException("Condition is empty.");
+    }
+
+    if (condition.getMetricNames() == null
+      || condition.getMetricNames().size() == 0) {
+      throw new IllegalArgumentException("Point in time query without " +
+        "metric names not supported ");
+    }
+
+    String stmtStr;
+    if (condition.getStatement() != null) {
+      stmtStr = condition.getStatement();
+    } else {
+      stmtStr = String.format(GET_METRIC_SQL,
+        "",
+        METRICS_RECORD_TABLE_NAME);
+    }
+
+    StringBuilder sb = new StringBuilder(stmtStr);
+    sb.append(" WHERE ");
+    sb.append(condition.getConditionClause());
+    String orderByClause = condition.getOrderByClause();
+    if (orderByClause != null) {
+      sb.append(orderByClause);
+    } else {
+      sb.append(" ORDER BY SERVER_TIME DESC, METRIC_NAME  ");
+    }
+
+    sb.append(" LIMIT ").append(condition.getMetricNames().size());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
+    }
+    PreparedStatement stmt = connection.prepareStatement(sb.toString());
+    int pos = 1;
+    if (condition.getMetricNames() != null) {
+      //IGNORE condition limit, set one based on number of metric names
+      for (; pos <= condition.getMetricNames().size(); pos++) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
+        }
+        stmt.setString(pos, condition.getMetricNames().get(pos - 1));
+      }
+    }
+    if (condition.getHostname() != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname());
+      }
+      stmt.setString(pos++, condition.getHostname());
+    }
+    if (condition.getAppId() != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
+      }
+      stmt.setString(pos++, condition.getAppId());
+    }
+    if (condition.getInstanceId() != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
+      }
+      stmt.setString(pos++, condition.getInstanceId());
+    }
+
+    if (condition.getFetchSize() != null) {
+      stmt.setFetchSize(condition.getFetchSize());
+    }
+
+    return stmt;
+  }
+
   public static PreparedStatement prepareGetAggregateSqlStmt(
     Connection connection, Condition condition) throws SQLException {
 
@@ -298,7 +389,9 @@ public class PhoenixTransactSQL {
     String query = String.format(sb.toString(),
       PhoenixTransactSQL.getNaiveTimeRangeHint(condition.getStartTime(),
         NATIVE_TIME_RANGE_DELTA));
-    LOG.debug("SQL => " + query + ", condition => " + condition);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SQL => " + query + ", condition => " + condition);
+    }
     PreparedStatement stmt = connection.prepareStatement(query);
     int pos = 1;
     if (condition.getMetricNames() != null) {
@@ -323,7 +416,87 @@ public class PhoenixTransactSQL {
     return stmt;
   }
 
-  static class Condition {
+  public static PreparedStatement prepareGetLatestAggregateMetricSqlStmt(
+    Connection connection, Condition condition) throws SQLException {
+
+    if (condition.isEmpty()) {
+      throw new IllegalArgumentException("Condition is empty.");
+    }
+
+    if (condition.getMetricNames() == null
+      || condition.getMetricNames().size() == 0) {
+      throw new IllegalArgumentException("Point in time query without " +
+        "metric names not supported ");
+    }
+
+    String stmtStr;
+    if (condition.getStatement() != null) {
+      stmtStr = condition.getStatement();
+    } else {
+      stmtStr = String.format(GET_CLUSTER_AGGREGATE_SQL, "");
+    }
+
+    StringBuilder sb = new StringBuilder(stmtStr);
+    sb.append(" WHERE ");
+    sb.append(condition.getConditionClause());
+    String orderByClause = condition.getOrderByClause();
+    if (orderByClause != null) {
+      sb.append(orderByClause);
+    } else {
+      sb.append(" ORDER BY SERVER_TIME DESC, METRIC_NAME  ");
+    }
+
+    sb.append(" LIMIT ").append(condition.getMetricNames().size());
+
+    String query = sb.toString();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SQL: " + query + ", condition: " + condition);
+    }
+
+    PreparedStatement stmt = connection.prepareStatement(query);
+    int pos = 1;
+    if (condition.getMetricNames() != null) {
+      for (; pos <= condition.getMetricNames().size(); pos++) {
+        stmt.setString(pos, condition.getMetricNames().get(pos - 1));
+      }
+    }
+    if (condition.getAppId() != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
+      }
+      stmt.setString(pos++, condition.getAppId());
+    }
+    if (condition.getInstanceId() != null) {
+      stmt.setString(pos++, condition.getInstanceId());
+    }
+
+    return stmt;
+  }
+
+  static interface Condition {
+
+    boolean isEmpty();
+
+    List<String> getMetricNames();
+    boolean isPointInTime();
+    boolean isGrouped();
+    void setStatement(String statement);
+    String getHostname();
+    String getAppId();
+    String getInstanceId();
+    String getConditionClause();
+    String getOrderByClause();
+    String getStatement();
+    Long getStartTime();
+    Long getEndTime();
+    Integer getLimit();
+    Integer getFetchSize();
+    void setFetchSize(Integer fetchSize);
+    void addOrderByColumn(String column);
+    void setNoLimit();
+  }
+
+  static class DefaultCondition implements Condition {
     List<String> metricNames;
     String hostname;
     String appId;
@@ -337,7 +510,7 @@ public class PhoenixTransactSQL {
     String statement;
     Set<String> orderByColumns = new LinkedHashSet<String>();
 
-    Condition(List<String> metricNames, String hostname, String appId,
+    DefaultCondition(List<String> metricNames, String hostname, String appId,
               String instanceId, Long startTime, Long endTime, Integer limit,
               boolean grouped) {
       this.metricNames = metricNames;
@@ -350,22 +523,22 @@ public class PhoenixTransactSQL {
       this.grouped = grouped;
     }
 
-    String getStatement() {
+    public String getStatement() {
       return statement;
     }
 
-    void setStatement(String statement) {
+    public void setStatement(String statement) {
       this.statement = statement;
     }
 
-    List<String> getMetricNames() {
+    public List<String> getMetricNames() {
       return metricNames == null || metricNames.isEmpty() ? null : metricNames;
     }
 
     String getMetricsClause() {
       StringBuilder sb = new StringBuilder("(");
       if (metricNames != null) {
-        for (String name : metricNames) {
+        for (String name : getMetricNames()) {
           if (sb.length() != 1) {
             sb.append(", ");
           }
@@ -378,61 +551,48 @@ public class PhoenixTransactSQL {
       }
     }
 
-    String getConditionClause() {
+    public String getConditionClause() {
       StringBuilder sb = new StringBuilder();
       boolean appendConjunction = false;
 
       if (getMetricNames() != null) {
+        if (appendConjunction) {
+          sb.append(" AND");
+        }
+
         sb.append("METRIC_NAME IN ");
         sb.append(getMetricsClause());
         appendConjunction = true;
       }
-      if (appendConjunction) {
-        sb.append(" AND");
-      }
-      appendConjunction = false;
-      if (getHostname() != null) {
-        sb.append(" HOSTNAME = ?");
-        appendConjunction = true;
-      }
-      if (appendConjunction) {
-        sb.append(" AND");
-      }
-      appendConjunction = false;
-      if (getAppId() != null) {
-        sb.append(" APP_ID = ?");
-        appendConjunction = true;
-      }
-      if (appendConjunction) {
-        sb.append(" AND");
-      }
-      appendConjunction = false;
-      if (getInstanceId() != null) {
-        sb.append(" INSTANCE_ID = ?");
-        appendConjunction = true;
-      }
-      if (appendConjunction) {
-        sb.append(" AND");
-      }
-      appendConjunction = false;
-      if (getStartTime() != null) {
-        sb.append(" SERVER_TIME >= ?");
+
+      appendConjunction = append(sb, appendConjunction, getHostname(), " HOSTNAME = ?");
+      appendConjunction = append(sb, appendConjunction, getAppId(), " APP_ID = ?");
+      appendConjunction = append(sb, appendConjunction, getInstanceId(), " INSTANCE_ID = ?");
+      appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?");
+      append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?");
+
+      return sb.toString();
+    }
+
+    protected static boolean append(StringBuilder sb,
+                                     boolean appendConjunction,
+                             Object value, String str) {
+      if (value != null) {
+        if (appendConjunction) {
+          sb.append(" AND");
+        }
+
+        sb.append(str);
         appendConjunction = true;
       }
-      if (appendConjunction) {
-        sb.append(" AND");
-      }
-      if (getEndTime() != null) {
-        sb.append(" SERVER_TIME < ?");
-      }
-      return sb.toString();
+      return appendConjunction;
     }
 
-    String getHostname() {
+    public String getHostname() {
       return hostname == null || hostname.isEmpty() ? null : hostname;
     }
 
-    String getAppId() {
+    public String getAppId() {
       if (appId != null && !appId.isEmpty()) {
         if (!appId.equals("HOST")) {
           return appId.toLowerCase();
@@ -443,22 +603,27 @@ public class PhoenixTransactSQL {
       return null;
     }
 
-    String getInstanceId() {
+    public String getInstanceId() {
       return instanceId == null || instanceId.isEmpty() ? null : instanceId;
     }
 
     /**
      * Convert to millis.
      */
-    Long getStartTime() {
-      if (startTime < 9999999999l) {
+    public Long getStartTime() {
+      if (startTime == null) {
+        return null;
+      } else if (startTime < 9999999999l) {
         return startTime * 1000;
       } else {
         return startTime;
       }
     }
 
-    Long getEndTime() {
+    public Long getEndTime() {
+      if (endTime == null) {
+        return null;
+      }
       if (endTime < 9999999999l) {
         return endTime * 1000;
       } else {
@@ -466,22 +631,26 @@ public class PhoenixTransactSQL {
       }
     }
 
-    void setNoLimit() {
+    public void setNoLimit() {
       this.noLimit = true;
     }
 
-    Integer getLimit() {
+    public Integer getLimit() {
       if (noLimit) {
         return null;
       }
       return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit;
     }
 
-    boolean isGrouped() {
+    public boolean isGrouped() {
       return grouped;
     }
 
-    boolean isEmpty() {
+    public boolean isPointInTime() {
+      return getStartTime() == null && getEndTime() == null;
+    }
+
+    public boolean isEmpty() {
       return (metricNames == null || metricNames.isEmpty())
         && (hostname == null || hostname.isEmpty())
         && (appId == null || appId.isEmpty())
@@ -490,19 +659,19 @@ public class PhoenixTransactSQL {
         && endTime == null;
     }
 
-    Integer getFetchSize() {
+    public Integer getFetchSize() {
       return fetchSize;
     }
 
-    void setFetchSize(Integer fetchSize) {
+    public void setFetchSize(Integer fetchSize) {
       this.fetchSize = fetchSize;
     }
 
-    void addOrderByColumn(String column) {
+    public void addOrderByColumn(String column) {
       orderByColumns.add(column);
     }
 
-    String getOrderByClause() {
+    public String getOrderByClause() {
       String orderByStr = " ORDER BY ";
       if (!orderByColumns.isEmpty()) {
         StringBuilder sb = new StringBuilder(orderByStr);
@@ -535,70 +704,172 @@ public class PhoenixTransactSQL {
     }
   }
 
-  static class LikeCondition extends Condition {
+  static class LikeCondition extends DefaultCondition {
 
     LikeCondition(List<String> metricNames, String hostname,
-                         String appId, String instanceId, Long startTime,
-                         Long endTime, Integer limit, boolean grouped) {
+                  String appId, String instanceId, Long startTime,
+                  Long endTime, Integer limit, boolean grouped) {
       super(metricNames, hostname, appId, instanceId, startTime, endTime,
-          limit, grouped);
+        limit, grouped);
     }
 
     @Override
-    String getConditionClause() {
+    public String getConditionClause() {
       StringBuilder sb = new StringBuilder();
       boolean appendConjunction = false;
 
       if (getMetricNames() != null) {
         sb.append("(");
-        for (String name : metricNames) {
+        for (String name : getMetricNames()) {
           if (sb.length() > 1) {
             sb.append(" OR ");
           }
           sb.append("METRIC_NAME LIKE ?");
         }
+
         sb.append(")");
         appendConjunction = true;
       }
-      if (appendConjunction) {
-        sb.append(" AND");
-      }
-      appendConjunction = false;
-      if (getHostname() != null) {
-        sb.append(" HOSTNAME = ?");
-        appendConjunction = true;
-      }
-      if (appendConjunction) {
-        sb.append(" AND");
-      }
-      appendConjunction = false;
-      if (getAppId() != null) {
-        sb.append(" APP_ID = ?");
-        appendConjunction = true;
-      }
-      if (appendConjunction) {
-        sb.append(" AND");
-      }
-      appendConjunction = false;
-      if (getInstanceId() != null) {
-        sb.append(" INSTANCE_ID = ?");
-        appendConjunction = true;
-      }
-      if (appendConjunction) {
-        sb.append(" AND");
-      }
-      appendConjunction = false;
-      if (getStartTime() != null) {
-        sb.append(" SERVER_TIME >= ?");
+
+      appendConjunction = append(sb, appendConjunction, getHostname(), " HOSTNAME = ?");
+      appendConjunction = append(sb, appendConjunction, getAppId(), " APP_ID = ?");
+      appendConjunction = append(sb, appendConjunction, getInstanceId(), " INSTANCE_ID = ?");
+      appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?");
+      append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?");
+
+      return sb.toString();
+    }
+  }
+
+  static class SplitByMetricNamesCondition implements Condition {
+    private final Condition adaptee;
+    private String currentMetric;
+
+    SplitByMetricNamesCondition(Condition condition){
+      this.adaptee = condition;
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return adaptee.isEmpty();
+    }
+
+    @Override
+    public List<String> getMetricNames() {
+      return Collections.singletonList(currentMetric);
+    }
+
+    @Override
+    public boolean isPointInTime() {
+      return adaptee.isPointInTime();
+    }
+
+    @Override
+    public boolean isGrouped() {
+      return adaptee.isGrouped();
+    }
+
+    @Override
+    public void setStatement(String statement) {
+      adaptee.setStatement(statement);
+    }
+
+    @Override
+    public String getHostname() {
+      return adaptee.getHostname();
+    }
+
+    @Override
+    public String getAppId() {
+      return adaptee.getAppId();
+    }
+
+    @Override
+    public String getInstanceId() {
+      return adaptee.getInstanceId();
+    }
+
+    @Override
+    public String getConditionClause() {
+      StringBuilder sb = new StringBuilder();
+      boolean appendConjunction = false;
+
+      if (getMetricNames() != null) {
+        for (String name : getMetricNames()) {
+          if (sb.length() > 1) {
+            sb.append(" OR ");
+          }
+          sb.append("METRIC_NAME = ?");
+        }
+
         appendConjunction = true;
       }
-      if (appendConjunction) {
-        sb.append(" AND");
-      }
-      if (getEndTime() != null) {
-        sb.append(" SERVER_TIME < ?");
-      }
+
+      appendConjunction = DefaultCondition.append(sb, appendConjunction,
+        getHostname(), " HOSTNAME = ?");
+      appendConjunction = DefaultCondition.append(sb, appendConjunction,
+        getAppId(), " APP_ID = ?");
+      appendConjunction = DefaultCondition.append(sb, appendConjunction,
+        getInstanceId(), " INSTANCE_ID = ?");
+      appendConjunction = DefaultCondition.append(sb, appendConjunction,
+        getStartTime(), " SERVER_TIME >= ?");
+      DefaultCondition.append(sb, appendConjunction, getEndTime(),
+        " SERVER_TIME < ?");
+
       return sb.toString();
     }
+
+    @Override
+    public String getOrderByClause() {
+      return adaptee.getOrderByClause();
+    }
+
+    @Override
+    public String getStatement() {
+      return adaptee.getStatement();
+    }
+
+    @Override
+    public Long getStartTime() {
+      return adaptee.getStartTime();
+    }
+
+    @Override
+    public Long getEndTime() {
+      return adaptee.getEndTime();
+    }
+
+    @Override
+    public Integer getLimit() {
+      return adaptee.getLimit();
+    }
+
+    @Override
+    public Integer getFetchSize() {
+      return adaptee.getFetchSize();
+    }
+
+    @Override
+    public void setFetchSize(Integer fetchSize) {
+      adaptee.setFetchSize(fetchSize);
+    }
+
+    @Override
+    public void addOrderByColumn(String column) {
+      adaptee.addOrderByColumn(column);
+    }
+
+    @Override
+    public void setNoLimit() {
+      adaptee.setNoLimit();
+    }
+
+    public List<String> getOriginalMetricNames() {
+      return adaptee.getMetricNames();
+    }
+
+    public void setCurrentMetric(String currentMetric) {
+      this.currentMetric = currentMetric;
+    }
   }
 }
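
To make the split-condition mechanics concrete, here is a sketch using only types from this file; conn is an assumed open java.sql.Connection to the Phoenix store. The WHERE/ORDER BY/LIMIT shape quoted in the comments is the one prepareGetLatestMetricSqlStmt builds for each single-metric condition.

    // Sketch: decompose one multi-metric point-in-time condition into
    // per-metric "latest value" statements.
    Condition base = new DefaultCondition(
        Arrays.asList("cpu_user", "mem_free"), "h1", "a1", "i1",
        null, null, null, false);   // null times => point in time

    SplitByMetricNamesCondition split = new SplitByMetricNamesCondition(base);
    for (String name : split.getOriginalMetricNames()) {
      split.setCurrentMetric(name);
      // Clause generated on each iteration:
      //   WHERE METRIC_NAME = ? AND HOSTNAME = ? AND APP_ID = ?
      //     AND INSTANCE_ID = ?
      //   ORDER BY SERVER_TIME DESC, METRIC_NAME LIMIT 1
      PreparedStatement stmt =
          PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(conn, split);
      // ... execute stmt and read the single newest row for this metric
    }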

+ 2 - 1
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java

@@ -27,6 +27,7 @@ import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
 
 public class TimelineMetricAggregator extends AbstractTimelineAggregator {
@@ -78,7 +79,7 @@ public class TimelineMetricAggregator extends AbstractTimelineAggregator {
 
   @Override
   protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
-    Condition condition = new Condition(null, null, null, null, startTime,
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
       endTime, null, true);
     condition.setNoLimit();
     condition.setFetchSize(resultsetFetchSize);

+ 2 - 1
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java

@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
@@ -91,7 +92,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
 
   @Override
   protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
-    Condition condition = new Condition(null, null, null, null, startTime,
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
       endTime, null, true);
     condition.setNoLimit();
     condition.setFetchSize(resultsetFetchSize);

+ 2 - 1
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java

@@ -31,6 +31,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL;
@@ -89,7 +90,7 @@ public class TimelineMetricClusterAggregatorHourly extends
   @Override
   protected Condition prepareMetricQueryCondition(long startTime,
                                                   long endTime) {
-    Condition condition = new Condition(null, null, null, null, startTime,
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
       endTime, null, true);
     condition.setNoLimit();
     condition.setFetchSize(resultsetFetchSize);

+ 3 - 2
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java

@@ -38,6 +38,7 @@ import java.util.Map;
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.fail;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LOG;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
@@ -95,7 +96,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     boolean success = agg.doWork(startTime, endTime);
 
     //THEN
-    Condition condition = new Condition(null, null, null, null, startTime,
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
       endTime, null, true);
     condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA)));
@@ -155,7 +156,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     boolean success = agg.doWork(startTime, endTime);
 
     //THEN
-    Condition condition = new Condition(null, null, null, null, startTime,
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
       endTime, null, true);
     condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA)));

+ 4 - 3
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java

@@ -39,6 +39,7 @@ import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
 import static junit.framework.Assert.fail;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LOG;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
@@ -84,7 +85,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
     TimelineMetrics metricsSent = prepareTimelineMetrics(startTime, "local");
     hdb.insertMetricRecords(metricsSent);
 
-    Condition queryCondition = new Condition(null, "local", null, null,
+    Condition queryCondition = new DefaultCondition(null, "local", null, null,
       startTime, startTime + (15 * 60 * 1000), null, false);
     TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition);
 
@@ -120,7 +121,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
     boolean success = aggregatorMinute.doWork(startTime, endTime);
 
     //THEN
-    Condition condition = new Condition(null, null, null, null, startTime,
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
       endTime, null, true);
     condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
@@ -199,7 +200,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
     assertTrue(success);
 
     //THEN
-    Condition condition = new Condition(null, null, null, null, startTime,
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
       endTime, null, true);
     condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),

+ 20 - 1
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java

@@ -24,12 +24,14 @@ import java.util.Arrays;
 import java.util.Collections;
 
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LikeCondition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.SplitByMetricNamesCondition;
 
 public class TestPhoenixTransactSQL {
   @Test
   public void testConditionClause() throws Exception {
-    Condition condition = new Condition(
+    Condition condition = new DefaultCondition(
       Arrays.asList("cpu_user", "mem_free"), "h1", "a1", "i1",
         1407959718L, 1407959918L, null, false);
 
@@ -41,6 +43,23 @@ public class TestPhoenixTransactSQL {
     Assert.assertEquals(expectedClause, preparedClause);
   }
 
+  @Test
+  public void testSplitByMetricNamesCondition() throws Exception {
+    Condition c = new DefaultCondition(
+      Arrays.asList("cpu_user", "mem_free"), "h1", "a1", "i1",
+      1407959718L, 1407959918L, null, false);
+
+    SplitByMetricNamesCondition condition = new SplitByMetricNamesCondition(c);
+    condition.setCurrentMetric(c.getMetricNames().get(0));
+
+    String preparedClause = condition.getConditionClause();
+    String expectedClause = "METRIC_NAME = ? AND HOSTNAME = ? AND " +
+      "APP_ID = ? AND INSTANCE_ID = ? AND SERVER_TIME >= ? AND SERVER_TIME < ?";
+
+    Assert.assertNotNull(preparedClause);
+    Assert.assertEquals(expectedClause, preparedClause);
+  }
+
   @Test
   public void testLikeConditionClause() throws Exception {
     Condition condition = new LikeCondition(

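The new testSplitByMetricNamesCondition pins down the wrapper's contract: it decorates an existing condition and, once setCurrentMetric(...) has been called, renders an equality predicate on METRIC_NAME in place of an IN clause, so one multi-metric condition can be executed as a series of single-metric statements. A hedged sketch of that iteration pattern (the loop is illustrative; only the wrapper API appears in this commit):

    // Illustrative only: rotate the wrapper through each metric name to
    // build one statement per metric; statement execution is elided.
    Condition base = new DefaultCondition(
        Arrays.asList("cpu_user", "mem_free"), "h1", "a1", "i1",
        1407959718L, 1407959918L, null, false);
    SplitByMetricNamesCondition split = new SplitByMetricNamesCondition(base);
    for (String metricName : base.getMetricNames()) {
      split.setCurrentMetric(metricName);
      // split.getConditionClause() now yields:
      //   METRIC_NAME = ? AND HOSTNAME = ? AND APP_ID = ?
      //   AND INSTANCE_ID = ? AND SERVER_TIME >= ? AND SERVER_TIME < ?
    }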
+ 42 - 34
ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java

@@ -137,8 +137,8 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
      */
     public Collection<Resource> populateResources() throws SystemException {
       // No open ended query support.
-      if (temporalInfo == null || temporalInfo.getStartTime() == null ||
-          temporalInfo.getEndTime() == null) {
+      if (temporalInfo != null && (temporalInfo.getStartTime() == null
+        || temporalInfo.getEndTime() == null)) {
         return Collections.emptySet();
       }
 
@@ -163,38 +163,9 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
             return Collections.emptySet();
           }
 
-          String metricsParam = getSetString(processRegexps(metrics.keySet()), -1);
-          // Reuse uriBuilder
-          uriBuilder.removeQuery();
-
-          if (metricsParam.length() > 0) {
-            uriBuilder.setParameter("metricNames", metricsParam);
-          }
-
-          if (hostname != null && !hostname.isEmpty() && !hostname.equals(dummyHostName)) {
-            uriBuilder.setParameter("hostname", hostname);
-          }
-
-          String componentName = getComponentName(resource);
-          if (componentName != null && !componentName.isEmpty()) {
-            if (TIMELINE_APPID_MAP.containsKey(componentName)) {
-              componentName = TIMELINE_APPID_MAP.get(componentName);
-            }
-            uriBuilder.setParameter("appId", componentName);
-          }
-
-          long startTime = temporalInfo.getStartTime();
-          if (startTime != -1) {
-            uriBuilder.setParameter("startTime", String.valueOf(startTime));
-          }
-
-          long endTime = temporalInfo.getEndTime();
-          if (endTime != -1) {
-            uriBuilder.setParameter("endTime", String.valueOf(endTime));
-          }
+          String spec = getSpec(hostname, resource);
 
           BufferedReader reader = null;
-          String spec = uriBuilder.toString();
           try {
             LOG.debug("Metrics request url =" + spec);
             reader = new BufferedReader(new InputStreamReader(streamProvider.readFrom(spec)));
@@ -205,8 +176,9 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
             Set<String> patterns = createPatterns(metrics.keySet());
 
             for (TimelineMetric metric : timelineMetrics.getMetrics()) {
-              if (metric.getMetricName() != null && metric.getMetricValues() != null
-                  && checkMetricName(patterns, metric.getMetricName())) {
+              if (metric.getMetricName() != null
+                && metric.getMetricValues() != null
+                && checkMetricName(patterns, metric.getMetricName())) {
                 populateResource(resource, metric);
               }
             }
@@ -230,6 +202,42 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
       return Collections.emptySet();
     }
 
+    private String getSpec(String hostname, Resource resource) {
+      String metricsParam = getSetString(processRegexps(metrics.keySet()), -1);
+      // Reuse uriBuilder
+      uriBuilder.removeQuery();
+
+      if (metricsParam.length() > 0) {
+        uriBuilder.setParameter("metricNames", metricsParam);
+      }
+
+      if (hostname != null && !hostname.isEmpty() && !hostname.equals(dummyHostName)) {
+        uriBuilder.setParameter("hostname", hostname);
+      }
+
+      String componentName = getComponentName(resource);
+      if (componentName != null && !componentName.isEmpty()) {
+        if (TIMELINE_APPID_MAP.containsKey(componentName)) {
+          componentName = TIMELINE_APPID_MAP.get(componentName);
+        }
+        uriBuilder.setParameter("appId", componentName);
+      }
+
+      if (temporalInfo != null) {
+        long startTime = temporalInfo.getStartTime();
+        if (startTime != -1) {
+          uriBuilder.setParameter("startTime", String.valueOf(startTime));
+        }
+
+        long endTime = temporalInfo.getEndTime();
+        if (endTime != -1) {
+          uriBuilder.setParameter("endTime", String.valueOf(endTime));
+        }
+      }
+
+      return uriBuilder.toString();
+    }
+
     private Set<String> createPatterns(Set<String> rawNames) {
       Pattern pattern = Pattern.compile(METRIC_REGEXP_PATTERN);
       Set<String> result = new HashSet<String>();

+ 91 - 1
ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java

@@ -96,6 +96,95 @@ public class AMSPropertyProviderTest {
     Assert.assertEquals(111, val.length);
   }
 
+  @Test
+  public void testPopulateResourcesForSingleHostMetricPointInTime() throws Exception {
+
+    // given
+    TestStreamProvider streamProvider = new TestStreamProvider(SINGLE_HOST_METRICS_FILE_PATH);
+    TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
+    ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
+    Map<String, Map<String, PropertyInfo>> propertyIds = PropertyHelper.getMetricPropertyIds(Resource.Type.Host);
+    AMSPropertyProvider propertyProvider = new AMSHostPropertyProvider(
+      propertyIds,
+      streamProvider,
+      sslConfiguration,
+      metricHostProvider,
+      CLUSTER_NAME_PROPERTY_ID,
+      HOST_NAME_PROPERTY_ID
+    );
+
+    Resource resource = new ResourceImpl(Resource.Type.Host);
+    resource.setProperty(HOST_NAME_PROPERTY_ID, "h1");
+    Map<String, TemporalInfo> temporalInfoMap = Collections.emptyMap();
+    Request request = PropertyHelper.getReadRequest(
+      Collections.singleton(PROPERTY_ID1), temporalInfoMap);
+
+    // when
+    Set<Resource> resources =
+      propertyProvider.populateResources(Collections.singleton(resource), request, null);
+
+    // then
+    Assert.assertEquals(1, resources.size());
+    Resource res = resources.iterator().next();
+    Map<String, Object> properties = PropertyHelper.getProperties(res);
+    Assert.assertNotNull(properties);
+    URIBuilder uriBuilder = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188);
+    uriBuilder.addParameter("metricNames", "cpu_user");
+    uriBuilder.addParameter("hostname", "h1");
+    uriBuilder.addParameter("appId", "HOST");
+    Assert.assertEquals(uriBuilder.toString(), streamProvider.getLastSpec());
+    Double val = (Double) res.getPropertyValue(PROPERTY_ID1);
+    Assert.assertEquals(40.45, val, 0.001);
+  }
+
+  @Test
+  public void testPopulateResourcesForMultipleHostMetricsPointInTime() throws Exception {
+    TestStreamProvider streamProvider = new TestStreamProvider(MULTIPLE_HOST_METRICS_FILE_PATH);
+    TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
+    ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
+
+    Map<String, Map<String, PropertyInfo>> propertyIds = PropertyHelper.getMetricPropertyIds(Resource.Type.Host);
+    AMSPropertyProvider propertyProvider = new AMSHostPropertyProvider(
+      propertyIds,
+      streamProvider,
+      sslConfiguration,
+      metricHostProvider,
+      CLUSTER_NAME_PROPERTY_ID,
+      HOST_NAME_PROPERTY_ID
+    );
+
+    Resource resource = new ResourceImpl(Resource.Type.Host);
+    resource.setProperty(HOST_NAME_PROPERTY_ID, "h1");
+    Map<String, TemporalInfo> temporalInfoMap = Collections.emptyMap();
+    Request request = PropertyHelper.getReadRequest(
+      new HashSet<String>() {{ add(PROPERTY_ID1); add(PROPERTY_ID2); }}, temporalInfoMap);
+    Set<Resource> resources =
+      propertyProvider.populateResources(Collections.singleton(resource), request, null);
+    Assert.assertEquals(1, resources.size());
+    Resource res = resources.iterator().next();
+    Map<String, Object> properties = PropertyHelper.getProperties(res);
+    Assert.assertNotNull(properties);
+    URIBuilder uriBuilder = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188);
+    uriBuilder.addParameter("metricNames", "cpu_user,mem_free");
+    uriBuilder.addParameter("hostname", "h1");
+    uriBuilder.addParameter("appId", "HOST");
+
+    URIBuilder uriBuilder2 = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188);
+    uriBuilder2.addParameter("metricNames", "mem_free,cpu_user");
+    uriBuilder2.addParameter("hostname", "h1");
+    uriBuilder2.addParameter("appId", "HOST");
+    Assert.assertTrue(uriBuilder.toString().equals(streamProvider.getLastSpec())
+        || uriBuilder2.toString().equals(streamProvider.getLastSpec()));
+    Double val1 = (Double) res.getPropertyValue(PROPERTY_ID1);
+    Assert.assertEquals(40.45, val1, 0.001);
+    Double val2 = (Double) res.getPropertyValue(PROPERTY_ID2);
+    Assert.assertEquals(2.47025664E8, val2, 0.1);
+  }
+
   @Test
   public void testPopulateResourcesForMultipleHostMetrics() throws Exception {
     TestStreamProvider streamProvider = new TestStreamProvider(MULTIPLE_HOST_METRICS_FILE_PATH);
@@ -139,13 +228,14 @@ public class AMSPropertyProviderTest {
     uriBuilder2.addParameter("startTime", "1416445244701");
     uriBuilder2.addParameter("endTime", "1416445244901");
     Assert.assertTrue(uriBuilder.toString().equals(streamProvider.getLastSpec())
-        || uriBuilder2.toString().equals(streamProvider.getLastSpec()));
+      || uriBuilder2.toString().equals(streamProvider.getLastSpec()));
     Number[][] val = (Number[][]) res.getPropertyValue(PROPERTY_ID1);
     Assert.assertEquals(111, val.length);
     val = (Number[][]) res.getPropertyValue(PROPERTY_ID2);
     Assert.assertEquals(86, val.length);
   }
 
   @Test
   public void testPopulateResourcesForRegexpMetrics() throws Exception {
     TestStreamProvider streamProvider = new TestStreamProvider(MULTIPLE_COMPONENT_REGEXP_METRICS_FILE_PATH);