Browse Source

AMBARI-12983. Optimize aggregator queries by performing GROUP BY on server. (swagle)

Siddharth Wagle 10 năm trước cách đây
mục cha
commit
3e0b8f07c3
20 tập tin đã thay đổi với 736 bổ sung126 xóa
  1. 1 1
      ambari-metrics/ambari-metrics-timelineservice/conf/unix/ambari-metrics-collector
  2. 1 1
      ambari-metrics/ambari-metrics-timelineservice/pom.xml
  3. 1 3
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java
  4. 59 48
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
  5. 13 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
  6. 9 7
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
  7. 71 1
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
  8. 100 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
  9. 73 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
  10. 1 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java
  11. 5 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
  12. 139 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java
  13. 74 47
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
  14. 5 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java
  15. 77 8
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
  16. 77 7
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
  17. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
  18. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
  19. 20 1
      ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-hbase-site.xml
  20. 8 0
      ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml

+ 1 - 1
ambari-metrics/ambari-metrics-timelineservice/conf/unix/ambari-metrics-collector

@@ -176,7 +176,7 @@ function start()
     rm -f "${PIDFILE}" >/dev/null 2>&1
   fi
 
-  nohup "${JAVA}" "-Xmx$AMS_COLLECTOR_HEAPSIZE" "${AMS_COLLECTOR_OPTS}" "-cp" "/usr/lib/ambari-metrics-collector/*:${COLLECTOR_CONF_DIR}" "-Djava.net.preferIPv4Stack=true" "-Dams.log.dir=${AMS_COLLECTOR_LOG_DIR}" "-Dproc_${DAEMON_NAME}" "${CLASS}" "$@" > $OUTFILE 2>&1 &
+  nohup "${JAVA}" "-Xms$AMS_COLLECTOR_HEAPSIZE" "-Xmx$AMS_COLLECTOR_HEAPSIZE" "${AMS_COLLECTOR_OPTS}" "-cp" "/usr/lib/ambari-metrics-collector/*:${COLLECTOR_CONF_DIR}" "-Djava.net.preferIPv4Stack=true" "-Dams.log.dir=${AMS_COLLECTOR_LOG_DIR}" "-Dproc_${DAEMON_NAME}" "${CLASS}" "$@" > $OUTFILE 2>&1 &
   PID=$!
   write_pidfile "${PIDFILE}"
   sleep 2

+ 1 - 1
ambari-metrics/ambari-metrics-timelineservice/pom.xml

@@ -35,7 +35,7 @@
     <!--<yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>-->
     <protobuf.version>2.5.0</protobuf.version>
     <hadoop.version>(2.6.0.2.2.0.0, 2.6.0.2.2.1.0)</hadoop.version>
-    <phoenix.version>4.2.0.2.2.0.0-2041</phoenix.version>
+    <phoenix.version>4.2.0.2.2.1.0-2340</phoenix.version>
     <hbase.version>0.98.4.2.2.0.0-2041-hadoop2</hbase.version>
   </properties>
 

+ 1 - 3
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryClientService.java

@@ -90,9 +90,7 @@ public class ApplicationHistoryClientService extends AbstractService {
 
     server =
         rpc.getServer(ApplicationHistoryProtocol.class, protocolHandler,
-          address, conf, null, conf.getInt(
-            YarnConfiguration.TIMELINE_SERVICE_HANDLER_THREAD_COUNT,
-            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_THREAD_COUNT));
+          address, conf, null, metricConfiguration.getTimelineMetricsServiceHandlerThreadCount());
 
     server.start();
     this.bindAddress =

+ 59 - 48
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java

@@ -40,12 +40,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-public class HBaseTimelineMetricStore extends AbstractService
-    implements TimelineMetricStore {
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
+
+public class HBaseTimelineMetricStore extends AbstractService implements TimelineMetricStore {
 
   static final Log LOG = LogFactory.getLog(HBaseTimelineMetricStore.class);
   private final TimelineMetricConfiguration configuration;
   private PhoenixHBaseAccessor hBaseAccessor;
+  private static volatile boolean isInitialized = false;
 
   /**
    * Construct the service.
@@ -62,58 +64,67 @@ public class HBaseTimelineMetricStore extends AbstractService
     initializeSubsystem(configuration.getHbaseConf(), configuration.getMetricsConf());
   }
 
-  private void initializeSubsystem(Configuration hbaseConf,
-                                   Configuration metricsConf) {
-    hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf);
-    hBaseAccessor.initMetricSchema();
-
-    // Start the cluster aggregator minute
-    TimelineMetricAggregator minuteClusterAggregator =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf);
-    if (!minuteClusterAggregator.isDisabled()) {
-      Thread aggregatorThread = new Thread(minuteClusterAggregator);
-      aggregatorThread.start();
-    }
+  private synchronized void initializeSubsystem(Configuration hbaseConf,
+                                                Configuration metricsConf) {
+    if (!isInitialized) {
+      hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf);
+      hBaseAccessor.initMetricSchema();
 
-    // Start the hourly cluster aggregator
-    TimelineMetricAggregator hourlyClusterAggregator =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hBaseAccessor, metricsConf);
-    if (!hourlyClusterAggregator.isDisabled()) {
-      Thread aggregatorThread = new Thread(hourlyClusterAggregator);
-      aggregatorThread.start();
-    }
+      if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
+        LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
+      }
 
-    // Start the daily cluster aggregator
-    TimelineMetricAggregator dailyClusterAggregator =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hBaseAccessor, metricsConf);
-    if (!dailyClusterAggregator.isDisabled()) {
-      Thread aggregatorThread = new Thread(dailyClusterAggregator);
-      aggregatorThread.start();
-    }
+      // Start the cluster aggregator minute
+      TimelineMetricAggregator minuteClusterAggregator =
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf);
+      if (!minuteClusterAggregator.isDisabled()) {
+        Thread aggregatorThread = new Thread(minuteClusterAggregator);
+        aggregatorThread.start();
+      }
 
-    // Start the minute host aggregator
-    TimelineMetricAggregator minuteHostAggregator =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf);
-    if (!minuteHostAggregator.isDisabled()) {
-      Thread minuteAggregatorThread = new Thread(minuteHostAggregator);
-      minuteAggregatorThread.start();
-    }
+      // Start the hourly cluster aggregator
+      TimelineMetricAggregator hourlyClusterAggregator =
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hBaseAccessor, metricsConf);
+      if (!hourlyClusterAggregator.isDisabled()) {
+        Thread aggregatorThread = new Thread(hourlyClusterAggregator);
+        aggregatorThread.start();
+      }
 
-    // Start the hourly host aggregator
-    TimelineMetricAggregator hourlyHostAggregator =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf);
-    if (!hourlyHostAggregator.isDisabled()) {
-      Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator);
-      aggregatorHourlyThread.start();
-    }
+      // Start the daily cluster aggregator
+      TimelineMetricAggregator dailyClusterAggregator =
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hBaseAccessor, metricsConf);
+      if (!dailyClusterAggregator.isDisabled()) {
+        Thread aggregatorThread = new Thread(dailyClusterAggregator);
+        aggregatorThread.start();
+      }
+
+      // Start the minute host aggregator
+      TimelineMetricAggregator minuteHostAggregator =
+        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf);
+      if (!minuteHostAggregator.isDisabled()) {
+        Thread minuteAggregatorThread = new Thread(minuteHostAggregator);
+        minuteAggregatorThread.start();
+      }
+
+      // Start the hourly host aggregator
+      TimelineMetricAggregator hourlyHostAggregator =
+        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf);
+      if (!hourlyHostAggregator.isDisabled()) {
+        Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator);
+        aggregatorHourlyThread.start();
+      }
+
+      // Start the daily host aggregator
+      TimelineMetricAggregator dailyHostAggregator =
+        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hBaseAccessor, metricsConf);
+      if (!dailyHostAggregator.isDisabled()) {
+        Thread aggregatorDailyThread = new Thread(dailyHostAggregator);
+        aggregatorDailyThread.start();
+      }
 
-    // Start the daily host aggregator
-    TimelineMetricAggregator dailyHostAggregator =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hBaseAccessor, metricsConf);
-    if (!dailyHostAggregator.isDisabled()) {
-      Thread aggregatorDailyThread = new Thread(dailyHostAggregator);
-      aggregatorDailyThread.start();
+      isInitialized = true;
     }
+
   }
 
   @Override

+ 13 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java

@@ -162,6 +162,12 @@ public class TimelineMetricConfiguration {
   public static final String OUT_OFF_BAND_DATA_TIME_ALLOWANCE =
     "timeline.metrics.service.outofband.time.allowance.millis";
 
+  public static final String USE_GROUPBY_AGGREGATOR_QUERIES =
+    "timeline.metrics.service.use.groupBy.aggregators";
+
+  public static final String HANDLER_THREAD_COUNT =
+    "timeline.metrics.service.handler.thread.count";
+
   public static final String HOST_APP_ID = "HOST";
 
   private Configuration hbaseConf;
@@ -217,6 +223,13 @@ public class TimelineMetricConfiguration {
     return defaultHttpAddress;
   }
 
+  public int getTimelineMetricsServiceHandlerThreadCount() {
+    if (metricsConf != null) {
+      return Integer.parseInt(metricsConf.get(HANDLER_THREAD_COUNT, "20"));
+    }
+    return 20;
+  }
+
   public String getTimelineServiceRpcAddress() {
     String defaultRpcAddress = "0.0.0.0:60200";
     if (metricsConf != null) {

+ 9 - 7
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java

@@ -122,8 +122,7 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
         + " seconds.");
 
       long startTime = clock.getTime();
-      boolean success = doWork(lastCheckPointTime,
-        lastCheckPointTime + SLEEP_INTERVAL);
+      boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL);
       long executionTime = clock.getTime() - startTime;
       long delta = SLEEP_INTERVAL - executionTime;
 
@@ -242,16 +241,19 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
       stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
 
       LOG.debug("Query issued @: " + new Date());
-      rs = stmt.executeQuery();
+      if (condition.doUpdate()) {
+        int rows = stmt.executeUpdate();
+        conn.commit();
+        LOG.info(rows + " row(s) updated.");
+      } else {
+        rs = stmt.executeQuery();
+      }
       LOG.debug("Query returned @: " + new Date());
 
       aggregate(rs, startTime, endTime);
       LOG.info("End aggregation cycle @ " + new Date());
 
-    } catch (SQLException e) {
-      LOG.error("Exception during aggregating metrics.", e);
-      success = false;
-    } catch (IOException e) {
+    } catch (SQLException | IOException e) {
       LOG.error("Exception during aggregating metrics.", e);
       success = false;
     } finally {

+ 71 - 1
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java

@@ -43,6 +43,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
@@ -53,7 +54,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 
 /**
  * Factory class that knows how to create a aggregator instance using
- * @TimelineMetricConfiguration
+ * TimelineMetricConfiguration
  */
 public class TimelineMetricAggregatorFactory {
   private static final String HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE =
@@ -69,6 +70,10 @@ public class TimelineMetricAggregatorFactory {
   private static final String CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE =
     "timeline-metrics-cluster-aggregator-daily-checkpoint";
 
+  private static boolean useGroupByAggregator(Configuration metricsConf) {
+    return Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"));
+  }
+
   /**
    * Minute based aggregation for hosts.
    */
@@ -89,6 +94,19 @@ public class TimelineMetricAggregatorFactory {
     String inputTableName = METRICS_RECORD_TABLE_NAME;
     String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
 
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        hostAggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        120000l
+      );
+    }
+
     return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -119,6 +137,19 @@ public class TimelineMetricAggregatorFactory {
     String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
     String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
 
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        hostAggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        3600000l
+      );
+    }
+
     return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -149,6 +180,19 @@ public class TimelineMetricAggregatorFactory {
     String inputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
     String outputTableName = METRICS_AGGREGATE_DAILY_TABLE_NAME;
 
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        hostAggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        3600000l
+      );
+    }
+
     return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -220,6 +264,19 @@ public class TimelineMetricAggregatorFactory {
     String outputTableName = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
     String aggregatorDisabledParam = CLUSTER_AGGREGATOR_HOUR_DISABLED;
 
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        aggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        120000l
+      );
+    }
+
     return new TimelineMetricClusterAggregator(
       hBaseAccessor, metricsConf,
       checkpointLocation,
@@ -254,6 +311,19 @@ public class TimelineMetricAggregatorFactory {
     String outputTableName = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
     String aggregatorDisabledParam = CLUSTER_AGGREGATOR_DAILY_DISABLED;
 
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        aggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        120000l
+      );
+    }
+
     return new TimelineMetricClusterAggregator(
       hBaseAccessor, metricsConf,
       checkpointLocation,

+ 100 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java

@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_APP_METRIC_GROUPBY_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_TIME_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
+
+public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
+  private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class);
+  private final String aggregateColumnName;
+
+  public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                         Configuration metricsConf,
+                                         String checkpointLocation,
+                                         Long sleepIntervalMillis,
+                                         Integer checkpointCutOffMultiplier,
+                                         String hostAggregatorDisabledParam,
+                                         String inputTableName,
+                                         String outputTableName,
+                                         Long nativeTimeRangeDelay) {
+    super(hBaseAccessor, metricsConf, checkpointLocation,
+      sleepIntervalMillis, checkpointCutOffMultiplier,
+      hostAggregatorDisabledParam, inputTableName, outputTableName,
+      nativeTimeRangeDelay);
+
+    if (inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME)) {
+      aggregateColumnName = "HOSTS_COUNT";
+    } else {
+      aggregateColumnName = "METRIC_COUNT";
+    }
+  }
+
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+    EmptyCondition condition = new EmptyCondition();
+    condition.setDoUpdate(true);
+
+    /*
+    UPSERT INTO METRIC_AGGREGATE_HOURLY (METRIC_NAME, APP_ID, INSTANCE_ID,
+    SERVER_TIME, UNITS, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN)
+    SELECT METRIC_NAME, APP_ID, INSTANCE_ID, MAX(SERVER_TIME), UNITS,
+    SUM(METRIC_SUM), SUM(HOSTS_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN)
+    FROM METRIC_AGGREGATE WHERE SERVER_TIME >= 1441155600000 AND
+    SERVER_TIME < 1441159200000 GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS;
+     */
+
+    condition.setStatement(String.format(GET_AGGREGATED_APP_METRIC_GROUPBY_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay),
+      outputTableName, aggregateColumnName, tableName,
+      startTime, endTime));
+
+    return condition;
+  }
+
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
+    LOG.info("Aggregated cluster metrics for " + outputTableName +
+      ", with startTime = " + new Date(startTime) +
+      ", endTime = " + new Date(endTime));
+  }
+}

+ 73 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java

@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL;
+
+public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
+  private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class);
+
+  public TimelineMetricHostAggregator(PhoenixHBaseAccessor hBaseAccessor,
+                                      Configuration metricsConf,
+                                      String checkpointLocation,
+                                      Long sleepIntervalMillis,
+                                      Integer checkpointCutOffMultiplier,
+                                      String hostAggregatorDisabledParam,
+                                      String tableName,
+                                      String outputTableName,
+                                      Long nativeTimeRangeDelay) {
+    super(hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis,
+      checkpointCutOffMultiplier, hostAggregatorDisabledParam, tableName,
+      outputTableName, nativeTimeRangeDelay);
+  }
+
+  @Override
+  protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
+
+    LOG.info("Aggregated host metrics for " + outputTableName +
+      ", with startTime = " + new Date(startTime) +
+      ", endTime = " + new Date(endTime));
+  }
+
+  @Override
+  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
+    EmptyCondition condition = new EmptyCondition();
+    condition.setDoUpdate(true);
+
+    condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay),
+      outputTableName, tableName, startTime, endTime));
+
+    return condition;
+  }
+
+}

+ 1 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java

@@ -43,4 +43,5 @@ public interface Condition {
   void setFetchSize(Integer fetchSize);
   void addOrderByColumn(String column);
   void setNoLimit();
+  boolean doUpdate();
 }

+ 5 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java

@@ -199,6 +199,11 @@ public class DefaultCondition implements Condition {
     this.noLimit = true;
   }
 
+  @Override
+  public boolean doUpdate() {
+    return false;
+  }
+
   public Integer getLimit() {
     if (noLimit) {
       return null;

+ 139 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java

@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Precision;
+
+import java.util.List;
+
+/**
+ * Encapsulate a Condition with pre-formatted and pre-parsed query string.
+ */
+public class EmptyCondition implements Condition {
+  String statement;
+  boolean doUpdate = false;
+
+  @Override
+  public boolean isEmpty() {
+    return false;
+  }
+
+  @Override
+  public List<String> getMetricNames() {
+    return null;
+  }
+
+  @Override
+  public boolean isPointInTime() {
+    return false;
+  }
+
+  @Override
+  public boolean isGrouped() {
+    return true;
+  }
+
+  @Override
+  public void setStatement(String statement) {
+    this.statement = statement;
+  }
+
+  @Override
+  public List<String> getHostnames() {
+    return null;
+  }
+
+  @Override
+  public Precision getPrecision() {
+    return null;
+  }
+
+  @Override
+  public void setPrecision(Precision precision) {
+
+  }
+
+  @Override
+  public String getAppId() {
+    return null;
+  }
+
+  @Override
+  public String getInstanceId() {
+    return null;
+  }
+
+  @Override
+  public StringBuilder getConditionClause() {
+    return null;
+  }
+
+  @Override
+  public String getOrderByClause(boolean asc) {
+    return null;
+  }
+
+  @Override
+  public String getStatement() {
+    return statement;
+  }
+
+  @Override
+  public Long getStartTime() {
+    return null;
+  }
+
+  @Override
+  public Long getEndTime() {
+    return null;
+  }
+
+  @Override
+  public Integer getLimit() {
+    return null;
+  }
+
+  @Override
+  public Integer getFetchSize() {
+    return null;
+  }
+
+  @Override
+  public void setFetchSize(Integer fetchSize) {
+
+  }
+
+  @Override
+  public void addOrderByColumn(String column) {
+
+  }
+
+  @Override
+  public void setNoLimit() {
+
+  }
+
+  public void setDoUpdate(boolean doUpdate) {
+    this.doUpdate = doUpdate;
+  }
+
+  @Override
+  public boolean doUpdate() {
+    return doUpdate;
+  }
+}

+ 74 - 47
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java

@@ -56,50 +56,50 @@ public class PhoenixTransactSQL {
 
   public static final String CREATE_METRICS_AGGREGATE_TABLE_SQL =
     "CREATE TABLE IF NOT EXISTS %s " +
-      "(METRIC_NAME VARCHAR, " +
-      "HOSTNAME VARCHAR, " +
-      "APP_ID VARCHAR, " +
-      "INSTANCE_ID VARCHAR, " +
-      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
-      "UNITS CHAR(20), " +
-      "METRIC_SUM DOUBLE," +
-      "METRIC_COUNT UNSIGNED_INT, " +
-      "METRIC_MAX DOUBLE," +
-      "METRIC_MIN DOUBLE CONSTRAINT pk " +
-      "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
-      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
-      " COMPRESSION='%s'";
+    "(METRIC_NAME VARCHAR, " +
+    "HOSTNAME VARCHAR, " +
+    "APP_ID VARCHAR, " +
+    "INSTANCE_ID VARCHAR, " +
+    "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+    "UNITS CHAR(20), " +
+    "METRIC_SUM DOUBLE," +
+    "METRIC_COUNT UNSIGNED_INT, " +
+    "METRIC_MAX DOUBLE," +
+    "METRIC_MIN DOUBLE CONSTRAINT pk " +
+    "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " +
+    "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
+    " COMPRESSION='%s'";
 
   public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL =
     "CREATE TABLE IF NOT EXISTS %s " +
-      "(METRIC_NAME VARCHAR, " +
-      "APP_ID VARCHAR, " +
-      "INSTANCE_ID VARCHAR, " +
-      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
-      "UNITS CHAR(20), " +
-      "METRIC_SUM DOUBLE, " +
-      "HOSTS_COUNT UNSIGNED_INT, " +
-      "METRIC_MAX DOUBLE, " +
-      "METRIC_MIN DOUBLE " +
-      "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
-      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
-      "TTL=%s, COMPRESSION='%s'";
+    "(METRIC_NAME VARCHAR, " +
+    "APP_ID VARCHAR, " +
+    "INSTANCE_ID VARCHAR, " +
+    "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+    "UNITS CHAR(20), " +
+    "METRIC_SUM DOUBLE, " +
+    "HOSTS_COUNT UNSIGNED_INT, " +
+    "METRIC_MAX DOUBLE, " +
+    "METRIC_MIN DOUBLE " +
+    "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
+    "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+    "TTL=%s, COMPRESSION='%s'";
 
   // HOSTS_COUNT vs METRIC_COUNT
   public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL =
     "CREATE TABLE IF NOT EXISTS %s " +
-      "(METRIC_NAME VARCHAR, " +
-      "APP_ID VARCHAR, " +
-      "INSTANCE_ID VARCHAR, " +
-      "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
-      "UNITS CHAR(20), " +
-      "METRIC_SUM DOUBLE, " +
-      "METRIC_COUNT UNSIGNED_INT, " +
-      "METRIC_MAX DOUBLE, " +
-      "METRIC_MIN DOUBLE " +
-      "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
-      "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
-      "TTL=%s, COMPRESSION='%s'";
+    "(METRIC_NAME VARCHAR, " +
+    "APP_ID VARCHAR, " +
+    "INSTANCE_ID VARCHAR, " +
+    "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+    "UNITS CHAR(20), " +
+    "METRIC_SUM DOUBLE, " +
+    "METRIC_COUNT UNSIGNED_INT, " +
+    "METRIC_MAX DOUBLE, " +
+    "METRIC_MIN DOUBLE " +
+    "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " +
+    "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+    "TTL=%s, COMPRESSION='%s'";
 
   /**
    * ALTER table to set new options
@@ -234,6 +234,29 @@ public class PhoenixTransactSQL {
     "METRIC_MIN " +
     "FROM %s";
 
+  /**
+   * Aggregate host metrics using a GROUP BY clause to take advantage of
+   * N - way parallel scan where N = number of regions.
+   */
+  public static final String GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL = "UPSERT %s " +
+    "INTO %s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, UNITS, " +
+    "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " +
+    "SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, MAX(SERVER_TIME), UNITS, " +
+    "SUM(METRIC_SUM), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " +
+    "FROM %s WHERE SERVER_TIME >= %s AND SERVER_TIME < %s " +
+    "GROUP BY METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, UNITS";
+
+  /**
+   * Aggregate app metrics using a GROUP BY clause to take advantage of
+   * N - way parallel scan where N = number of regions.
+   */
+  public static final String GET_AGGREGATED_APP_METRIC_GROUPBY_SQL = "UPSERT %s " +
+    "INTO %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, UNITS, " +
+    "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) SELECT METRIC_NAME, APP_ID, " +
+    "INSTANCE_ID, MAX(SERVER_TIME), UNITS, SUM(METRIC_SUM), SUM(%s), " +
+    "MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE SERVER_TIME >= %s AND " +
+    "SERVER_TIME < %s GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS";
+
   public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
   public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
     "METRIC_RECORD_MINUTE";
@@ -263,8 +286,8 @@ public class PhoenixTransactSQL {
     return String.format("/*+ NATIVE_TIME_RANGE(%s) */", (startTime - delta));
   }
 
-  public static PreparedStatement prepareGetMetricsSqlStmt(
-    Connection connection, Condition condition) throws SQLException {
+  public static PreparedStatement prepareGetMetricsSqlStmt(Connection connection,
+      Condition condition) throws SQLException {
 
     validateConditionIsNotEmpty(condition);
     validateRowCountLimit(condition);
@@ -323,15 +346,18 @@ public class PhoenixTransactSQL {
     }
 
     StringBuilder sb = new StringBuilder(stmtStr);
-    sb.append(" WHERE ");
-    sb.append(condition.getConditionClause());
-    String orderByClause = condition.getOrderByClause(true);
 
-    if (orderByClause != null) {
-      sb.append(orderByClause);
-    } else {
-      sb.append(" ORDER BY METRIC_NAME, SERVER_TIME ");
+    if (!(condition instanceof EmptyCondition)) {
+      sb.append(" WHERE ");
+      sb.append(condition.getConditionClause());
+      String orderByClause = condition.getOrderByClause(true);
+      if (orderByClause != null) {
+        sb.append(orderByClause);
+      } else {
+        sb.append(" ORDER BY METRIC_NAME, SERVER_TIME ");
+      }
     }
+
     if (condition.getLimit() != null) {
       sb.append(" LIMIT ").append(condition.getLimit());
     }
@@ -339,6 +365,7 @@ public class PhoenixTransactSQL {
     if (LOG.isDebugEnabled()) {
       LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
     }
+
     PreparedStatement stmt = null;
     try {
       stmt = connection.prepareStatement(sb.toString());
@@ -404,7 +431,7 @@ public class PhoenixTransactSQL {
 
   private static void validateRowCountLimit(Condition condition) {
     if (condition.getMetricNames() == null
-      || condition.getMetricNames().isEmpty() ) {
+        || condition.getMetricNames().isEmpty() ) {
       //aggregator can use empty metrics query
       return;
     }

+ 5 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java

@@ -168,6 +168,11 @@ public class SplitByMetricNamesCondition implements Condition {
     adaptee.setNoLimit();
   }
 
+  @Override
+  public boolean doUpdate() {
+    return false;
+  }
+
   public List<String> getOriginalMetricNames() {
     return adaptee.getMetricNames();
   }

+ 77 - 8
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java

@@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -64,6 +66,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
 
   @Before
   public void setUp() throws Exception {
+    Logger.getLogger("org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline").setLevel(Level.DEBUG);
     hdb = createTestableHBaseAccessor();
     // inits connection, starts mini cluster
     conn = getConnection(getUrl());
@@ -87,11 +90,17 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     conn.close();
   }
 
+  private Configuration getConfigurationForTest(boolean useGroupByAggregators) {
+    Configuration configuration = new Configuration();
+    configuration.set("timeline.metrics.service.use.groupBy.aggregators", String.valueOf(useGroupByAggregators));
+    return configuration;
+  }
+
   @Test
   public void testShouldAggregateClusterProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -143,7 +152,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testShouldAggregateClusterIgnoringInstance() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -218,7 +227,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testShouldAggregateDifferentMetricsOnClusterProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     // here we put some metrics tha will be aggregated
@@ -282,7 +291,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testAggregateDailyClusterMetrics() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, getConfigurationForTest(false));
 
     // this time can be virtualized! or made independent from real clock
     long startTime = System.currentTimeMillis();
@@ -327,7 +336,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testShouldAggregateClusterOnHourProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false));
 
     // this time can be virtualized! or made independent from real clock
     long startTime = System.currentTimeMillis();
@@ -371,7 +380,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testShouldAggregateDifferentMetricsOnHourProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false));
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime;
@@ -431,7 +440,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
 
   @Test
   public void testAppLevelHostMetricAggregates() throws Exception {
-    Configuration conf = new Configuration();
+    Configuration conf = getConfigurationForTest(false);
     conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1");
     TimelineMetricAggregator agg =
       TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, conf);
@@ -485,7 +494,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   @Test
   public void testClusterAggregateMetricNormalization() throws Exception {
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     // Sample data
@@ -558,6 +567,66 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     Assert.assertEquals(9, recordCount);
   }
 
+  @Test
+  public void testAggregationUsingGroupByQuery() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(true));
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> records =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
+      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+    hdb.saveClusterAggregateRecords(records);
+
+    // WHEN
+    agg.doWork(startTime, ctime + minute);
+
+    // THEN
+    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
+    int count = 0;
+    while (rs.next()) {
+      if ("disk_used".equals(rs.getString("METRIC_NAME"))) {
+        assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+        assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+        assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+        assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+        assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+      } else if ("disk_free".equals(rs.getString("METRIC_NAME"))) {
+        assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+        assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
+        assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+        assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX"));
+        assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN"));
+      }
+      count++;
+    }
+    assertEquals("Two hourly aggregated row expected ", 2, count);
+  }
+
   private ResultSet executeQuery(String query) throws SQLException {
     Connection conn = getConnection(getUrl());
     Statement stmt = conn.createStatement();

+ 77 - 7
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java

@@ -27,6 +27,8 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -58,6 +60,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
 
   @Before
   public void setUp() throws Exception {
+    Logger.getLogger("org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline").setLevel(Level.DEBUG);
     hdb = createTestableHBaseAccessor();
     // inits connection, starts mini cluster
     conn = getConnection(getUrl());
@@ -105,11 +108,18 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
       .containsExactlyElementsOf(recordRead.getMetrics());
   }
 
+  private Configuration getConfigurationForTest(boolean useGroupByAggregators) {
+    Configuration configuration = new Configuration();
+    configuration.set("timeline.metrics.service.use.groupBy.aggregators", String.valueOf(useGroupByAggregators));
+    return configuration;
+  }
+
   @Test
   public void testShouldAggregateMinuteProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator aggregatorMinute =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb,
+        getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -132,8 +142,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
       METRICS_AGGREGATE_MINUTE_TABLE_NAME));
 
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
-      (conn, condition);
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
     ResultSet rs = pstmt.executeQuery();
     MetricHostAggregate expectedAggregate =
       MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
@@ -170,7 +179,8 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
    public void testShouldAggregateHourProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator aggregator =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb,
+        getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
     long startTime = System.currentTimeMillis();
 
@@ -209,8 +219,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
       METRICS_AGGREGATE_HOURLY_TABLE_NAME));
 
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
-      (conn, condition);
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
     ResultSet rs = pstmt.executeQuery();
 
     while (rs.next()) {
@@ -233,7 +242,8 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
   public void testMetricAggregateDaily() throws Exception {
     // GIVEN
     TimelineMetricAggregator aggregator =
-      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hdb,
+        getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
     long startTime = System.currentTimeMillis();
 
@@ -290,6 +300,66 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
     }
   }
 
+  @Test
+  public void testAggregationUsingGroupByQuery() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator aggregatorMinute =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb,
+        getConfigurationForTest(true));
+    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+    hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+
+    long endTime = startTime + 1000 * 60 * 4;
+    boolean success = aggregatorMinute.doWork(startTime, endTime);
+    assertTrue(success);
+
+    Condition condition = new DefaultCondition(null, null, null, null, startTime,
+      endTime, null, null, true);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+      METRICS_AGGREGATE_MINUTE_TABLE_NAME));
+
+    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+    MetricHostAggregate expectedAggregate =
+      MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+
+    int count = 0;
+    while (rs.next()) {
+      TimelineMetric currentMetric =
+        readHelper.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate currentHostAggregate =
+        readHelper.getMetricHostAggregateFromResultSet(rs);
+
+      if ("disk_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        count++;
+      } else if ("mem_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        count++;
+      } else {
+        fail("Unexpected entry");
+      }
+    }
+    assertEquals("Two aggregated entries expected", 2, count);
+  }
+
   private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR =
     new Comparator<TimelineMetric>() {
       @Override

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java

@@ -418,7 +418,7 @@ public class Configuration {
   private static final String TIMELINE_METRICS_CACHE_TTL = "server.timeline.metrics.cache.entry.ttl.seconds";
   private static final String DEFAULT_TIMELINE_METRICS_CACHE_TTL = "3600";
   private static final String TIMELINE_METRICS_CACHE_IDLE_TIME = "server.timeline.metrics.cache.entry.idle.seconds";
-  private static final String DEFAULT_TIMELINE_METRICS_CACHE_IDLE_TIME = "300";
+  private static final String DEFAULT_TIMELINE_METRICS_CACHE_IDLE_TIME = "1800";
   private static final String TIMELINE_METRICS_REQUEST_READ_TIMEOUT = "server.timeline.metrics.cache.read.timeout.millis";
   private static final String DEFAULT_TIMELINE_METRICS_REQUEST_READ_TIMEOUT = "10000";
   private static final String TIMELINE_METRICS_REQUEST_INTERVAL_READ_TIMEOUT = "server.timeline.metrics.cache.interval.read.timeout.millis";

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java

@@ -4209,7 +4209,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
       widgetEntity.setProperties(gson.toJson(layoutInfo.getProperties()));
       widgetEntity.setWidgetValues(gson.toJson(layoutInfo.getValues()));
       widgetEntity.setListWidgetLayoutUserWidgetEntity(new LinkedList<WidgetLayoutUserWidgetEntity>());
-      LOG.debug("Creating cluster widget with: name = " +
+      LOG.info("Creating cluster widget with: name = " +
         layoutInfo.getWidgetName() + ", type = " + layoutInfo.getType() + ", " +
         "cluster = " + clusterEntity.getClusterName());
       // Persisting not visible widgets

+ 20 - 1
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-hbase-site.xml

@@ -109,7 +109,7 @@
   </property>
   <property>
     <name>phoenix.query.spoolThresholdBytes</name>
-    <value>12582912</value>
+    <value>20971520</value>
     <description>
       Threshold size in bytes after which results from parallelly executed
       query results are spooled to disk. Default is 20 mb.
@@ -293,4 +293,23 @@
       different mount point from the one for hbase.rootdir in embedded mode.
     </description>
   </property>
+  <property>
+    <name>phoenix.mutate.batchSize</name>
+    <value>10000</value>
+    <description>
+      The number of rows that are batched together and automatically committed
+      during the execution of an UPSERT SELECT or DELETE statement.
+      This affects performance of group by aggregators if they are being used.
+    </description>
+  </property>
+  <property>
+    <name>phoenix.query.rowKeyOrderSaltedTable</name>
+    <value>true</value>
+    <description>
+      When set, we disallow user specified split points on salted table to ensure
+      that each bucket will only contains entries with the same salt byte.
+      When this property is turned on, the salted table would behave just like
+      a normal table and would return items in rowkey order for scans
+    </description>
+  </property>
 </configuration>

+ 8 - 0
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml

@@ -318,4 +318,12 @@
       an application. Example: bytes_read across Yarn Nodemanagers.
     </description>
   </property>
+  <property>
+    <name>timeline.metrics.service.use.groupBy.aggregators</name>
+    <value>true</value>
+    <description>
+      Use a groupBy aggregated query to perform host level aggregations vs
+      in-memory aggregations.
+    </description>
+  </property>
 </configuration>