Explorar el Código

AMBARI-10290. Expose available host metrics across hostcomponents. (swagle)

Siddharth Wagle hace 10 años
padre
commit
b93452edab
Se han modificado 35 ficheros con 933 adiciones y 563 borrados
  1. 8 2
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
  2. 35 26
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
  3. 6 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
  4. 9 10
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
  5. 1 1
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
  6. 1 1
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java
  7. 8 8
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java
  8. 5 6
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java
  9. 4 4
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java
  10. 7 7
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
  11. 1 1
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java
  12. 7 5
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
  13. 6 7
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
  14. 169 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
  15. 29 17
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
  16. 13 15
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java
  17. 5 4
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
  18. 46 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java
  19. 1 1
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java
  20. 258 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
  21. 1 1
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
  22. 6 403
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
  23. 165 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java
  24. 1 2
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
  25. 2 1
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
  26. 6 3
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
  27. 4 3
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java
  28. 3 2
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
  29. 68 10
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
  30. 10 8
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
  31. 17 6
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
  32. 18 2
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
  33. 1 0
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
  34. 4 7
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
  35. 8 0
      ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml

+ 8 - 2
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java

@@ -25,6 +25,14 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregatorHourly;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -33,8 +41,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
 
 public class HBaseTimelineMetricStore extends AbstractService
     implements TimelineMetricStore {

+ 35 - 26
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java

@@ -24,6 +24,17 @@ import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -41,25 +52,6 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.ALTER_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.SplitByMetricNamesCondition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_METRICS_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES;
@@ -70,6 +62,23 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
 
 /**
  * Provides a facade over the Phoenix API to access HBase schema
@@ -77,7 +86,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 public class PhoenixHBaseAccessor {
 
   private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class);
-  private static final TimelineMetricReader timelineMetricReader = new TimelineMetricReader();
+  private static final TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper();
   private final Configuration hbaseConf;
   private final Configuration metricsConf;
   private final RetryCounterFactory retryCounterFactory;
@@ -151,14 +160,14 @@ public class PhoenixHBaseAccessor {
 
   private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs)
     throws SQLException, IOException {
-    TimelineMetric metric = timelineMetricReader
+    TimelineMetric metric = TIMELINE_METRIC_READ_HELPER
       .getTimelineMetricCommonsFromResultSet(rs);
     metric.setMetricValues(readLastMetricValueFromJSON(rs.getString("METRICS")));
 
     return metric;
   }
 
-  static TimelineMetric getAggregatedTimelineMetricFromResultSet(
+  public static TimelineMetric getAggregatedTimelineMetricFromResultSet(
     ResultSet rs, Function f) throws SQLException, IOException {
 
     TimelineMetric metric = new TimelineMetric();
@@ -214,7 +223,7 @@ public class PhoenixHBaseAccessor {
     return (Map<Long, Double>) mapper.readValue(json, metricValuesTypeRef);
   }
 
-  static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
+  public static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
     throws SQLException, IOException {
     TimelineMetric metric = new TimelineMetric();
     metric.setMetricName(rs.getString("METRIC_NAME"));
@@ -226,7 +235,7 @@ public class PhoenixHBaseAccessor {
     return metric;
   }
 
-  static MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs)
+  public static MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs)
     throws SQLException {
     MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
     metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
@@ -238,7 +247,7 @@ public class PhoenixHBaseAccessor {
     return metricHostAggregate;
   }
 
-  static MetricClusterAggregate getMetricClusterAggregateFromResultSet(ResultSet rs)
+  public static MetricClusterAggregate getMetricClusterAggregateFromResultSet(ResultSet rs)
     throws SQLException {
     MetricClusterAggregate agg = new MetricClusterAggregate();
     agg.setSum(rs.getDouble("METRIC_SUM"));
@@ -474,7 +483,7 @@ public class PhoenixHBaseAccessor {
     }
     else {
       TimelineMetric metric;
-      metric = timelineMetricReader.getTimelineMetricFromResultSet(rs);
+      metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricFromResultSet(rs);
 
       if (condition.isGrouped()) {
         metrics.addOrMergeTimelineMetric(metric);

+ 6 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -128,6 +129,11 @@ public class TimelineMetricConfiguration {
   public static final String TIMELINE_SERVICE_RPC_ADDRESS =
     "timeline.metrics.service.rpc.address";
 
+  public static final String CLUSTER_AGGREGATOR_APP_IDS =
+    "timeline.metrics.service.cluster.aggregator.appIds";
+
+  public static final String HOST_APP_ID = "HOST";
+
   private Configuration hbaseConf;
   private Configuration metricsConf;
   private volatile boolean isInitialized = false;

+ 9 - 10
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java → ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java

@@ -15,15 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
-
 import java.io.File;
 import java.io.IOException;
 import java.sql.Connection;
@@ -31,7 +33,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Date;
-
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
@@ -80,7 +81,7 @@ public abstract class AbstractTimelineAggregator implements Runnable {
   /**
    * Access relaxed for tests
    */
-  protected long runOnce(Long SLEEP_INTERVAL) {
+  public long runOnce(Long SLEEP_INTERVAL) {
     long currentTime = clock.getTime();
     long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime);
     long sleepTime = SLEEP_INTERVAL;
@@ -193,13 +194,12 @@ public abstract class AbstractTimelineAggregator implements Runnable {
    * @param startTime Sample start time
    * @param endTime Sample end time
    */
-  protected boolean doWork(long startTime, long endTime) {
+  public boolean doWork(long startTime, long endTime) {
     LOG.info("Start aggregation cycle @ " + new Date() + ", " +
       "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
 
     boolean success = true;
-    PhoenixTransactSQL.Condition condition =
-      prepareMetricQueryCondition(startTime, endTime);
+    Condition condition = prepareMetricQueryCondition(startTime, endTime);
 
     Connection conn = null;
     PreparedStatement stmt = null;
@@ -251,8 +251,7 @@ public abstract class AbstractTimelineAggregator implements Runnable {
     return success;
   }
 
-  protected abstract PhoenixTransactSQL.Condition
-  prepareMetricQueryCondition(long startTime, long endTime);
+  protected abstract Condition prepareMetricQueryCondition(long startTime, long endTime);
 
   protected abstract void aggregate(ResultSet rs, long startTime, long endTime)
     throws IOException, SQLException;
@@ -265,7 +264,7 @@ public abstract class AbstractTimelineAggregator implements Runnable {
     return getCheckpointCutOffMultiplier() * getSleepIntervalMillis();
   }
 
-  protected abstract boolean isDisabled();
+  public abstract boolean isDisabled();
 
   protected abstract String getCheckpointLocation();
 }

+ 1 - 1
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AggregatorUtils.java → ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 
 import java.util.Map;

+ 1 - 1
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Function.java → ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 /**
  * Is used to determine metrics aggregate table.

+ 8 - 8
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java → ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -52,39 +52,39 @@ public class MetricAggregate {
     this.min = min;
   }
 
-  void updateSum(Double sum) {
+  public void updateSum(Double sum) {
     this.sum += sum;
   }
 
-  void updateMax(Double max) {
+  public void updateMax(Double max) {
     if (max > this.max) {
       this.max = max;
     }
   }
 
-  void updateMin(Double min) {
+  public void updateMin(Double min) {
     if (min < this.min) {
       this.min = min;
     }
   }
 
   @JsonProperty("sum")
-  Double getSum() {
+  public Double getSum() {
     return sum;
   }
 
   @JsonProperty("deviation")
-  Double getDeviation() {
+  public Double getDeviation() {
     return deviation;
   }
 
   @JsonProperty("max")
-  Double getMax() {
+  public Double getMax() {
     return max;
   }
 
   @JsonProperty("min")
-  Double getMin() {
+  public Double getMin() {
     return min;
   }
 

+ 5 - 6
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java → ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 
 import org.codehaus.jackson.annotate.JsonCreator;
@@ -31,18 +31,18 @@ public class MetricClusterAggregate extends MetricAggregate {
   public MetricClusterAggregate() {
   }
 
-  MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
+  public MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
                          Double max, Double min) {
     super(sum, deviation, max, min);
     this.numberOfHosts = numberOfHosts;
   }
 
   @JsonProperty("numberOfHosts")
-  int getNumberOfHosts() {
+  public int getNumberOfHosts() {
     return numberOfHosts;
   }
 
-  void updateNumberOfHosts(int count) {
+  public void updateNumberOfHosts(int count) {
     this.numberOfHosts += count;
   }
 
@@ -53,7 +53,7 @@ public class MetricClusterAggregate extends MetricAggregate {
   /**
    * Find and update min, max and avg for a minute
    */
-  void updateAggregates(MetricClusterAggregate hostAggregate) {
+  public void updateAggregates(MetricClusterAggregate hostAggregate) {
     updateMax(hostAggregate.getMax());
     updateMin(hostAggregate.getMin());
     updateSum(hostAggregate.getSum());
@@ -62,7 +62,6 @@ public class MetricClusterAggregate extends MetricAggregate {
 
   @Override
   public String toString() {
-//    MetricClusterAggregate
     return "MetricAggregate{" +
       "sum=" + sum +
       ", numberOfHosts=" + numberOfHosts +

+ 4 - 4
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java → ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 
 import org.codehaus.jackson.annotate.JsonCreator;
@@ -42,11 +42,11 @@ public class MetricHostAggregate extends MetricAggregate {
   }
 
   @JsonProperty("numberOfSamples")
-  long getNumberOfSamples() {
+  public long getNumberOfSamples() {
     return numberOfSamples == 0 ? 1 : numberOfSamples;
   }
 
-  void updateNumberOfSamples(long count) {
+  public void updateNumberOfSamples(long count) {
     this.numberOfSamples += count;
   }
 
@@ -61,7 +61,7 @@ public class MetricHostAggregate extends MetricAggregate {
   /**
    * Find and update min, max and avg for a minute
    */
-  void updateAggregates(MetricHostAggregate hostAggregate) {
+  public void updateAggregates(MetricHostAggregate hostAggregate) {
     updateMax(hostAggregate.getMax());
     updateMin(hostAggregate.getMin());
     updateSum(hostAggregate.getSum());

+ 7 - 7
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java → ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 public class TimelineClusterMetric {
   private String metricName;
@@ -24,7 +24,7 @@ public class TimelineClusterMetric {
   private long timestamp;
   private String type;
 
-  TimelineClusterMetric(String metricName, String appId, String instanceId,
+  public TimelineClusterMetric(String metricName, String appId, String instanceId,
                         long timestamp, String type) {
     this.metricName = metricName;
     this.appId = appId;
@@ -33,23 +33,23 @@ public class TimelineClusterMetric {
     this.type = type;
   }
 
-  String getMetricName() {
+  public String getMetricName() {
     return metricName;
   }
 
-  String getAppId() {
+  public String getAppId() {
     return appId;
   }
 
-  String getInstanceId() {
+  public String getInstanceId() {
     return instanceId;
   }
 
-  long getTimestamp() {
+  public long getTimestamp() {
     return timestamp;
   }
 
-  String getType() { return type; }
+  public String getType() { return type; }
 
   @Override
   public boolean equals(Object o) {

+ 1 - 1
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetricReader.java → ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 import java.sql.ResultSet;
 import java.sql.SQLException;

+ 7 - 5
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java → ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java

@@ -15,20 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.Map;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
 
 public class TimelineMetricAggregator extends AbstractTimelineAggregator {
   private static final Log LOG = LogFactory.getLog
@@ -139,7 +141,7 @@ public class TimelineMetricAggregator extends AbstractTimelineAggregator {
   }
 
   @Override
-  protected boolean isDisabled() {
+  public boolean isDisabled() {
     return metricsConf.getBoolean(hostAggregatorDisabledParam, false);
   }
 }

+ 6 - 7
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java → ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java

@@ -15,14 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED;
@@ -32,9 +34,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
 
-/**
- *
- */
 public class TimelineMetricAggregatorFactory {
   private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE =
     "timeline-metrics-host-aggregator-checkpoint";

+ 169 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java

@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID;
+
+/**
+ * Aggregator responsible for providing app-level host aggregates. This task
+ * is accomplished without a round trip to storage; rather,
+ * TimelineMetricClusterAggregator is responsible for the lifecycle of
+ * {@link TimelineMetricAppAggregator} and provides the raw data to aggregate.
+ */
+public class TimelineMetricAppAggregator {
+  private static final Log LOG = LogFactory.getLog(TimelineMetricAppAggregator.class);
+  // Lookup to check candidacy of an app
+  private final List<String> appIdsToAggregate;
+  // Map to lookup apps on a host
+  private Map<String, List<String>> hostedAppsMap = new HashMap<String, List<String>>();
+
+  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics;
+
+  public TimelineMetricAppAggregator(Configuration metricsConf) {
+    appIdsToAggregate = getAppIdsForHostAggregation(metricsConf);
+    LOG.info("AppIds configured for aggregation: " + appIdsToAggregate);
+  }
+
+  /**
+   * Lifecycle method to initialize aggregation cycle.
+   */
+  public void init() {
+    LOG.debug("Initializing aggregation cycle.");
+    aggregateClusterMetrics = new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+  }
+
+  /**
+   * Lifecycle method to indicate end of aggregation cycle.
+   */
+  public void cleanup() {
+    LOG.debug("Cleanup aggregated data.");
+    aggregateClusterMetrics = null;
+  }
+
+  /**
+   * Useful for resetting apps that no longer need aggregation, without a restart.
+   */
+  public void destroy() {
+    LOG.debug("Cleanup aggregated data as well as in-memory state.");
+    aggregateClusterMetrics = null;
+    hostedAppsMap = new HashMap<String, List<String>>();
+  }
+
+  /**
+   * Calculate aggregates if the clusterMetric is a Host metric for recorded
+   * apps that are housed by this host.
+   *
+   * @param clusterMetric {@link TimelineClusterMetric} host / app metric
+   * @param hostname This is the hostname from which this clusterMetric originated.
+   * @param metricValue The metric value for this metric.
+   */
+  public void processTimelineClusterMetric(TimelineClusterMetric clusterMetric,
+                                           String hostname, Double metricValue) {
+
+    String appId = clusterMetric.getAppId();
+    if (appId == null) {
+      return; // No real use case except tests
+    }
+
+    // If metric is a host metric and host has apps on it
+    if (appId.equalsIgnoreCase(HOST_APP_ID)) {
+      // Candidate metric, update app aggregates
+      if (hostedAppsMap.containsKey(hostname)) {
+        updateAppAggregatesFromHostMetric(clusterMetric, hostname, metricValue);
+      }
+    } else {
+      // Build the hostedapps map if not a host metric
+      // Check app candidacy for host aggregation
+      if (appIdsToAggregate.contains(appId)) {
+        List<String> appIds = hostedAppsMap.get(hostname);
+        if (appIds == null) {
+          appIds = new ArrayList<String>();
+          hostedAppsMap.put(hostname, appIds);
+        }
+        if (!appIds.contains(appId)) {
+          appIds.add(appId);
+          LOG.info("Adding appId to hosted apps: appId = " +
+            clusterMetric.getAppId() + ", hostname = " + hostname);
+        }
+      }
+    }
+  }
+
+  /**
+   * Build a cluster app metric from a host metric
+   */
+  private void updateAppAggregatesFromHostMetric(TimelineClusterMetric clusterMetric,
+                                                 String hostname, Double metricValue) {
+
+    if (aggregateClusterMetrics == null) {
+      LOG.error("Aggregation requested without init call.");
+      return;
+    }
+
+    List<String> apps = hostedAppsMap.get(hostname);
+    for (String appId : apps) {
+      // Add a new cluster aggregate metric if none exists
+      TimelineClusterMetric appTimelineClusterMetric =
+        new TimelineClusterMetric(clusterMetric.getMetricName(),
+          appId,
+          clusterMetric.getInstanceId(),
+          clusterMetric.getTimestamp(),
+          clusterMetric.getType()
+        );
+
+      MetricClusterAggregate clusterAggregate = aggregateClusterMetrics.get(appTimelineClusterMetric);
+
+      if (clusterAggregate == null) {
+        clusterAggregate = new MetricClusterAggregate(metricValue, 1, null, metricValue, metricValue);
+        aggregateClusterMetrics.put(appTimelineClusterMetric, clusterAggregate);
+      } else {
+        clusterAggregate.updateSum(metricValue);
+        clusterAggregate.updateNumberOfHosts(1);
+        clusterAggregate.updateMax(metricValue);
+        clusterAggregate.updateMin(metricValue);
+      }
+
+    }
+  }
+
+  /**
+   * Return current copy of aggregated data.
+   */
+  public Map<TimelineClusterMetric, MetricClusterAggregate> getAggregateClusterMetrics() {
+    return aggregateClusterMetrics;
+  }
+
+  private List<String> getAppIdsForHostAggregation(Configuration metricsConf) {
+    String appIds = metricsConf.get(CLUSTER_AGGREGATOR_APP_IDS);
+    if (!StringUtils.isEmpty(appIds)) {
+      return Arrays.asList(StringUtils.stripAll(appIds.split(",")));
+    }
+    return Collections.emptyList();
+  }
+}

+ 29 - 17
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java → ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 
 import org.apache.commons.io.FilenameUtils;
@@ -23,6 +23,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -31,17 +35,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
 
 /**
  * Aggregates a metric across all hosts in the cluster. Reads metrics from
@@ -55,8 +57,9 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
   private final Long sleepIntervalMillis;
   public final int timeSliceIntervalMillis;
   private final Integer checkpointCutOffMultiplier;
-  private TimelineMetricReader timelineMetricReader =
-    new TimelineMetricReader(true);
+  private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true);
+  // Aggregator to perform app-level aggregates for host metrics
+  private final TimelineMetricAppAggregator appAggregator;
 
   public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor,
                                          Configuration metricsConf) {
@@ -74,6 +77,8 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
       (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15));
     checkpointCutOffMultiplier =
       metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+
+    appAggregator = new TimelineMetricAppAggregator(metricsConf);
   }
 
   @Override
@@ -85,11 +90,14 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
   protected void aggregate(ResultSet rs, long startTime, long endTime)
     throws SQLException, IOException {
     List<Long[]> timeSlices = getTimeSlices(startTime, endTime);
+    // Initialize app aggregates for host metrics
+    appAggregator.init();
     Map<TimelineClusterMetric, MetricClusterAggregate>
       aggregateClusterMetrics = aggregateMetricsFromResultSet(rs, timeSlices);
 
     LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
     hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
+    appAggregator.cleanup();
   }
 
   @Override
@@ -118,16 +126,14 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
     return timeSlices;
   }
 
-  private Map<TimelineClusterMetric, MetricClusterAggregate>
-  aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
-    throws SQLException, IOException {
+  private Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
+      throws SQLException, IOException {
     Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
       new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
     // Create time slices
 
     while (rs.next()) {
-      TimelineMetric metric =
-        timelineMetricReader.getTimelineMetricFromResultSet(rs);
+      TimelineMetric metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
 
       Map<TimelineClusterMetric, Double> clusterMetrics =
         sliceFromTimelineMetric(metric, timeSlices);
@@ -135,13 +141,14 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
       if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
         for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
             clusterMetrics.entrySet()) {
+
           TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
-          MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
           Double avgValue = clusterMetricEntry.getValue();
 
+          MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
+
           if (aggregate == null) {
-            aggregate = new MetricClusterAggregate(avgValue, 1, null,
-              avgValue, avgValue);
+            aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue);
             aggregateClusterMetrics.put(clusterMetric, aggregate);
           } else {
             aggregate.updateSum(avgValue);
@@ -149,9 +156,14 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
             aggregate.updateMax(avgValue);
             aggregate.updateMin(avgValue);
           }
+          // Update app level aggregates
+          appAggregator.processTimelineClusterMetric(clusterMetric,
+            metric.getHostName(), avgValue);
         }
       }
     }
+    // Add app level aggregates to save
+    aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
     return aggregateClusterMetrics;
   }
 
@@ -166,7 +178,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
   }
 
   @Override
-  protected boolean isDisabled() {
+  public boolean isDisabled() {
     return metricsConf.getBoolean(CLUSTER_AGGREGATOR_MINUTE_DISABLED, false);
   }
 

+ 13 - 15
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java → ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java

@@ -15,13 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -29,11 +32,9 @@ import java.util.HashMap;
 import java.util.Map;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
@@ -41,8 +42,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
 
-public class TimelineMetricClusterAggregatorHourly extends
-  AbstractTimelineAggregator {
+public class TimelineMetricClusterAggregatorHourly extends AbstractTimelineAggregator {
   private static final Log LOG = LogFactory.getLog
     (TimelineMetricClusterAggregatorHourly.class);
   private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE =
@@ -106,8 +106,8 @@ public class TimelineMetricClusterAggregatorHourly extends
     return condition;
   }
 
-  private Map<TimelineClusterMetric, MetricHostAggregate>
-  aggregateMetricsFromResultSet(ResultSet rs) throws IOException, SQLException {
+  private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs)
+      throws IOException, SQLException {
 
     TimelineClusterMetric existingMetric = null;
     MetricHostAggregate hostAggregate = null;
@@ -144,9 +144,7 @@ public class TimelineMetricClusterAggregatorHourly extends
     return hostAggregateMap;
   }
 
-  private void updateAggregatesFromHost(
-    MetricHostAggregate agg,
-    MetricClusterAggregate currentClusterAggregate) {
+  private void updateAggregatesFromHost(MetricHostAggregate agg, MetricClusterAggregate currentClusterAggregate) {
     agg.updateMax(currentClusterAggregate.getMax());
     agg.updateMin(currentClusterAggregate.getMin());
     agg.updateSum(currentClusterAggregate.getSum());
@@ -169,7 +167,7 @@ public class TimelineMetricClusterAggregatorHourly extends
   }
 
   @Override
-  protected boolean isDisabled() {
+  public boolean isDisabled() {
     return metricsConf.getBoolean(CLUSTER_AGGREGATOR_HOUR_DISABLED, false);
   }
 

+ 5 - 4
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricReader.java → ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java

@@ -15,10 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 
 import java.io.IOException;
 import java.sql.ResultSet;
@@ -26,13 +27,13 @@ import java.sql.SQLException;
 import java.util.Map;
 import java.util.TreeMap;
 
-public class TimelineMetricReader {
+public class TimelineMetricReadHelper {
 
   private boolean ignoreInstance = false;
 
-  public TimelineMetricReader() {}
+  public TimelineMetricReadHelper() {}
 
-  public TimelineMetricReader(boolean ignoreInstance) {
+  public TimelineMetricReadHelper(boolean ignoreInstance) {
     this.ignoreInstance = ignoreInstance;
   }
 

+ 46 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java

@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Precision;
+
+import java.util.List;
+
+public interface Condition {
+  boolean isEmpty();
+
+  List<String> getMetricNames();
+  boolean isPointInTime();
+  boolean isGrouped();
+  void setStatement(String statement);
+  String getHostname();
+  Precision getPrecision();
+  void setPrecision(Precision precision);
+  String getAppId();
+  String getInstanceId();
+  StringBuilder getConditionClause();
+  String getOrderByClause(boolean asc);
+  String getStatement();
+  Long getStartTime();
+  Long getEndTime();
+  Integer getLimit();
+  Integer getFetchSize();
+  void setFetchSize(Integer fetchSize);
+  void addOrderByColumn(String column);
+  void setNoLimit();
+}

+ 1 - 1
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java → ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
 
 
 import java.sql.Connection;

+ 258 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java

@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Precision;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+public class DefaultCondition implements Condition {
+  List<String> metricNames;
+  String hostname;
+  String appId;
+  String instanceId;
+  Long startTime;
+  Long endTime;
+  Precision precision;
+  Integer limit;
+  boolean grouped;
+  boolean noLimit = false;
+  Integer fetchSize;
+  String statement;
+  Set<String> orderByColumns = new LinkedHashSet<String>();
+
+  public DefaultCondition(List<String> metricNames, String hostname, String appId,
+                   String instanceId, Long startTime, Long endTime, Precision precision,
+                   Integer limit, boolean grouped) {
+    this.metricNames = metricNames;
+    this.hostname = hostname;
+    this.appId = appId;
+    this.instanceId = instanceId;
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.precision = precision;
+    this.limit = limit;
+    this.grouped = grouped;
+  }
+
+  public String getStatement() {
+    return statement;
+  }
+
+  public void setStatement(String statement) {
+    this.statement = statement;
+  }
+
+  public List<String> getMetricNames() {
+    return metricNames == null || metricNames.isEmpty() ? null : metricNames;
+  }
+
+  public StringBuilder getConditionClause() {
+    StringBuilder sb = new StringBuilder();
+    boolean appendConjunction = false;
+    StringBuilder metricsLike = new StringBuilder();
+    StringBuilder metricsIn = new StringBuilder();
+
+    if (getMetricNames() != null) {
+      for (String name : getMetricNames()) {
+        if (name.contains("%")) {
+          if (metricsLike.length() > 1) {
+            metricsLike.append(" OR ");
+          }
+          metricsLike.append("METRIC_NAME LIKE ?");
+        } else {
+          if (metricsIn.length() > 0) {
+            metricsIn.append(", ");
+          }
+          metricsIn.append("?");
+        }
+      }
+
+      if (metricsIn.length()>0) {
+        sb.append("(METRIC_NAME IN (");
+        sb.append(metricsIn);
+        sb.append(")");
+        appendConjunction = true;
+      }
+
+      if (metricsLike.length() > 0) {
+        if (appendConjunction) {
+          sb.append(" OR ");
+        } else {
+          sb.append("(");
+        }
+        sb.append(metricsLike);
+        appendConjunction = true;
+      }
+
+      if (appendConjunction) {
+        sb.append(")");
+      }
+    }
+
+    appendConjunction = append(sb, appendConjunction, getHostname(), " HOSTNAME = ?");
+    appendConjunction = append(sb, appendConjunction, getAppId(), " APP_ID = ?");
+    appendConjunction = append(sb, appendConjunction, getInstanceId(), " INSTANCE_ID = ?");
+    appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?");
+    append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?");
+
+    return sb;
+  }
+
+  protected static boolean append(StringBuilder sb,
+                                  boolean appendConjunction,
+                                  Object value, String str) {
+    if (value != null) {
+      if (appendConjunction) {
+        sb.append(" AND");
+      }
+
+      sb.append(str);
+      appendConjunction = true;
+    }
+    return appendConjunction;
+  }
+
+  public String getHostname() {
+    return hostname == null || hostname.isEmpty() ? null : hostname;
+  }
+
+  public Precision getPrecision() {
+    return precision;
+  }
+
+  public void setPrecision(Precision precision) {
+    this.precision = precision;
+  }
+
+  public String getAppId() {
+    if (appId != null && !appId.isEmpty()) {
+      if (!(appId.equals("HOST") || appId.equals("FLUME_HANDLER")) ) {
+        return appId.toLowerCase();
+      } else {
+        return appId;
+      }
+    }
+    return null;
+  }
+
+  public String getInstanceId() {
+    return instanceId == null || instanceId.isEmpty() ? null : instanceId;
+  }
+
+  /**
+   * Convert to millis.
+   */
+  public Long getStartTime() {
+    if (startTime == null) {
+      return null;
+    } else if (startTime < 9999999999l) {
+      return startTime * 1000;
+    } else {
+      return startTime;
+    }
+  }
+
+  public Long getEndTime() {
+    if (endTime == null) {
+      return null;
+    }
+    if (endTime < 9999999999l) {
+      return endTime * 1000;
+    } else {
+      return endTime;
+    }
+  }
+
+  public void setNoLimit() {
+    this.noLimit = true;
+  }
+
+  public Integer getLimit() {
+    if (noLimit) {
+      return null;
+    }
+    return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit;
+  }
+
+  public boolean isGrouped() {
+    return grouped;
+  }
+
+  public boolean isPointInTime() {
+    return getStartTime() == null && getEndTime() == null;
+  }
+
+  public boolean isEmpty() {
+    return (metricNames == null || metricNames.isEmpty())
+      && (hostname == null || hostname.isEmpty())
+      && (appId == null || appId.isEmpty())
+      && (instanceId == null || instanceId.isEmpty())
+      && startTime == null
+      && endTime == null;
+  }
+
+  public Integer getFetchSize() {
+    return fetchSize;
+  }
+
+  public void setFetchSize(Integer fetchSize) {
+    this.fetchSize = fetchSize;
+  }
+
+  public void addOrderByColumn(String column) {
+    orderByColumns.add(column);
+  }
+
+  public String getOrderByClause(boolean asc) {
+    String orderByStr = " ORDER BY ";
+    if (!orderByColumns.isEmpty()) {
+      StringBuilder sb = new StringBuilder(orderByStr);
+      for (String orderByColumn : orderByColumns) {
+        if (sb.length() != orderByStr.length()) {
+          sb.append(", ");
+        }
+        sb.append(orderByColumn);
+        if (!asc) {
+          sb.append(" DESC");
+        }
+      }
+      sb.append(" ");
+      return sb.toString();
+    }
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    return "Condition{" +
+      "metricNames=" + metricNames +
+      ", hostname='" + hostname + '\'' +
+      ", appId='" + appId + '\'' +
+      ", instanceId='" + instanceId + '\'' +
+      ", startTime=" + startTime +
+      ", endTime=" + endTime +
+      ", limit=" + limit +
+      ", grouped=" + grouped +
+      ", orderBy=" + orderByColumns +
+      ", noLimit=" + noLimit +
+      '}';
+  }
+}

+ 1 - 1
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java → ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
 
 
 import org.apache.commons.logging.Log;

+ 6 - 403
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java → ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java

@@ -15,10 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Precision;
+
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
@@ -33,7 +36,8 @@ import java.util.concurrent.TimeUnit;
  */
 public class PhoenixTransactSQL {
 
-  static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
+  public static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
+
   /**
    * Create table to store individual metric records.
    */
@@ -566,405 +570,4 @@ public class PhoenixTransactSQL {
 
     return stmt;
   }
-
-  static interface Condition {
-
-    boolean isEmpty();
-
-    List<String> getMetricNames();
-    boolean isPointInTime();
-    boolean isGrouped();
-    void setStatement(String statement);
-    String getHostname();
-    Precision getPrecision();
-    void setPrecision(Precision precision);
-    String getAppId();
-    String getInstanceId();
-    StringBuilder getConditionClause();
-    String getOrderByClause(boolean asc);
-    String getStatement();
-    Long getStartTime();
-    Long getEndTime();
-    Integer getLimit();
-    Integer getFetchSize();
-    void setFetchSize(Integer fetchSize);
-    void addOrderByColumn(String column);
-    void setNoLimit();
-  }
-
-  static class DefaultCondition implements Condition {
-    List<String> metricNames;
-    String hostname;
-    String appId;
-    String instanceId;
-    Long startTime;
-    Long endTime;
-    Precision precision;
-    Integer limit;
-    boolean grouped;
-    boolean noLimit = false;
-    Integer fetchSize;
-    String statement;
-    Set<String> orderByColumns = new LinkedHashSet<String>();
-
-    DefaultCondition(List<String> metricNames, String hostname, String appId,
-              String instanceId, Long startTime, Long endTime, Precision precision,
-              Integer limit, boolean grouped) {
-      this.metricNames = metricNames;
-      this.hostname = hostname;
-      this.appId = appId;
-      this.instanceId = instanceId;
-      this.startTime = startTime;
-      this.endTime = endTime;
-      this.precision = precision;
-      this.limit = limit;
-      this.grouped = grouped;
-    }
-
-    public String getStatement() {
-      return statement;
-    }
-
-    public void setStatement(String statement) {
-      this.statement = statement;
-    }
-
-    public List<String> getMetricNames() {
-      return metricNames == null || metricNames.isEmpty() ? null : metricNames;
-    }
-
-    public StringBuilder getConditionClause() {
-      StringBuilder sb = new StringBuilder();
-      boolean appendConjunction = false;
-      StringBuilder metricsLike = new StringBuilder();
-      StringBuilder metricsIn = new StringBuilder();
-
-      if (getMetricNames() != null) {
-        for (String name : getMetricNames()) {
-          if (name.contains("%")) {
-            if (metricsLike.length() > 1) {
-              metricsLike.append(" OR ");
-            }
-            metricsLike.append("METRIC_NAME LIKE ?");
-          } else {
-            if (metricsIn.length() > 0) {
-              metricsIn.append(", ");
-            }
-            metricsIn.append("?");
-          }
-        }
-
-        if (metricsIn.length()>0) {
-          sb.append("(METRIC_NAME IN (");
-          sb.append(metricsIn);
-          sb.append(")");
-          appendConjunction = true;
-        }
-
-        if (metricsLike.length() > 0) {
-          if (appendConjunction) {
-            sb.append(" OR ");
-          } else {
-            sb.append("(");
-          }
-          sb.append(metricsLike);
-          appendConjunction = true;
-        }
-
-        if (appendConjunction) {
-          sb.append(")");
-        }
-      }
-
-      appendConjunction = append(sb, appendConjunction, getHostname(), " HOSTNAME = ?");
-      appendConjunction = append(sb, appendConjunction, getAppId(), " APP_ID = ?");
-      appendConjunction = append(sb, appendConjunction, getInstanceId(), " INSTANCE_ID = ?");
-      appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?");
-      append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?");
-
-      return sb;
-    }
-
-    protected static boolean append(StringBuilder sb,
-                                     boolean appendConjunction,
-                             Object value, String str) {
-      if (value != null) {
-        if (appendConjunction) {
-          sb.append(" AND");
-        }
-
-        sb.append(str);
-        appendConjunction = true;
-      }
-      return appendConjunction;
-    }
-
-    public String getHostname() {
-      return hostname == null || hostname.isEmpty() ? null : hostname;
-    }
-
-    public Precision getPrecision() {
-      return precision;
-    }
-
-    public void setPrecision(Precision precision) {
-      this.precision = precision;
-    }
-
-    public String getAppId() {
-      if (appId != null && !appId.isEmpty()) {
-        if (!(appId.equals("HOST") || appId.equals("FLUME_HANDLER")) ) {
-          return appId.toLowerCase();
-        } else {
-          return appId;
-        }
-      }
-      return null;
-    }
-
-    public String getInstanceId() {
-      return instanceId == null || instanceId.isEmpty() ? null : instanceId;
-    }
-
-    /**
-     * Convert to millis.
-     */
-    public Long getStartTime() {
-      if (startTime == null) {
-        return null;
-      } else if (startTime < 9999999999l) {
-        return startTime * 1000;
-      } else {
-        return startTime;
-      }
-    }
-
-    public Long getEndTime() {
-      if (endTime == null) {
-        return null;
-      }
-      if (endTime < 9999999999l) {
-        return endTime * 1000;
-      } else {
-        return endTime;
-      }
-    }
-
-    public void setNoLimit() {
-      this.noLimit = true;
-    }
-
-    public Integer getLimit() {
-      if (noLimit) {
-        return null;
-      }
-      return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit;
-    }
-
-    public boolean isGrouped() {
-      return grouped;
-    }
-
-    public boolean isPointInTime() {
-      return getStartTime() == null && getEndTime() == null;
-    }
-
-    public boolean isEmpty() {
-      return (metricNames == null || metricNames.isEmpty())
-        && (hostname == null || hostname.isEmpty())
-        && (appId == null || appId.isEmpty())
-        && (instanceId == null || instanceId.isEmpty())
-        && startTime == null
-        && endTime == null;
-    }
-
-    public Integer getFetchSize() {
-      return fetchSize;
-    }
-
-    public void setFetchSize(Integer fetchSize) {
-      this.fetchSize = fetchSize;
-    }
-
-    public void addOrderByColumn(String column) {
-      orderByColumns.add(column);
-    }
-
-    public String getOrderByClause(boolean asc) {
-      String orderByStr = " ORDER BY ";
-      if (!orderByColumns.isEmpty()) {
-        StringBuilder sb = new StringBuilder(orderByStr);
-        for (String orderByColumn : orderByColumns) {
-          if (sb.length() != orderByStr.length()) {
-            sb.append(", ");
-          }
-          sb.append(orderByColumn);
-          if (!asc) {
-            sb.append(" DESC");
-          }
-        }
-        sb.append(" ");
-        return sb.toString();
-      }
-      return null;
-    }
-
-    @Override
-    public String toString() {
-      return "Condition{" +
-        "metricNames=" + metricNames +
-        ", hostname='" + hostname + '\'' +
-        ", appId='" + appId + '\'' +
-        ", instanceId='" + instanceId + '\'' +
-        ", startTime=" + startTime +
-        ", endTime=" + endTime +
-        ", limit=" + limit +
-        ", grouped=" + grouped +
-        ", orderBy=" + orderByColumns +
-        ", noLimit=" + noLimit +
-        '}';
-    }
-  }
-
-  static class SplitByMetricNamesCondition implements Condition {
-    private final Condition adaptee;
-    private String currentMetric;
-
-    SplitByMetricNamesCondition(Condition condition){
-      this.adaptee = condition;
-    }
-
-    @Override
-    public boolean isEmpty() {
-      return adaptee.isEmpty();
-    }
-
-    @Override
-    public List<String> getMetricNames() {
-      return Collections.singletonList(currentMetric);
-    }
-
-    @Override
-    public boolean isPointInTime() {
-      return adaptee.isPointInTime();
-    }
-
-    @Override
-    public boolean isGrouped() {
-      return adaptee.isGrouped();
-    }
-
-    @Override
-    public void setStatement(String statement) {
-      adaptee.setStatement(statement);
-    }
-
-    @Override
-    public String getHostname() {
-      return adaptee.getHostname();
-    }
-
-    @Override
-    public Precision getPrecision() {
-      return adaptee.getPrecision();
-    }
-
-    @Override
-    public void setPrecision(Precision precision) {
-      adaptee.setPrecision(precision);
-    }
-
-    @Override
-    public String getAppId() {
-      return adaptee.getAppId();
-    }
-
-    @Override
-    public String getInstanceId() {
-      return adaptee.getInstanceId();
-    }
-
-    @Override
-    public StringBuilder getConditionClause() {
-      StringBuilder sb = new StringBuilder();
-      boolean appendConjunction = false;
-
-      if (getMetricNames() != null) {
-        for (String name : getMetricNames()) {
-          if (sb.length() > 1) {
-            sb.append(" OR ");
-          }
-          sb.append("METRIC_NAME = ?");
-        }
-
-        appendConjunction = true;
-      }
-
-      appendConjunction = DefaultCondition.append(sb, appendConjunction,
-        getHostname(), " HOSTNAME = ?");
-      appendConjunction = DefaultCondition.append(sb, appendConjunction,
-        getAppId(), " APP_ID = ?");
-      appendConjunction = DefaultCondition.append(sb, appendConjunction,
-        getInstanceId(), " INSTANCE_ID = ?");
-      appendConjunction = DefaultCondition.append(sb, appendConjunction,
-        getStartTime(), " SERVER_TIME >= ?");
-      DefaultCondition.append(sb, appendConjunction, getEndTime(),
-        " SERVER_TIME < ?");
-
-      return sb;
-    }
-
-    @Override
-    public String getOrderByClause(boolean asc) {
-      return adaptee.getOrderByClause(asc);
-    }
-
-    @Override
-    public String getStatement() {
-      return adaptee.getStatement();
-    }
-
-    @Override
-    public Long getStartTime() {
-      return adaptee.getStartTime();
-    }
-
-    @Override
-    public Long getEndTime() {
-      return adaptee.getEndTime();
-    }
-
-    @Override
-    public Integer getLimit() {
-      return adaptee.getLimit();
-    }
-
-    @Override
-    public Integer getFetchSize() {
-      return adaptee.getFetchSize();
-    }
-
-    @Override
-    public void setFetchSize(Integer fetchSize) {
-      adaptee.setFetchSize(fetchSize);
-    }
-
-    @Override
-    public void addOrderByColumn(String column) {
-      adaptee.addOrderByColumn(column);
-    }
-
-    @Override
-    public void setNoLimit() {
-      adaptee.setNoLimit();
-    }
-
-    public List<String> getOriginalMetricNames() {
-      return adaptee.getMetricNames();
-    }
-
-    public void setCurrentMetric(String currentMetric) {
-      this.currentMetric = currentMetric;
-    }
-  }
 }

+ 165 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java

@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Precision;
+
+import java.util.Collections;
+import java.util.List;
+
+public class SplitByMetricNamesCondition implements Condition {
+  private final Condition adaptee;
+  private String currentMetric;
+
+  public SplitByMetricNamesCondition(Condition condition){
+    this.adaptee = condition;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return adaptee.isEmpty();
+  }
+
+  @Override
+  public List<String> getMetricNames() {
+    return Collections.singletonList(currentMetric);
+  }
+
+  @Override
+  public boolean isPointInTime() {
+    return adaptee.isPointInTime();
+  }
+
+  @Override
+  public boolean isGrouped() {
+    return adaptee.isGrouped();
+  }
+
+  @Override
+  public void setStatement(String statement) {
+    adaptee.setStatement(statement);
+  }
+
+  @Override
+  public String getHostname() {
+    return adaptee.getHostname();
+  }
+
+  @Override
+  public Precision getPrecision() {
+    return adaptee.getPrecision();
+  }
+
+  @Override
+  public void setPrecision(Precision precision) {
+    adaptee.setPrecision(precision);
+  }
+
+  @Override
+  public String getAppId() {
+    return adaptee.getAppId();
+  }
+
+  @Override
+  public String getInstanceId() {
+    return adaptee.getInstanceId();
+  }
+
+  @Override
+  public StringBuilder getConditionClause() {
+    StringBuilder sb = new StringBuilder();
+    boolean appendConjunction = false;
+
+    if (getMetricNames() != null) {
+      for (String name : getMetricNames()) {
+        if (sb.length() > 1) {
+          sb.append(" OR ");
+        }
+        sb.append("METRIC_NAME = ?");
+      }
+
+      appendConjunction = true;
+    }
+
+    appendConjunction = DefaultCondition.append(sb, appendConjunction,
+      getHostname(), " HOSTNAME = ?");
+    appendConjunction = DefaultCondition.append(sb, appendConjunction,
+      getAppId(), " APP_ID = ?");
+    appendConjunction = DefaultCondition.append(sb, appendConjunction,
+      getInstanceId(), " INSTANCE_ID = ?");
+    appendConjunction = DefaultCondition.append(sb, appendConjunction,
+      getStartTime(), " SERVER_TIME >= ?");
+    DefaultCondition.append(sb, appendConjunction, getEndTime(),
+      " SERVER_TIME < ?");
+
+    return sb;
+  }
+
+  @Override
+  public String getOrderByClause(boolean asc) {
+    return adaptee.getOrderByClause(asc);
+  }
+
+  @Override
+  public String getStatement() {
+    return adaptee.getStatement();
+  }
+
+  @Override
+  public Long getStartTime() {
+    return adaptee.getStartTime();
+  }
+
+  @Override
+  public Long getEndTime() {
+    return adaptee.getEndTime();
+  }
+
+  @Override
+  public Integer getLimit() {
+    return adaptee.getLimit();
+  }
+
+  @Override
+  public Integer getFetchSize() {
+    return adaptee.getFetchSize();
+  }
+
+  @Override
+  public void setFetchSize(Integer fetchSize) {
+    adaptee.setFetchSize(fetchSize);
+  }
+
+  @Override
+  public void addOrderByColumn(String column) {
+    adaptee.addOrderByColumn(column);
+  }
+
+  @Override
+  public void setNoLimit() {
+    adaptee.setNoLimit();
+  }
+
+  public List<String> getOriginalMetricNames() {
+    return adaptee.getMetricNames();
+  }
+
+  public void setCurrentMetric(String currentMetric) {
+    this.currentMetric = currentMetric;
+  }
+}

+ 1 - 2
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java

@@ -24,8 +24,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
-  .timeline.DefaultPhoenixDataSource;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
   .timeline.PhoenixHBaseAccessor;
 import org.apache.zookeeper.ClientCnxn;

+ 2 - 1
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider;
 import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
@@ -37,7 +38,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LOG;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.LOG;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.assertj.core.api.Assertions.assertThat;
 

+ 6 - 3
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java

@@ -18,6 +18,9 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import org.apache.hadoop.yarn.util.Clock;
 import org.junit.Before;
 import org.junit.Test;
@@ -59,7 +62,7 @@ public class AbstractTimelineAggregatorTest {
     agg = new AbstractTimelineAggregator(
       null, metricsConf, clock) {
       @Override
-      protected boolean doWork(long startTime, long endTime) {
+      public boolean doWork(long startTime, long endTime) {
         startTimeInDoWork.set(startTime);
         endTimeInDoWork.set(endTime);
         actualRuns++;
@@ -68,7 +71,7 @@ public class AbstractTimelineAggregatorTest {
       }
 
       @Override
-      protected PhoenixTransactSQL.Condition
+      protected Condition
       prepareMetricQueryCondition(long startTime, long endTime) {
         return null;
       }
@@ -89,7 +92,7 @@ public class AbstractTimelineAggregatorTest {
       }
 
       @Override
-      protected boolean isDisabled() {
+      public boolean isDisabled() {
         return false;
       }
 

+ 4 - 3
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java

@@ -17,11 +17,12 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
 import org.junit.Test;
 
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Function.fromMetricName;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Function.ReadFunction.AVG;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Function.PostProcessingFunction.RATE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.fromMetricName;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.AVG;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.PostProcessingFunction.RATE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class FunctionTest {

+ 3 - 2
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java

@@ -17,14 +17,15 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
 import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Function.ReadFunction.AVG;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Function.PostProcessingFunction.RATE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.AVG;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.PostProcessingFunction.RATE;
 import static org.assertj.core.api.Assertions.*;
 
 public class HBaseTimelineMetricStoreTest {

+ 68 - 10
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java

@@ -19,27 +19,35 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetricReader;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregatorHourly;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-
 import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
 import static junit.framework.Assert.fail;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
 
 public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   private Connection conn;
@@ -126,8 +134,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
 
 
   @Test
-  public void testShouldAggregateClusterIgnoringInstance() throws
-    Exception {
+  public void testShouldAggregateClusterIgnoringInstance() throws Exception {
     // GIVEN
     TimelineMetricClusterAggregator agg =
       new TimelineMetricClusterAggregator(hdb, new Configuration());
@@ -370,6 +377,57 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     assertEquals("Two hourly aggregated row expected ", 2, count);
   }
 
+  @Test
+  public void testAppLevelHostMetricAggregates() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1");
+    TimelineMetricClusterAggregator agg = new TimelineMetricClusterAggregator(hdb, conf);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+    hdb.insertMetricRecords(prepareSingleTimelineMetric((ctime), "local1",
+      "app1", null, "app_metric_random", 1));
+    ctime += 10;
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+      "cpu_user", 1));
+    ctime += 10;
+    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+      "cpu_user", 2));
+
+    // WHEN
+    long endTime = ctime + minute;
+    boolean success = agg.doWork(startTime, endTime);
+
+    //THEN
+    Condition condition = new DefaultCondition(
+      Collections.singletonList("cpu_user"), null, "app1", null,
+      startTime, endTime, null, null, true);
+    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
+      (conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    int recordCount = 0;
+    TimelineClusterMetric currentMetric = null;
+    MetricClusterAggregate currentHostAggregate = null;
+    while (rs.next()) {
+      currentMetric = metricReader.fromResultSet(rs);
+      currentHostAggregate = PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs);
+      recordCount++;
+    }
+    Assert.assertEquals(4, recordCount);
+    assertNotNull(currentMetric);
+    assertEquals("cpu_user", currentMetric.getMetricName());
+    assertEquals("app1", currentMetric.getAppId());
+    assertNotNull(currentHostAggregate);
+    assertEquals(1, currentHostAggregate.getNumberOfHosts());
+    assertEquals(1.0d, currentHostAggregate.getSum());
+  }
+
   private ResultSet executeQuery(String query) throws SQLException {
     Connection conn = getConnection(getUrl());
     Statement stmt = conn.createStatement();

+ 10 - 8
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java

@@ -20,10 +20,15 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -32,16 +37,13 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
-
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
 import static junit.framework.Assert.fail;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric;
 

+ 17 - 6
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java

@@ -20,6 +20,17 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregatorHourly;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -36,7 +47,7 @@ import java.util.Map;
 
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createMetricHostAggregate;
@@ -90,7 +101,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
 
     // WHEN
     long endTime = ctime + minute;
-    PhoenixTransactSQL.Condition condition = new PhoenixTransactSQL.DefaultCondition(
+    Condition condition = new DefaultCondition(
         Collections.singletonList("disk_free"), "local1", null, null, startTime,
         endTime, Precision.SECONDS, null, true);
     TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition,
@@ -125,7 +136,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     assertTrue(success);
 
     // WHEN
-    PhoenixTransactSQL.Condition condition = new PhoenixTransactSQL.DefaultCondition(
+    Condition condition = new DefaultCondition(
         Collections.singletonList("disk_free"), "local1", null, null, startTime,
         endTime, Precision.MINUTES, null, false);
     TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition,
@@ -176,7 +187,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     assertTrue(success);
 
     // WHEN
-    PhoenixTransactSQL.Condition condition = new PhoenixTransactSQL.DefaultCondition(
+    Condition condition = new DefaultCondition(
         Collections.singletonList("disk_used"), "test_host", "test_app", null,
         startTime, endTime, Precision.HOURS, null, true);
     TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition,
@@ -217,7 +228,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     assertTrue(success);
 
     // WHEN
-    PhoenixTransactSQL.Condition condition = new PhoenixTransactSQL.DefaultCondition(
+    Condition condition = new DefaultCondition(
         Collections.singletonList("disk_free"), null, null, null,
         startTime, endTime, Precision.SECONDS, null, true);
     TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition,
@@ -259,7 +270,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     assertTrue(success);
 
     // WHEN
-    PhoenixTransactSQL.Condition condition = new PhoenixTransactSQL.DefaultCondition(
+    Condition condition = new DefaultCondition(
         Collections.singletonList("disk_used"), null, null, null,
         startTime, ctime + minute, Precision.HOURS, null, true);
     TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition,

+ 18 - 2
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java

@@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -53,7 +55,20 @@ public class MetricTestHelper {
                                                         double val) {
     TimelineMetrics m = new TimelineMetrics();
     m.setMetrics(Arrays.asList(
-        createTimelineMetric(startTime, metricName, host, instanceId, val)));
+        createTimelineMetric(startTime, metricName, host, null, instanceId, val)));
+
+    return m;
+  }
+
+  public static TimelineMetrics prepareSingleTimelineMetric(long startTime,
+                                                            String host,
+                                                            String appId,
+                                                            String instanceId,
+                                                            String metricName,
+                                                            double val) {
+    TimelineMetrics m = new TimelineMetrics();
+    m.setMetrics(Arrays.asList(
+      createTimelineMetric(startTime, metricName, host, appId, instanceId, val)));
 
     return m;
   }
@@ -62,11 +77,12 @@ public class MetricTestHelper {
   public static TimelineMetric createTimelineMetric(long startTime,
                                                 String metricName,
                                                 String host,
+                                                String appId,
                                                 String instanceId,
                                                 double val) {
     TimelineMetric m = new TimelineMetric();
-    m.setAppId("host");
     m.setHostName(host);
+    m.setAppId(appId != null ? appId : "host");
     m.setInstanceId(instanceId);
     m.setMetricName(metricName);
     m.setStartTime(startTime);

+ 1 - 0
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
   .timeline;
 
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;

+ 4 - 7
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java

@@ -17,25 +17,22 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition;
 import org.easymock.Capture;
 import org.junit.Assert;
 import org.junit.Test;
-
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
-
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.SplitByMetricNamesCondition;
-
 import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
-
 import org.easymock.EasyMock;
 
 public class TestPhoenixTransactSQL {

+ 8 - 0
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml

@@ -248,4 +248,12 @@
       different mount point from the one for hbase.rootdir in embedded mode.
     </description>
   </property>
+  <property>
+    <name>timeline.metrics.service.cluster.aggregator.appIds</name>
+    <value>datanode,nodemanager,hbase,nimbus</value>
+    <description>
+      List of application ids to use for aggregating host level metrics for
+      an application. Example: bytes_read across Yarn Nodemanagers.
+    </description>
+  </property>
 </configuration>