Selaa lähdekoodia

Revert "AMBARI-8872. Support point in time queries. Breaks dashboard graphs."

This reverts commit 9bf9034a5c2481a8b40befab8c3713dcd3b6f584.
Siddharth Wagle 10 vuotta sitten
vanhempi
commit
102b47736e
10 muutettua tiedostoa jossa 341 lisäystä ja 833 poistoa
  1. 187 286
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
  2. 110 381
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java
  3. 1 2
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
  4. 1 2
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
  5. 1 2
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
  6. 2 3
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
  7. 3 4
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
  8. 1 20
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
  9. 34 42
      ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
  10. 1 91
      ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java

+ 187 - 286
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java

@@ -17,6 +17,7 @@
  */
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
 
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
@@ -34,7 +35,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Statement;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
@@ -48,7 +48,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.SplitByMetricNamesCondition;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
@@ -135,6 +134,7 @@ public class PhoenixHBaseAccessor {
     }
     }
   }
   }
 
 
+
   /**
   /**
    * Get JDBC connection to HBase store. Assumption is that the hbase
    * Get JDBC connection to HBase store. Assumption is that the hbase
    * configuration is present on the classpath and loaded by the caller into
    * configuration is present on the classpath and loaded by the caller into
@@ -148,28 +148,13 @@ public class PhoenixHBaseAccessor {
     return dataSource.getConnection();
     return dataSource.getConnection();
   }
   }
 
 
-  private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs)
-    throws SQLException, IOException {
-    TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs);
-    metric.setMetricValues(readLastMetricValueFromJSON(rs.getString("METRICS")));
-
-    return metric;
+  public static Map readMetricFromJSON(String json) throws IOException {
+    return mapper.readValue(json, metricValuesTypeRef);
   }
   }
 
 
+  @SuppressWarnings("unchecked")
   static TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
   static TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
     throws SQLException, IOException {
     throws SQLException, IOException {
-    TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs);
-    Map<Long, Double> sortedByTimeMetrics =
-      new TreeMap<Long, Double>(readMetricFromJSON(rs.getString("METRICS")));
-    metric.setMetricValues(sortedByTimeMetrics);
-    return metric;
-  }
-
-  /**
-   * Returns common part of timeline metrics record without the values.
-   */
-  private static TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs) 
-    throws SQLException {
     TimelineMetric metric = new TimelineMetric();
     TimelineMetric metric = new TimelineMetric();
     metric.setMetricName(rs.getString("METRIC_NAME"));
     metric.setMetricName(rs.getString("METRIC_NAME"));
     metric.setAppId(rs.getString("APP_ID"));
     metric.setAppId(rs.getString("APP_ID"));
@@ -178,23 +163,12 @@ public class PhoenixHBaseAccessor {
     metric.setTimestamp(rs.getLong("SERVER_TIME"));
     metric.setTimestamp(rs.getLong("SERVER_TIME"));
     metric.setStartTime(rs.getLong("START_TIME"));
     metric.setStartTime(rs.getLong("START_TIME"));
     metric.setType(rs.getString("UNITS"));
     metric.setType(rs.getString("UNITS"));
+    Map<Long, Double> sortedByTimeMetrics =
+        new TreeMap<Long, Double>((Map<Long, Double>) readMetricFromJSON(rs.getString("METRICS")));
+    metric.setMetricValues(sortedByTimeMetrics);
     return metric;
     return metric;
   }
   }
 
 
-  private static Map<Long, Double> readLastMetricValueFromJSON(String json)
-    throws IOException {
-    Map<Long, Double> values = readMetricFromJSON(json);
-    Long lastTimeStamp = Collections.max(values.keySet());
-
-    return Collections.singletonMap(lastTimeStamp, values.get(lastTimeStamp));
-  }
-
-  @SuppressWarnings("unchecked")
-  public static Map<Long, Double>  readMetricFromJSON(String json)
-    throws IOException {
-    return (Map<Long, Double>) mapper.readValue(json, metricValuesTypeRef);
-  }
-
   static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
   static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
     throws SQLException, IOException {
     throws SQLException, IOException {
     TimelineMetric metric = new TimelineMetric();
     TimelineMetric metric = new TimelineMetric();
@@ -343,11 +317,9 @@ public class PhoenixHBaseAccessor {
       for (TimelineMetric metric : timelineMetrics) {
       for (TimelineMetric metric : timelineMetrics) {
         metricRecordStmt.clearParameters();
         metricRecordStmt.clearParameters();
 
 
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("host: " + metric.getHostName() + ", " +
-            "metricName = " + metric.getMetricName() + ", " +
-            "values: " + metric.getMetricValues());
-        }
+        LOG.trace("host: " + metric.getHostName() + ", " +
+          "metricName = " + metric.getMetricName() + ", " +
+          "values: " + metric.getMetricValues());
         Aggregator agg = new Aggregator();
         Aggregator agg = new Aggregator();
         double[] aggregates =  agg.calculateAggregates(
         double[] aggregates =  agg.calculateAggregates(
           metric.getMetricValues());
           metric.getMetricValues());
@@ -394,32 +366,31 @@ public class PhoenixHBaseAccessor {
     }
     }
   }
   }
 
 
+
   @SuppressWarnings("unchecked")
   @SuppressWarnings("unchecked")
   public TimelineMetrics getMetricRecords(final Condition condition)
   public TimelineMetrics getMetricRecords(final Condition condition)
     throws SQLException, IOException {
     throws SQLException, IOException {
 
 
-    verifyCondition(condition);
+    if (condition.isEmpty()) {
+      throw new SQLException("No filter criteria specified.");
+    }
 
 
     Connection conn = getConnection();
     Connection conn = getConnection();
     PreparedStatement stmt = null;
     PreparedStatement stmt = null;
     TimelineMetrics metrics = new TimelineMetrics();
     TimelineMetrics metrics = new TimelineMetrics();
 
 
     try {
     try {
-      //get latest
-      if(condition.isPointInTime()){
-        stmt = getLatestMetricRecords(condition, conn, metrics);
-      } else {
-        stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+      stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
 
 
-        ResultSet rs = stmt.executeQuery();
-        while (rs.next()) {
-          TimelineMetric metric = getTimelineMetricFromResultSet(rs);
+      ResultSet rs = stmt.executeQuery();
 
 
-          if (condition.isGrouped()) {
-            metrics.addOrMergeTimelineMetric(metric);
-          } else {
-            metrics.getMetrics().add(metric);
-          }
+      while (rs.next()) {
+        TimelineMetric metric = getTimelineMetricFromResultSet(rs);
+
+        if (condition.isGrouped()) {
+          metrics.addOrMergeTimelineMetric(metric);
+        } else {
+          metrics.getMetrics().add(metric);
         }
         }
       }
       }
 
 
@@ -439,221 +410,174 @@ public class PhoenixHBaseAccessor {
         }
         }
       }
       }
     }
     }
-
-    LOG.info("Metrics records size: " + metrics.getMetrics().size());
     return metrics;
     return metrics;
   }
   }
 
 
-  private PreparedStatement getLatestMetricRecords(
-    Condition condition, Connection conn, TimelineMetrics metrics)
-    throws SQLException, IOException {
-    PreparedStatement stmt = null;
-    SplitByMetricNamesCondition splitCondition =
-      new SplitByMetricNamesCondition(condition);
-
-    for (String metricName: splitCondition.getOriginalMetricNames()) {
-      splitCondition.setCurrentMetric(metricName);
-      stmt = PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(conn,
-        splitCondition);
-
-      ResultSet rs = stmt.executeQuery();
-      while (rs.next()) {
-        TimelineMetric metric = getLastTimelineMetricFromResultSet(rs);
-        metrics.getMetrics().add(metric);
-      }
-    }
+  public void saveHostAggregateRecords(Map<TimelineMetric,
+    MetricHostAggregate> hostAggregateMap, String phoenixTableName)
+    throws SQLException {
 
 
-    return stmt;
-  }
+    if (hostAggregateMap != null && !hostAggregateMap.isEmpty()) {
+      Connection conn = getConnection();
+      PreparedStatement stmt = null;
 
 
-  /**
-   * Get metrics aggregated across hosts.
-   *
-   * @param condition @Condition
-   * @return @TimelineMetrics
-   * @throws SQLException
-   */
-  public TimelineMetrics getAggregateMetricRecords(final Condition condition)
-    throws SQLException {
+      long start = System.currentTimeMillis();
+      int rowCount = 0;
 
 
-    verifyCondition(condition);
+      try {
+        stmt = conn.prepareStatement(
+          String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName));
+
+        for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate :
+          hostAggregateMap.entrySet()) {
+
+          TimelineMetric metric = metricAggregate.getKey();
+          MetricHostAggregate hostAggregate = metricAggregate.getValue();
+
+          rowCount++;
+          stmt.clearParameters();
+          stmt.setString(1, metric.getMetricName());
+          stmt.setString(2, metric.getHostName());
+          stmt.setString(3, metric.getAppId());
+          stmt.setString(4, metric.getInstanceId());
+          stmt.setLong(5, metric.getTimestamp());
+          stmt.setString(6, metric.getType());
+          stmt.setDouble(7, hostAggregate.getSum());
+          stmt.setDouble(8, hostAggregate.getMax());
+          stmt.setDouble(9, hostAggregate.getMin());
+          stmt.setDouble(10, hostAggregate.getNumberOfSamples());
+
+          try {
+            // TODO: Why this exception is swallowed
+            stmt.executeUpdate();
+          } catch (SQLException sql) {
+            LOG.error(sql);
+          }
 
 
-    Connection conn = getConnection();
-    PreparedStatement stmt = null;
-    TimelineMetrics metrics = new TimelineMetrics();
+          if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+            conn.commit();
+            rowCount = 0;
+          }
 
 
-    try {
-      //get latest
-      if(condition.isPointInTime()) {
-        stmt = getLatestAggregateMetricRecords(condition, conn, metrics);
-      } else {
-        stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition);
+        }
 
 
-        ResultSet rs = stmt.executeQuery();
-        while (rs.next()) {
-          TimelineMetric metric = getAggregateTimelineMetricFromResultSet(rs);
+        conn.commit();
 
 
-          if (condition.isGrouped()) {
-            metrics.addOrMergeTimelineMetric(metric);
-          } else {
-            metrics.getMetrics().add(metric);
+      } finally {
+        if (stmt != null) {
+          try {
+            stmt.close();
+          } catch (SQLException e) {
+            // Ignore
           }
           }
         }
         }
-      }
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
+        if (conn != null) {
+          try {
+            conn.close();
+          } catch (SQLException sql) {
+            // Ignore
+          }
         }
         }
       }
       }
-    }
-
-    LOG.info("Aggregate records size: " + metrics.getMetrics().size());
-    return metrics;
-  }
 
 
-  private PreparedStatement getLatestAggregateMetricRecords(
-    Condition condition, Connection conn, TimelineMetrics metrics)
-    throws SQLException {
+      long end = System.currentTimeMillis();
 
 
-    PreparedStatement stmt = null;
-    SplitByMetricNamesCondition splitCondition =
-      new SplitByMetricNamesCondition(condition);
-
-    for (String metricName: splitCondition.getOriginalMetricNames()) {
-
-      splitCondition.setCurrentMetric(metricName);
-      stmt = PhoenixTransactSQL.prepareGetLatestAggregateMetricSqlStmt(conn,
-        splitCondition);
-
-      ResultSet rs = stmt.executeQuery();
-      while (rs.next()) {
-        TimelineMetric metric = getAggregateTimelineMetricFromResultSet(rs);
-        metrics.getMetrics().add(metric);
+      if ((end - start) > 60000l) {
+        LOG.info("Time to save map: " + (end - start) + ", " +
+          "thread = " + Thread.currentThread().getClass());
       }
       }
     }
     }
-
-    return stmt;
-  }
-
-  private TimelineMetric getAggregateTimelineMetricFromResultSet(
-    ResultSet rs) throws SQLException {
-    TimelineMetric metric = new TimelineMetric();
-    metric.setMetricName(rs.getString("METRIC_NAME"));
-    metric.setAppId(rs.getString("APP_ID"));
-    metric.setInstanceId(rs.getString("INSTANCE_ID"));
-    metric.setTimestamp(rs.getLong("SERVER_TIME"));
-    metric.setStartTime(rs.getLong("SERVER_TIME"));
-    Map<Long, Double> valueMap = Collections.singletonMap(
-      rs.getLong("SERVER_TIME"),
-      rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT"));
-    metric.setMetricValues(valueMap);
-
-    return metric;
-  }
-
-  private void verifyCondition(Condition condition) throws SQLException {
-    if (condition.isEmpty()) {
-      throw new SQLException("No filter criteria specified.");
-    }
   }
   }
 
 
-  public void saveHostAggregateRecords(Map<TimelineMetric,
-    MetricHostAggregate> hostAggregateMap, String phoenixTableName)
+  /**
+   * Save Metric aggregate records.
+   *
+   * @throws SQLException
+   */
+  public void saveClusterAggregateRecords(
+    Map<TimelineClusterMetric, MetricClusterAggregate> records)
     throws SQLException {
     throws SQLException {
 
 
-    if (hostAggregateMap == null || hostAggregateMap.isEmpty()) {
-      LOG.debug("Empty aggregate records.");
-      return;
-    }
-
-    Connection conn = getConnection();
-    PreparedStatement stmt = null;
-
-    long start = System.currentTimeMillis();
-    int rowCount = 0;
+      if (records == null || records.isEmpty()) {
+        LOG.debug("Empty aggregate records.");
+        return;
+      }
 
 
-    try {
-      stmt = conn.prepareStatement(
-        String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName));
+      long start = System.currentTimeMillis();
 
 
-      for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate :
-        hostAggregateMap.entrySet()) {
+      Connection conn = getConnection();
+      PreparedStatement stmt = null;
+      try {
+        stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL);
+        int rowCount = 0;
 
 
-        TimelineMetric metric = metricAggregate.getKey();
-        MetricHostAggregate hostAggregate = metricAggregate.getValue();
+        for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
+          aggregateEntry : records.entrySet()) {
+          TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
+          MetricClusterAggregate aggregate = aggregateEntry.getValue();
 
 
-        rowCount++;
-        stmt.clearParameters();
-        stmt.setString(1, metric.getMetricName());
-        stmt.setString(2, metric.getHostName());
-        stmt.setString(3, metric.getAppId());
-        stmt.setString(4, metric.getInstanceId());
-        stmt.setLong(5, metric.getTimestamp());
-        stmt.setString(6, metric.getType());
-        stmt.setDouble(7, hostAggregate.getSum());
-        stmt.setDouble(8, hostAggregate.getMax());
-        stmt.setDouble(9, hostAggregate.getMin());
-        stmt.setDouble(10, hostAggregate.getNumberOfSamples());
+          LOG.trace("clusterMetric = " + clusterMetric + ", " +
+            "aggregate = " + aggregate);
 
 
-        try {
-          stmt.executeUpdate();
-        } catch (SQLException sql) {
-          LOG.error(sql);
-        }
+          rowCount++;
+          stmt.clearParameters();
+          stmt.setString(1, clusterMetric.getMetricName());
+          stmt.setString(2, clusterMetric.getAppId());
+          stmt.setString(3, clusterMetric.getInstanceId());
+          stmt.setLong(4, clusterMetric.getTimestamp());
+          stmt.setString(5, clusterMetric.getType());
+          stmt.setDouble(6, aggregate.getSum());
+          stmt.setInt(7, aggregate.getNumberOfHosts());
+          stmt.setDouble(8, aggregate.getMax());
+          stmt.setDouble(9, aggregate.getMin());
+
+          try {
+            stmt.executeUpdate();
+          } catch (SQLException sql) {
+            // TODO: Why this exception is swallowed
+            LOG.error(sql);
+          }
 
 
-        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
-          conn.commit();
-          rowCount = 0;
+          if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+            conn.commit();
+            rowCount = 0;
+          }
         }
         }
 
 
-      }
-
-      conn.commit();
+        conn.commit();
 
 
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
+      } finally {
+        if (stmt != null) {
+          try {
+            stmt.close();
+          } catch (SQLException e) {
+            // Ignore
+          }
         }
         }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
+        if (conn != null) {
+          try {
+            conn.close();
+          } catch (SQLException sql) {
+            // Ignore
+          }
         }
         }
       }
       }
+      long end = System.currentTimeMillis();
+      if ((end - start) > 60000l) {
+        LOG.info("Time to save: " + (end - start) + ", " +
+          "thread = " + Thread.currentThread().getName());
+      }
     }
     }
 
 
-    long end = System.currentTimeMillis();
-
-    if ((end - start) > 60000l) {
-      LOG.info("Time to save map: " + (end - start) + ", " +
-        "thread = " + Thread.currentThread().getClass());
-    }
-  }
-
   /**
   /**
    * Save Metric aggregate records.
    * Save Metric aggregate records.
    *
    *
    * @throws SQLException
    * @throws SQLException
    */
    */
-  public void saveClusterAggregateRecords(
-    Map<TimelineClusterMetric, MetricClusterAggregate> records)
+  public void saveClusterAggregateHourlyRecords(
+    Map<TimelineClusterMetric, MetricHostAggregate> records,
+    String tableName)
     throws SQLException {
     throws SQLException {
-
     if (records == null || records.isEmpty()) {
     if (records == null || records.isEmpty()) {
       LOG.debug("Empty aggregate records.");
       LOG.debug("Empty aggregate records.");
       return;
       return;
@@ -664,18 +588,17 @@ public class PhoenixHBaseAccessor {
     Connection conn = getConnection();
     Connection conn = getConnection();
     PreparedStatement stmt = null;
     PreparedStatement stmt = null;
     try {
     try {
-      stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL);
+      stmt = conn.prepareStatement(String.format
+        (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName));
       int rowCount = 0;
       int rowCount = 0;
 
 
-      for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
+      for (Map.Entry<TimelineClusterMetric, MetricHostAggregate>
         aggregateEntry : records.entrySet()) {
         aggregateEntry : records.entrySet()) {
         TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
         TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
-        MetricClusterAggregate aggregate = aggregateEntry.getValue();
+        MetricHostAggregate aggregate = aggregateEntry.getValue();
 
 
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("clusterMetric = " + clusterMetric + ", " +
-            "aggregate = " + aggregate);
-        }
+        LOG.trace("clusterMetric = " + clusterMetric + ", " +
+          "aggregate = " + aggregate);
 
 
         rowCount++;
         rowCount++;
         stmt.clearParameters();
         stmt.clearParameters();
@@ -685,7 +608,8 @@ public class PhoenixHBaseAccessor {
         stmt.setLong(4, clusterMetric.getTimestamp());
         stmt.setLong(4, clusterMetric.getTimestamp());
         stmt.setString(5, clusterMetric.getType());
         stmt.setString(5, clusterMetric.getType());
         stmt.setDouble(6, aggregate.getSum());
         stmt.setDouble(6, aggregate.getSum());
-        stmt.setInt(7, aggregate.getNumberOfHosts());
+//        stmt.setInt(7, aggregate.getNumberOfHosts());
+        stmt.setLong(7, aggregate.getNumberOfSamples());
         stmt.setDouble(8, aggregate.getMax());
         stmt.setDouble(8, aggregate.getMax());
         stmt.setDouble(9, aggregate.getMin());
         stmt.setDouble(9, aggregate.getMin());
 
 
@@ -727,68 +651,48 @@ public class PhoenixHBaseAccessor {
     }
     }
   }
   }
 
 
-
   /**
   /**
-   * Save Metric aggregate records.
+   * Get metrics aggregated across hosts.
    *
    *
+   * @param condition @Condition
+   * @return @TimelineMetrics
    * @throws SQLException
    * @throws SQLException
    */
    */
-  public void saveClusterAggregateHourlyRecords(
-    Map<TimelineClusterMetric, MetricHostAggregate> records,
-    String tableName)
+  public TimelineMetrics getAggregateMetricRecords(final Condition condition)
     throws SQLException {
     throws SQLException {
-    if (records == null || records.isEmpty()) {
-      LOG.debug("Empty aggregate records.");
-      return;
-    }
 
 
-    long start = System.currentTimeMillis();
+    if (condition.isEmpty()) {
+      throw new SQLException("No filter criteria specified.");
+    }
 
 
     Connection conn = getConnection();
     Connection conn = getConnection();
     PreparedStatement stmt = null;
     PreparedStatement stmt = null;
-    try {
-      stmt = conn.prepareStatement(String.format
-        (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName));
-      int rowCount = 0;
-
-      for (Map.Entry<TimelineClusterMetric, MetricHostAggregate>
-        aggregateEntry : records.entrySet()) {
-        TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
-        MetricHostAggregate aggregate = aggregateEntry.getValue();
-
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("clusterMetric = " + clusterMetric + ", " +
-            "aggregate = " + aggregate);
-        }
+    TimelineMetrics metrics = new TimelineMetrics();
 
 
-        rowCount++;
-        stmt.clearParameters();
-        stmt.setString(1, clusterMetric.getMetricName());
-        stmt.setString(2, clusterMetric.getAppId());
-        stmt.setString(3, clusterMetric.getInstanceId());
-        stmt.setLong(4, clusterMetric.getTimestamp());
-        stmt.setString(5, clusterMetric.getType());
-        stmt.setDouble(6, aggregate.getSum());
-//        stmt.setInt(7, aggregate.getNumberOfHosts());
-        stmt.setLong(7, aggregate.getNumberOfSamples());
-        stmt.setDouble(8, aggregate.getMax());
-        stmt.setDouble(9, aggregate.getMin());
+    try {
+      stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition);
 
 
-        try {
-          stmt.executeUpdate();
-        } catch (SQLException sql) {
-          // we have no way to verify it works!!!
-          LOG.error(sql);
-        }
+      ResultSet rs = stmt.executeQuery();
 
 
-        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
-          conn.commit();
-          rowCount = 0;
+      while (rs.next()) {
+        TimelineMetric metric = new TimelineMetric();
+        metric.setMetricName(rs.getString("METRIC_NAME"));
+        metric.setAppId(rs.getString("APP_ID"));
+        metric.setInstanceId(rs.getString("INSTANCE_ID"));
+        metric.setTimestamp(rs.getLong("SERVER_TIME"));
+        metric.setStartTime(rs.getLong("SERVER_TIME"));
+        Map<Long, Double> valueMap = new HashMap<Long, Double>();
+        valueMap.put(rs.getLong("SERVER_TIME"),
+          rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT"));
+        metric.setMetricValues(valueMap);
+
+        if (condition.isGrouped()) {
+          metrics.addOrMergeTimelineMetric(metric);
+        } else {
+          metrics.getMetrics().add(metric);
         }
         }
       }
       }
 
 
-      conn.commit();
-
     } finally {
     } finally {
       if (stmt != null) {
       if (stmt != null) {
         try {
         try {
@@ -805,10 +709,7 @@ public class PhoenixHBaseAccessor {
         }
         }
       }
       }
     }
     }
-    long end = System.currentTimeMillis();
-    if ((end - start) > 60000l) {
-      LOG.info("Time to save: " + (end - start) + ", " +
-        "thread = " + Thread.currentThread().getName());
-    }
+    LOG.info("Aggregate records size: " + metrics.getMetrics().size());
+    return metrics;
   }
   }
 }
 }

+ 110 - 381
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java

@@ -17,13 +17,11 @@
  */
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
 
-import com.sun.xml.bind.v2.util.QNameMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import java.sql.Connection;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.SQLException;
-import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.List;
 import java.util.Set;
 import java.util.Set;
@@ -34,6 +32,7 @@ import java.util.Set;
 public class PhoenixTransactSQL {
 public class PhoenixTransactSQL {
 
 
   static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
   static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
+  // TODO: Configurable TTL values
   /**
   /**
    * Create table to store individual metric records.
    * Create table to store individual metric records.
    */
    */
@@ -207,10 +206,8 @@ public class PhoenixTransactSQL {
   public static final String DEFAULT_ENCODING = "FAST_DIFF";
   public static final String DEFAULT_ENCODING = "FAST_DIFF";
   public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes
   public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes
 
 
-  /**
-   * Filter to optimize HBase scan by using file timestamps. This prevents
+  /** Filter to optimize HBase scan by using file timestamps. This prevents
    * a full table scan of metric records.
    * a full table scan of metric records.
-   *
    * @return Phoenix Hint String
    * @return Phoenix Hint String
    */
    */
   public static String getNaiveTimeRangeHint(Long startTime, Long delta) {
   public static String getNaiveTimeRangeHint(Long startTime, Long delta) {
@@ -246,47 +243,33 @@ public class PhoenixTransactSQL {
       sb.append(" LIMIT ").append(condition.getLimit());
       sb.append(" LIMIT ").append(condition.getLimit());
     }
     }
 
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
-    }
+    LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
     PreparedStatement stmt = connection.prepareStatement(sb.toString());
     PreparedStatement stmt = connection.prepareStatement(sb.toString());
     int pos = 1;
     int pos = 1;
     if (condition.getMetricNames() != null) {
     if (condition.getMetricNames() != null) {
       for (; pos <= condition.getMetricNames().size(); pos++) {
       for (; pos <= condition.getMetricNames().size(); pos++) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
-        }
+        LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
         stmt.setString(pos, condition.getMetricNames().get(pos - 1));
         stmt.setString(pos, condition.getMetricNames().get(pos - 1));
       }
       }
     }
     }
     if (condition.getHostname() != null) {
     if (condition.getHostname() != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname());
-      }
+      LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname());
       stmt.setString(pos++, condition.getHostname());
       stmt.setString(pos++, condition.getHostname());
     }
     }
     if (condition.getAppId() != null) {
     if (condition.getAppId() != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
-      }
+      LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
       stmt.setString(pos++, condition.getAppId());
       stmt.setString(pos++, condition.getAppId());
     }
     }
     if (condition.getInstanceId() != null) {
     if (condition.getInstanceId() != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
-      }
+      LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
       stmt.setString(pos++, condition.getInstanceId());
       stmt.setString(pos++, condition.getInstanceId());
     }
     }
     if (condition.getStartTime() != null) {
     if (condition.getStartTime() != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime());
-      }
+      LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime());
       stmt.setLong(pos++, condition.getStartTime());
       stmt.setLong(pos++, condition.getStartTime());
     }
     }
     if (condition.getEndTime() != null) {
     if (condition.getEndTime() != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime());
-      }
+      LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime());
       stmt.setLong(pos, condition.getEndTime());
       stmt.setLong(pos, condition.getEndTime());
     }
     }
     if (condition.getFetchSize() != null) {
     if (condition.getFetchSize() != null) {
@@ -297,80 +280,6 @@ public class PhoenixTransactSQL {
   }
   }
 
 
 
 
-  public static PreparedStatement prepareGetLatestMetricSqlStmt(
-    Connection connection, Condition condition) throws SQLException {
-
-    if (condition.isEmpty()) {
-      throw new IllegalArgumentException("Condition is empty.");
-    }
-
-    if (condition.getMetricNames() == null
-      || condition.getMetricNames().size() == 0) {
-      throw new IllegalArgumentException("Point in time query without " +
-        "metric names not supported ");
-    }
-
-    String stmtStr;
-    if (condition.getStatement() != null) {
-      stmtStr = condition.getStatement();
-    } else {
-      stmtStr = String.format(GET_METRIC_SQL,
-        "",
-        METRICS_RECORD_TABLE_NAME);
-    }
-
-    StringBuilder sb = new StringBuilder(stmtStr);
-    sb.append(" WHERE ");
-    sb.append(condition.getConditionClause());
-    String orderByClause = condition.getOrderByClause();
-    if (orderByClause != null) {
-      sb.append(orderByClause);
-    } else {
-      sb.append(" ORDER BY SERVER_TIME DESC, METRIC_NAME  ");
-    }
-
-    sb.append(" LIMIT ").append(condition.getMetricNames().size());
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
-    }
-    PreparedStatement stmt = connection.prepareStatement(sb.toString());
-    int pos = 1;
-    if (condition.getMetricNames() != null) {
-      //IGNORE condition limit, set one based on number of metric names
-      for (; pos <= condition.getMetricNames().size(); pos++) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1));
-        }
-        stmt.setString(pos, condition.getMetricNames().get(pos - 1));
-      }
-    }
-    if (condition.getHostname() != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname());
-      }
-      stmt.setString(pos++, condition.getHostname());
-    }
-    if (condition.getAppId() != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
-      }
-      stmt.setString(pos++, condition.getAppId());
-    }
-    if (condition.getInstanceId() != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
-      }
-      stmt.setString(pos++, condition.getInstanceId());
-    }
-
-    if (condition.getFetchSize() != null) {
-      stmt.setFetchSize(condition.getFetchSize());
-    }
-
-    return stmt;
-  }
-
   public static PreparedStatement prepareGetAggregateSqlStmt(
   public static PreparedStatement prepareGetAggregateSqlStmt(
     Connection connection, Condition condition) throws SQLException {
     Connection connection, Condition condition) throws SQLException {
 
 
@@ -389,9 +298,7 @@ public class PhoenixTransactSQL {
     String query = String.format(sb.toString(),
     String query = String.format(sb.toString(),
       PhoenixTransactSQL.getNaiveTimeRangeHint(condition.getStartTime(),
       PhoenixTransactSQL.getNaiveTimeRangeHint(condition.getStartTime(),
         NATIVE_TIME_RANGE_DELTA));
         NATIVE_TIME_RANGE_DELTA));
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("SQL => " + query + ", condition => " + condition);
-    }
+    LOG.debug("SQL => " + query + ", condition => " + condition);
     PreparedStatement stmt = connection.prepareStatement(query);
     PreparedStatement stmt = connection.prepareStatement(query);
     int pos = 1;
     int pos = 1;
     if (condition.getMetricNames() != null) {
     if (condition.getMetricNames() != null) {
@@ -416,87 +323,7 @@ public class PhoenixTransactSQL {
     return stmt;
     return stmt;
   }
   }
 
 
-  public static PreparedStatement prepareGetLatestAggregateMetricSqlStmt(
-    Connection connection, Condition condition) throws SQLException {
-
-    if (condition.isEmpty()) {
-      throw new IllegalArgumentException("Condition is empty.");
-    }
-
-    if (condition.getMetricNames() == null
-      || condition.getMetricNames().size() == 0) {
-      throw new IllegalArgumentException("Point in time query without " +
-        "metric names not supported ");
-    }
-
-    String stmtStr;
-    if (condition.getStatement() != null) {
-      stmtStr = condition.getStatement();
-    } else {
-      stmtStr = String.format(GET_CLUSTER_AGGREGATE_SQL, "");
-    }
-
-    StringBuilder sb = new StringBuilder(stmtStr);
-    sb.append(" WHERE ");
-    sb.append(condition.getConditionClause());
-    String orderByClause = condition.getOrderByClause();
-    if (orderByClause != null) {
-      sb.append(orderByClause);
-    } else {
-      sb.append(" ORDER BY SERVER_TIME DESC, METRIC_NAME  ");
-    }
-
-    sb.append(" LIMIT ").append(condition.getMetricNames().size());
-
-    String query = sb.toString();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("SQL: " + query + ", condition: " + condition);
-    }
-
-    PreparedStatement stmt = connection.prepareStatement(query);
-    int pos = 1;
-    if (condition.getMetricNames() != null) {
-      for (; pos <= condition.getMetricNames().size(); pos++) {
-        stmt.setString(pos, condition.getMetricNames().get(pos - 1));
-      }
-    }
-    if (condition.getAppId() != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
-      }
-      stmt.setString(pos++, condition.getAppId());
-    }
-    if (condition.getInstanceId() != null) {
-      stmt.setString(pos++, condition.getInstanceId());
-    }
-
-    return stmt;
-  }
-
-  static interface Condition {
-
-    boolean isEmpty();
-
-    List<String> getMetricNames();
-    boolean isPointInTime();
-    boolean isGrouped();
-    void setStatement(String statement);
-    String getHostname();
-    String getAppId();
-    String getInstanceId();
-    String getConditionClause();
-    String getOrderByClause();
-    String getStatement();
-    Long getStartTime();
-    Long getEndTime();
-    Integer getLimit();
-    Integer getFetchSize();
-    void setFetchSize(Integer fetchSize);
-    void addOrderByColumn(String column);
-    void setNoLimit();
-  }
-
-  static class DefaultCondition implements Condition {
+  static class Condition {
     List<String> metricNames;
     List<String> metricNames;
     String hostname;
     String hostname;
     String appId;
     String appId;
@@ -510,7 +337,7 @@ public class PhoenixTransactSQL {
     String statement;
     String statement;
     Set<String> orderByColumns = new LinkedHashSet<String>();
     Set<String> orderByColumns = new LinkedHashSet<String>();
 
 
-    DefaultCondition(List<String> metricNames, String hostname, String appId,
+    Condition(List<String> metricNames, String hostname, String appId,
               String instanceId, Long startTime, Long endTime, Integer limit,
               String instanceId, Long startTime, Long endTime, Integer limit,
               boolean grouped) {
               boolean grouped) {
       this.metricNames = metricNames;
       this.metricNames = metricNames;
@@ -523,22 +350,22 @@ public class PhoenixTransactSQL {
       this.grouped = grouped;
       this.grouped = grouped;
     }
     }
 
 
-    public String getStatement() {
+    String getStatement() {
       return statement;
       return statement;
     }
     }
 
 
-    public void setStatement(String statement) {
+    void setStatement(String statement) {
       this.statement = statement;
       this.statement = statement;
     }
     }
 
 
-    public List<String> getMetricNames() {
+    List<String> getMetricNames() {
       return metricNames == null || metricNames.isEmpty() ? null : metricNames;
       return metricNames == null || metricNames.isEmpty() ? null : metricNames;
     }
     }
 
 
     String getMetricsClause() {
     String getMetricsClause() {
       StringBuilder sb = new StringBuilder("(");
       StringBuilder sb = new StringBuilder("(");
       if (metricNames != null) {
       if (metricNames != null) {
-        for (String name : getMetricNames()) {
+        for (String name : metricNames) {
           if (sb.length() != 1) {
           if (sb.length() != 1) {
             sb.append(", ");
             sb.append(", ");
           }
           }
@@ -551,48 +378,61 @@ public class PhoenixTransactSQL {
       }
       }
     }
     }
 
 
-    public String getConditionClause() {
+    String getConditionClause() {
       StringBuilder sb = new StringBuilder();
       StringBuilder sb = new StringBuilder();
       boolean appendConjunction = false;
       boolean appendConjunction = false;
 
 
       if (getMetricNames() != null) {
       if (getMetricNames() != null) {
-        if (appendConjunction) {
-          sb.append(" AND");
-        }
-
         sb.append("METRIC_NAME IN ");
         sb.append("METRIC_NAME IN ");
         sb.append(getMetricsClause());
         sb.append(getMetricsClause());
         appendConjunction = true;
         appendConjunction = true;
       }
       }
-
-      appendConjunction = append(sb, appendConjunction, getHostname(), " HOSTNAME = ?");
-      appendConjunction = append(sb, appendConjunction, getAppId(), " APP_ID = ?");
-      appendConjunction = append(sb, appendConjunction, getInstanceId(), " INSTANCE_ID = ?");
-      appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?");
-      append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?");
-
-      return sb.toString();
-    }
-
-    protected static boolean append(StringBuilder sb,
-                                     boolean appendConjunction,
-                             Object value, String str) {
-      if (value != null) {
-        if (appendConjunction) {
-          sb.append(" AND");
-        }
-
-        sb.append(str);
+      if (appendConjunction) {
+        sb.append(" AND");
+      }
+      appendConjunction = false;
+      if (getHostname() != null) {
+        sb.append(" HOSTNAME = ?");
         appendConjunction = true;
         appendConjunction = true;
       }
       }
-      return appendConjunction;
+      if (appendConjunction) {
+        sb.append(" AND");
+      }
+      appendConjunction = false;
+      if (getAppId() != null) {
+        sb.append(" APP_ID = ?");
+        appendConjunction = true;
+      }
+      if (appendConjunction) {
+        sb.append(" AND");
+      }
+      appendConjunction = false;
+      if (getInstanceId() != null) {
+        sb.append(" INSTANCE_ID = ?");
+        appendConjunction = true;
+      }
+      if (appendConjunction) {
+        sb.append(" AND");
+      }
+      appendConjunction = false;
+      if (getStartTime() != null) {
+        sb.append(" SERVER_TIME >= ?");
+        appendConjunction = true;
+      }
+      if (appendConjunction) {
+        sb.append(" AND");
+      }
+      if (getEndTime() != null) {
+        sb.append(" SERVER_TIME < ?");
+      }
+      return sb.toString();
     }
     }
 
 
-    public String getHostname() {
+    String getHostname() {
       return hostname == null || hostname.isEmpty() ? null : hostname;
       return hostname == null || hostname.isEmpty() ? null : hostname;
     }
     }
 
 
-    public String getAppId() {
+    String getAppId() {
       if (appId != null && !appId.isEmpty()) {
       if (appId != null && !appId.isEmpty()) {
         if (!appId.equals("HOST")) {
         if (!appId.equals("HOST")) {
           return appId.toLowerCase();
           return appId.toLowerCase();
@@ -603,27 +443,22 @@ public class PhoenixTransactSQL {
       return null;
       return null;
     }
     }
 
 
-    public String getInstanceId() {
+    String getInstanceId() {
       return instanceId == null || instanceId.isEmpty() ? null : instanceId;
       return instanceId == null || instanceId.isEmpty() ? null : instanceId;
     }
     }
 
 
     /**
     /**
      * Convert to millis.
      * Convert to millis.
      */
      */
-    public Long getStartTime() {
-      if (startTime == null) {
-        return null;
-      } else if (startTime < 9999999999l) {
+    Long getStartTime() {
+      if (startTime < 9999999999l) {
         return startTime * 1000;
         return startTime * 1000;
       } else {
       } else {
         return startTime;
         return startTime;
       }
       }
     }
     }
 
 
-    public Long getEndTime() {
-      if (endTime == null) {
-        return null;
-      }
+    Long getEndTime() {
       if (endTime < 9999999999l) {
       if (endTime < 9999999999l) {
         return endTime * 1000;
         return endTime * 1000;
       } else {
       } else {
@@ -631,26 +466,22 @@ public class PhoenixTransactSQL {
       }
       }
     }
     }
 
 
-    public void setNoLimit() {
+    void setNoLimit() {
       this.noLimit = true;
       this.noLimit = true;
     }
     }
 
 
-    public Integer getLimit() {
+    Integer getLimit() {
       if (noLimit) {
       if (noLimit) {
         return null;
         return null;
       }
       }
       return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit;
       return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit;
     }
     }
 
 
-    public boolean isGrouped() {
+    boolean isGrouped() {
       return grouped;
       return grouped;
     }
     }
 
 
-    public boolean isPointInTime() {
-      return getStartTime() == null && getEndTime() == null;
-    }
-
-    public boolean isEmpty() {
+    boolean isEmpty() {
       return (metricNames == null || metricNames.isEmpty())
       return (metricNames == null || metricNames.isEmpty())
         && (hostname == null || hostname.isEmpty())
         && (hostname == null || hostname.isEmpty())
         && (appId == null || appId.isEmpty())
         && (appId == null || appId.isEmpty())
@@ -659,19 +490,19 @@ public class PhoenixTransactSQL {
         && endTime == null;
         && endTime == null;
     }
     }
 
 
-    public Integer getFetchSize() {
+    Integer getFetchSize() {
       return fetchSize;
       return fetchSize;
     }
     }
 
 
-    public void setFetchSize(Integer fetchSize) {
+    void setFetchSize(Integer fetchSize) {
       this.fetchSize = fetchSize;
       this.fetchSize = fetchSize;
     }
     }
 
 
-    public void addOrderByColumn(String column) {
+    void addOrderByColumn(String column) {
       orderByColumns.add(column);
       orderByColumns.add(column);
     }
     }
 
 
-    public String getOrderByClause() {
+    String getOrderByClause() {
       String orderByStr = " ORDER BY ";
       String orderByStr = " ORDER BY ";
       if (!orderByColumns.isEmpty()) {
       if (!orderByColumns.isEmpty()) {
         StringBuilder sb = new StringBuilder(orderByStr);
         StringBuilder sb = new StringBuilder(orderByStr);
@@ -704,172 +535,70 @@ public class PhoenixTransactSQL {
     }
     }
   }
   }
 
 
-  static class LikeCondition extends DefaultCondition {
+  static class LikeCondition extends Condition {
 
 
     LikeCondition(List<String> metricNames, String hostname,
     LikeCondition(List<String> metricNames, String hostname,
-                  String appId, String instanceId, Long startTime,
-                  Long endTime, Integer limit, boolean grouped) {
+                         String appId, String instanceId, Long startTime,
+                         Long endTime, Integer limit, boolean grouped) {
       super(metricNames, hostname, appId, instanceId, startTime, endTime,
       super(metricNames, hostname, appId, instanceId, startTime, endTime,
-        limit, grouped);
+          limit, grouped);
     }
     }
 
 
     @Override
     @Override
-    public String getConditionClause() {
+    String getConditionClause() {
       StringBuilder sb = new StringBuilder();
       StringBuilder sb = new StringBuilder();
       boolean appendConjunction = false;
       boolean appendConjunction = false;
 
 
       if (getMetricNames() != null) {
       if (getMetricNames() != null) {
         sb.append("(");
         sb.append("(");
-        for (String name : getMetricNames()) {
+        for (String name : metricNames) {
           if (sb.length() > 1) {
           if (sb.length() > 1) {
             sb.append(" OR ");
             sb.append(" OR ");
           }
           }
           sb.append("METRIC_NAME LIKE ?");
           sb.append("METRIC_NAME LIKE ?");
         }
         }
-
         sb.append(")");
         sb.append(")");
         appendConjunction = true;
         appendConjunction = true;
       }
       }
-
-      appendConjunction = append(sb, appendConjunction, getHostname(), " HOSTNAME = ?");
-      appendConjunction = append(sb, appendConjunction, getAppId(), " APP_ID = ?");
-      appendConjunction = append(sb, appendConjunction, getInstanceId(), " INSTANCE_ID = ?");
-      appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?");
-      append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?");
-
-      return sb.toString();
-    }
-  }
-
-  static class SplitByMetricNamesCondition implements Condition {
-    private final Condition adaptee;
-    private String currentMetric;
-
-    SplitByMetricNamesCondition(Condition condition){
-      this.adaptee = condition;
-    }
-
-    @Override
-    public boolean isEmpty() {
-      return adaptee.isEmpty();
-    }
-
-    @Override
-    public List<String> getMetricNames() {
-      return Collections.singletonList(currentMetric);
-    }
-
-    @Override
-    public boolean isPointInTime() {
-      return adaptee.isPointInTime();
-    }
-
-    @Override
-    public boolean isGrouped() {
-      return adaptee.isGrouped();
-    }
-
-    @Override
-    public void setStatement(String statement) {
-      adaptee.setStatement(statement);
-    }
-
-    @Override
-    public String getHostname() {
-      return adaptee.getHostname();
-    }
-
-    @Override
-    public String getAppId() {
-      return adaptee.getAppId();
-    }
-
-    @Override
-    public String getInstanceId() {
-      return adaptee.getInstanceId();
-    }
-
-    @Override
-    public String getConditionClause() {
-      StringBuilder sb = new StringBuilder();
-      boolean appendConjunction = false;
-
-      if (getMetricNames() != null) {
-        for (String name : getMetricNames()) {
-          if (sb.length() > 1) {
-            sb.append(" OR ");
-          }
-          sb.append("METRIC_NAME = ?");
-        }
-
+      if (appendConjunction) {
+        sb.append(" AND");
+      }
+      appendConjunction = false;
+      if (getHostname() != null) {
+        sb.append(" HOSTNAME = ?");
         appendConjunction = true;
         appendConjunction = true;
       }
       }
-
-      appendConjunction = DefaultCondition.append(sb, appendConjunction,
-        getHostname(), " HOSTNAME = ?");
-      appendConjunction = DefaultCondition.append(sb, appendConjunction,
-        getAppId(), " APP_ID = ?");
-      appendConjunction = DefaultCondition.append(sb, appendConjunction,
-        getInstanceId(), " INSTANCE_ID = ?");
-      appendConjunction = DefaultCondition.append(sb, appendConjunction,
-        getStartTime(), " SERVER_TIME >= ?");
-      DefaultCondition.append(sb, appendConjunction, getEndTime(),
-        " SERVER_TIME < ?");
-
+      if (appendConjunction) {
+        sb.append(" AND");
+      }
+      appendConjunction = false;
+      if (getAppId() != null) {
+        sb.append(" APP_ID = ?");
+        appendConjunction = true;
+      }
+      if (appendConjunction) {
+        sb.append(" AND");
+      }
+      appendConjunction = false;
+      if (getInstanceId() != null) {
+        sb.append(" INSTANCE_ID = ?");
+        appendConjunction = true;
+      }
+      if (appendConjunction) {
+        sb.append(" AND");
+      }
+      appendConjunction = false;
+      if (getStartTime() != null) {
+        sb.append(" SERVER_TIME >= ?");
+        appendConjunction = true;
+      }
+      if (appendConjunction) {
+        sb.append(" AND");
+      }
+      if (getEndTime() != null) {
+        sb.append(" SERVER_TIME < ?");
+      }
       return sb.toString();
       return sb.toString();
     }
     }
-
-    @Override
-    public String getOrderByClause() {
-      return adaptee.getOrderByClause();
-    }
-
-    @Override
-    public String getStatement() {
-      return adaptee.getStatement();
-    }
-
-    @Override
-    public Long getStartTime() {
-      return adaptee.getStartTime();
-    }
-
-    @Override
-    public Long getEndTime() {
-      return adaptee.getEndTime();
-    }
-
-    @Override
-    public Integer getLimit() {
-      return adaptee.getLimit();
-    }
-
-    @Override
-    public Integer getFetchSize() {
-      return adaptee.getFetchSize();
-    }
-
-    @Override
-    public void setFetchSize(Integer fetchSize) {
-      adaptee.setFetchSize(fetchSize);
-    }
-
-    @Override
-    public void addOrderByColumn(String column) {
-      adaptee.addOrderByColumn(column);
-    }
-
-    @Override
-    public void setNoLimit() {
-      adaptee.setNoLimit();
-    }
-
-    public List<String> getOriginalMetricNames() {
-      return adaptee.getMetricNames();
-    }
-
-    public void setCurrentMetric(String currentMetric) {
-      this.currentMetric = currentMetric;
-    }
   }
   }
 }
 }

+ 1 - 2
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java

@@ -27,7 +27,6 @@ import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
 
 
 public class TimelineMetricAggregator extends AbstractTimelineAggregator {
 public class TimelineMetricAggregator extends AbstractTimelineAggregator {
@@ -79,7 +78,7 @@ public class TimelineMetricAggregator extends AbstractTimelineAggregator {
 
   @Override
   protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+    Condition condition = new Condition(null, null, null, null, startTime,
       endTime, null, true);
     condition.setNoLimit();
     condition.setFetchSize(resultsetFetchSize);
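
This aggregator change, and the identical ones in the next two files, revert the query condition from the DefaultCondition implementation back to a concrete Condition class. Below is a stub-based sketch of the call shape; the field and parameter names are inferred from the hunks and the test file further down, not taken verbatim from the Ambari source.

import java.util.List;

public class AggregatorConditionSketch {
  // Minimal stand-in for PhoenixTransactSQL.Condition, mirroring only the
  // constructor arity and the two setters the aggregators use.
  static class Condition {
    final List<String> metricNames;
    final String hostname, appId, instanceId;
    final Long startTime, endTime;
    final Integer limit;
    final boolean grouped;   // name assumed; the hunks only show the literal
    boolean noLimit;
    Integer fetchSize;

    Condition(List<String> metricNames, String hostname, String appId,
              String instanceId, Long startTime, Long endTime,
              Integer limit, boolean grouped) {
      this.metricNames = metricNames;
      this.hostname = hostname;
      this.appId = appId;
      this.instanceId = instanceId;
      this.startTime = startTime;
      this.endTime = endTime;
      this.limit = limit;
      this.grouped = grouped;
    }

    void setNoLimit()               { this.noLimit = true; }
    void setFetchSize(Integer size) { this.fetchSize = size; }
  }

  public static void main(String[] args) {
    long endTime = System.currentTimeMillis();
    long startTime = endTime - 5 * 60 * 1000;   // a 5-minute aggregation window
    // Nulls mean "no filter": aggregate every metric, host, app and instance
    // that has data in the time window.
    Condition condition = new Condition(null, null, null, null,
        startTime, endTime, null, true);
    condition.setNoLimit();
    condition.setFetchSize(2000);               // illustrative fetch size
  }
}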

+ 1 - 2
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java

@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.Map;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
@@ -92,7 +91,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
 
   @Override
   protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+    Condition condition = new Condition(null, null, null, null, startTime,
       endTime, null, true);
     condition.setNoLimit();
     condition.setFetchSize(resultsetFetchSize);

+ 1 - 2
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java

@@ -31,7 +31,6 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL;
@@ -90,7 +89,7 @@ public class TimelineMetricClusterAggregatorHourly extends
   @Override
   protected Condition prepareMetricQueryCondition(long startTime,
                                                   long endTime) {
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+    Condition condition = new Condition(null, null, null, null, startTime,
       endTime, null, true);
     condition.setNoLimit();
     condition.setFetchSize(resultsetFetchSize);

+ 2 - 3
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java

@@ -38,7 +38,6 @@ import java.util.Map;
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.fail;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LOG;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
@@ -96,7 +95,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     boolean success = agg.doWork(startTime, endTime);
 
     //THEN
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+    Condition condition = new Condition(null, null, null, null, startTime,
       endTime, null, true);
     condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA)));
@@ -156,7 +155,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     boolean success = agg.doWork(startTime, endTime);
 
     //THEN
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+    Condition condition = new Condition(null, null, null, null, startTime,
       endTime, null, true);
     condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA)));

+ 3 - 4
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java

@@ -39,7 +39,6 @@ import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
 import static junit.framework.Assert.fail;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LOG;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
@@ -85,7 +84,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
     TimelineMetrics metricsSent = prepareTimelineMetrics(startTime, "local");
     hdb.insertMetricRecords(metricsSent);
 
-    Condition queryCondition = new DefaultCondition(null, "local", null, null,
+    Condition queryCondition = new Condition(null, "local", null, null,
       startTime, startTime + (15 * 60 * 1000), null, false);
     TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition);
 
@@ -121,7 +120,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
     boolean success = aggregatorMinute.doWork(startTime, endTime);
 
     //THEN
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+    Condition condition = new Condition(null, null, null, null, startTime,
       endTime, null, true);
     condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
@@ -200,7 +199,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
     assertTrue(success);
 
     //THEN
-    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+    Condition condition = new Condition(null, null, null, null, startTime,
       endTime, null, true);
     condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),

+ 1 - 20
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java

@@ -24,14 +24,12 @@ import java.util.Arrays;
 import java.util.Collections;
 
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LikeCondition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.SplitByMetricNamesCondition;
 
 public class TestPhoenixTransactSQL {
   @Test
   public void testConditionClause() throws Exception {
-    Condition condition = new DefaultCondition(
+    Condition condition = new Condition(
       Arrays.asList("cpu_user", "mem_free"), "h1", "a1", "i1",
         1407959718L, 1407959918L, null, false);
 
@@ -43,23 +41,6 @@ public class TestPhoenixTransactSQL {
     Assert.assertEquals(expectedClause, preparedClause);
   }
 
-  @Test
-  public void testSplitByMetricNamesCondition() throws Exception {
-    Condition c = new DefaultCondition(
-      Arrays.asList("cpu_user", "mem_free"), "h1", "a1", "i1",
-      1407959718L, 1407959918L, null, false);
-
-    SplitByMetricNamesCondition condition = new SplitByMetricNamesCondition(c);
-    condition.setCurrentMetric(c.getMetricNames().get(0));
-
-    String preparedClause = condition.getConditionClause();
-    String expectedClause = "METRIC_NAME = ? AND HOSTNAME = ? AND " +
-      "APP_ID = ? AND INSTANCE_ID = ? AND SERVER_TIME >= ? AND SERVER_TIME < ?";
-
-    Assert.assertNotNull(preparedClause);
-    Assert.assertEquals(expectedClause, preparedClause);
-  }
-
   @Test
   public void testLikeConditionClause() throws Exception {
     Condition condition = new LikeCondition(
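
The deleted test exercised SplitByMetricNamesCondition, the adapter removed in the first hunk: rather than one statement with METRIC_NAME IN (?, ?, ...), it issued one statement per metric with METRIC_NAME = ?. A simplified, self-contained illustration of the two clause shapes (stand-in code, not the Ambari classes):

import java.util.Arrays;
import java.util.List;

public class SplitConditionSketch {
  // Clause shape used by the plain condition: one statement for all metrics.
  static String inClause(List<String> metricNames) {
    StringBuilder sb = new StringBuilder("METRIC_NAME IN (");
    for (int i = 0; i < metricNames.size(); i++) {
      sb.append(i == 0 ? "?" : ", ?");
    }
    return sb.append(")").toString();
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList("cpu_user", "mem_free");
    System.out.println(inClause(names));        // METRIC_NAME IN (?, ?)
    // Clause shape from the deleted test: one statement per metric name.
    for (String name : names) {
      System.out.println("METRIC_NAME = ?      -- bound to " + name);
    }
  }
}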

+ 34 - 42
ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java

@@ -137,8 +137,8 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
      */
     public Collection<Resource> populateResources() throws SystemException {
       // No open ended query support.
-      if (temporalInfo != null && (temporalInfo.getStartTime() == null
-        || temporalInfo.getEndTime() == null)) {
+      if (temporalInfo == null || temporalInfo.getStartTime() == null ||
+          temporalInfo.getEndTime() == null) {
         return Collections.emptySet();
       }
 
@@ -163,9 +163,38 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
           return Collections.emptySet();
         }
 
-          String spec = getSpec(hostname, resource);
+          String metricsParam = getSetString(processRegexps(metrics.keySet()), -1);
+          // Reuse uriBuilder
+          uriBuilder.removeQuery();
+
+          if (metricsParam.length() > 0) {
+            uriBuilder.setParameter("metricNames", metricsParam);
+          }
+
+          if (hostname != null && !hostname.isEmpty() && !hostname.equals(dummyHostName)) {
+            uriBuilder.setParameter("hostname", hostname);
+          }
+
+          String componentName = getComponentName(resource);
+          if (componentName != null && !componentName.isEmpty()) {
+            if (TIMELINE_APPID_MAP.containsKey(componentName)) {
+              componentName = TIMELINE_APPID_MAP.get(componentName);
+            }
+            uriBuilder.setParameter("appId", componentName);
+          }
+
+          long startTime = temporalInfo.getStartTime();
+          if (startTime != -1) {
+            uriBuilder.setParameter("startTime", String.valueOf(startTime));
+          }
+
+          long endTime = temporalInfo.getEndTime();
+          if (endTime != -1) {
+            uriBuilder.setParameter("endTime", String.valueOf(endTime));
+          }
 
           BufferedReader reader = null;
+          String spec = uriBuilder.toString();
           try {
             LOG.debug("Metrics request url =" + spec);
             reader = new BufferedReader(new InputStreamReader(streamProvider.readFrom(spec)));
@@ -176,9 +205,8 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
             Set<String> patterns = createPatterns(metrics.keySet());
 
             for (TimelineMetric metric : timelineMetrics.getMetrics()) {
-              if (metric.getMetricName() != null
-                && metric.getMetricValues() != null
-                && checkMetricName(patterns, metric.getMetricName())) {
+              if (metric.getMetricName() != null && metric.getMetricValues() != null
+                  && checkMetricName(patterns, metric.getMetricName())) {
                 populateResource(resource, metric);
               }
             }
@@ -202,42 +230,6 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
       return Collections.emptySet();
     }
 
-    private String getSpec(String hostname, Resource resource) {
-      String metricsParam = getSetString(processRegexps(metrics.keySet()), -1);
-      // Reuse uriBuilder
-      uriBuilder.removeQuery();
-
-      if (metricsParam.length() > 0) {
-        uriBuilder.setParameter("metricNames", metricsParam);
-      }
-
-      if (hostname != null && !hostname.isEmpty() && !hostname.equals(dummyHostName)) {
-        uriBuilder.setParameter("hostname", hostname);
-      }
-
-      String componentName = getComponentName(resource);
-      if (componentName != null && !componentName.isEmpty()) {
-        if (TIMELINE_APPID_MAP.containsKey(componentName)) {
-          componentName = TIMELINE_APPID_MAP.get(componentName);
-        }
-        uriBuilder.setParameter("appId", componentName);
-      }
-
-      if (temporalInfo != null) {
-        long startTime = temporalInfo.getStartTime();
-        if (startTime != -1) {
-          uriBuilder.setParameter("startTime", String.valueOf(startTime));
-        }
-
-        long endTime = temporalInfo.getEndTime();
-        if (endTime != -1) {
-          uriBuilder.setParameter("endTime", String.valueOf(endTime));
-        }
-      }
-
-      return uriBuilder.toString();
-    }
-
     private Set<String> createPatterns(Set<String> rawNames) {
       Pattern pattern = Pattern.compile(METRIC_REGEXP_PATTERN);
       Set<String> result = new HashSet<String>();
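
This hunk inlines the former getSpec() helper back into populateResources(); the request URL is still assembled with org.apache.http.client.utils.URIBuilder. A runnable sketch of that assembly (requires httpclient on the classpath); the host, port, endpoint path, and parameter values are illustrative placeholders:

import org.apache.http.client.utils.URIBuilder;

public class AmsSpecSketch {
  public static void main(String[] args) throws Exception {
    // Base builder; the real provider caches and reuses one instance.
    URIBuilder uriBuilder = new URIBuilder();
    uriBuilder.setScheme("http");
    uriBuilder.setHost("localhost");
    uriBuilder.setPort(8188);
    uriBuilder.setPath("/ws/v1/timeline/metrics");  // assumed AMS endpoint

    // The reuse pattern from the hunk: drop any query left over from the
    // previous request, then set each parameter only when a value is present.
    uriBuilder.removeQuery();
    uriBuilder.setParameter("metricNames", "cpu_user,mem_free");
    uriBuilder.setParameter("hostname", "h1");
    uriBuilder.setParameter("appId", "HOST");
    uriBuilder.setParameter("startTime", String.valueOf(1416445244701L));
    uriBuilder.setParameter("endTime", String.valueOf(1416445244901L));

    String spec = uriBuilder.toString();
    System.out.println(spec);  // the "Metrics request url" the provider logs
  }
}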

+ 1 - 91
ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java

@@ -96,95 +96,6 @@ public class AMSPropertyProviderTest {
     Assert.assertEquals(111, val.length);
   }
 
-  @Test
-  public void testPopulateResourcesForSingleHostMetricPointInTime() throws
-    Exception {
-
-    // given
-    TestStreamProvider streamProvider = new TestStreamProvider(SINGLE_HOST_METRICS_FILE_PATH);
-    TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
-    ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
-    Map<String, Map<String, PropertyInfo>> propertyIds = PropertyHelper.getMetricPropertyIds(Resource.Type.Host);
-    AMSPropertyProvider propertyProvider = new AMSHostPropertyProvider(
-      propertyIds,
-      streamProvider,
-      sslConfiguration,
-      metricHostProvider,
-      CLUSTER_NAME_PROPERTY_ID,
-      HOST_NAME_PROPERTY_ID
-    );
-
-    Resource resource = new ResourceImpl(Resource.Type.Host);
-    resource.setProperty(HOST_NAME_PROPERTY_ID, "h1");
-    Map<String, TemporalInfo> temporalInfoMap = Collections.emptyMap();
-    Request request = PropertyHelper.getReadRequest(Collections.singleton
-      (PROPERTY_ID1), temporalInfoMap);
-    System.out.println(request);
-
-    // when
-    Set<Resource> resources =
-      propertyProvider.populateResources(Collections.singleton(resource), request, null);
-
-    // then
-    Assert.assertEquals(1, resources.size());
-    Resource res = resources.iterator().next();
-    Map<String, Object> properties = PropertyHelper.getProperties(res);
-    Assert.assertNotNull(properties);
-    URIBuilder uriBuilder = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188);
-    uriBuilder.addParameter("metricNames", "cpu_user");
-    uriBuilder.addParameter("hostname", "h1");
-    uriBuilder.addParameter("appId", "HOST");
-    Assert.assertEquals(uriBuilder.toString(), streamProvider.getLastSpec());
-    Double val = (Double) res.getPropertyValue(PROPERTY_ID1);
-    Assert.assertEquals(40.45, val, 0.001);
-  }
-
-  @Test
-  public void testPopulateResourcesForMultipleHostMetricscPointInTime() throws Exception {
-    TestStreamProvider streamProvider = new TestStreamProvider(MULTIPLE_HOST_METRICS_FILE_PATH);
-    TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
-    ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class);
-
-    Map<String, Map<String, PropertyInfo>> propertyIds = PropertyHelper.getMetricPropertyIds(Resource.Type.Host);
-    AMSPropertyProvider propertyProvider = new AMSHostPropertyProvider(
-      propertyIds,
-      streamProvider,
-      sslConfiguration,
-      metricHostProvider,
-      CLUSTER_NAME_PROPERTY_ID,
-      HOST_NAME_PROPERTY_ID
-    );
-
-    Resource resource = new ResourceImpl(Resource.Type.Host);
-    resource.setProperty(HOST_NAME_PROPERTY_ID, "h1");
-    Map<String, TemporalInfo> temporalInfoMap = Collections.emptyMap();
-    Request request = PropertyHelper.getReadRequest(
-      new HashSet<String>() {{ add(PROPERTY_ID1); add(PROPERTY_ID2); }}, temporalInfoMap);
-    Set<Resource> resources =
-      propertyProvider.populateResources(Collections.singleton(resource), request, null);
-    Assert.assertEquals(1, resources.size());
-    Resource res = resources.iterator().next();
-    Map<String, Object> properties = PropertyHelper.getProperties(resources.iterator().next());
-    Assert.assertNotNull(properties);
-    URIBuilder uriBuilder = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188);
-    uriBuilder.addParameter("metricNames", "cpu_user,mem_free");
-    uriBuilder.addParameter("hostname", "h1");
-    uriBuilder.addParameter("appId", "HOST");
-
-    URIBuilder uriBuilder2 = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188);
-    uriBuilder2.addParameter("metricNames", "mem_free,cpu_user");
-    uriBuilder2.addParameter("hostname", "h1");
-    uriBuilder2.addParameter("appId", "HOST");
-    System.out.println(streamProvider.getLastSpec());
-    Assert.assertTrue(uriBuilder.toString().equals(streamProvider.getLastSpec())
-        || uriBuilder2.toString().equals(streamProvider.getLastSpec()));
-    Double val1 = (Double) res.getPropertyValue(PROPERTY_ID1);
-    Assert.assertEquals(40.45, val1, 0.001);
-    Double val2 = (Double)res.getPropertyValue(PROPERTY_ID2);
-    Assert.assertEquals(2.47025664E8, val2, 0.1);
-  }
-
-
   @Test
   public void testPopulateResourcesForMultipleHostMetrics() throws Exception {
     TestStreamProvider streamProvider = new TestStreamProvider(MULTIPLE_HOST_METRICS_FILE_PATH);
@@ -228,14 +139,13 @@ public class AMSPropertyProviderTest {
     uriBuilder2.addParameter("startTime", "1416445244701");
     uriBuilder2.addParameter("endTime", "1416445244901");
     Assert.assertTrue(uriBuilder.toString().equals(streamProvider.getLastSpec())
-      || uriBuilder2.toString().equals(streamProvider.getLastSpec()));
+        || uriBuilder2.toString().equals(streamProvider.getLastSpec()));
     Number[][] val = (Number[][]) res.getPropertyValue(PROPERTY_ID1);
     Assert.assertEquals(111, val.length);
     val = (Number[][]) res.getPropertyValue(PROPERTY_ID2);
     Assert.assertEquals(86, val.length);
   }
 
-
   @Test
   public void testPopulateResourcesForRegexpMetrics() throws Exception {
     TestStreamProvider streamProvider = new TestStreamProvider(MULTIPLE_COMPONENT_REGEXP_METRICS_FILE_PATH);