Browse Source

AMBARI-13701. Introduce cluster wide MINUTE aggregator in Ambari Metrics service. (Aravindan Vijayan via swagle)

Siddharth Wagle 9 years ago
parent
commit
4c3be3975a
13 changed files with 356 additions and 67 deletions
  1. 9 1
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
  2. 20 13
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
  3. 13 1
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
  4. 70 9
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
  5. 3 3
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
  6. 7 2
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
  7. 79 13
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
  8. 3 4
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
  9. 6 13
      ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
  10. 22 0
      ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
  11. 47 7
      ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
  12. 1 1
      ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/split_points.py
  13. 76 0
      ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java

+ 9 - 1
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java

@@ -76,7 +76,15 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
         LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
       }
 
-      // Start the cluster aggregator minute
+      // Start the cluster aggregator second
+      TimelineMetricAggregator secondClusterAggregator =
+        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf);
+      if (!secondClusterAggregator.isDisabled()) {
+        Thread aggregatorThread = new Thread(secondClusterAggregator);
+        aggregatorThread.start();
+      }
+
+      // Start the minute cluster aggregator
       TimelineMetricAggregator minuteClusterAggregator =
         TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf);
       if (!minuteClusterAggregator.isDisabled()) {

+ 20 - 13
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java

@@ -51,8 +51,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -63,6 +61,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_SECOND_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL;
@@ -76,7 +75,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
@@ -86,6 +85,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
@@ -242,13 +242,14 @@ public class PhoenixHBaseAccessor {
 
     String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING);
     String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION);
-    String precisionTtl = metricsConf.get(PRECISION_TABLE_TTL, "86400");
-    String hostMinTtl = metricsConf.get(HOST_MINUTE_TABLE_TTL, "604800");
-    String hostHourTtl = metricsConf.get(HOST_HOUR_TABLE_TTL, "2592000");
-    String hostDailyTtl = metricsConf.get(HOST_DAILY_TABLE_TTL, "31536000");
-    String clusterMinTtl = metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "2592000");
-    String clusterHourTtl = metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "31536000");
-    String clusterDailyTtl = metricsConf.get(CLUSTER_DAILY_TABLE_TTL, "31536000");
+    String precisionTtl = metricsConf.get(PRECISION_TABLE_TTL, "86400");           //1 day
+    String hostMinTtl = metricsConf.get(HOST_MINUTE_TABLE_TTL, "604800");          //7 days
+    String hostHourTtl = metricsConf.get(HOST_HOUR_TABLE_TTL, "2592000");          //30 days
+    String hostDailyTtl = metricsConf.get(HOST_DAILY_TABLE_TTL, "31536000");       //1 year
+    String clusterSecTtl = metricsConf.get(CLUSTER_SECOND_TABLE_TTL, "2592000");     //30 days
+    String clusterMinTtl = metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "7776000");   //90 days
+    String clusterHourTtl = metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "31536000");   //1 year
+    String clusterDailyTtl = metricsConf.get(CLUSTER_DAILY_TABLE_TTL, "63072000"); //2 years
 
     try {
       LOG.info("Initializing metrics schema...");
@@ -278,9 +279,11 @@ public class PhoenixHBaseAccessor {
         aggregateSql += getSplitPointsStr(splitPoints);
       }
       stmt.executeUpdate(aggregateSql);
-      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL,
+      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
+        METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, encoding, clusterHourTtl, compression));
+      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
         METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, encoding, clusterHourTtl, compression));
-      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL,
+      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
         METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, encoding, clusterDailyTtl, compression));
 
       //alter TTL options to update tables
@@ -298,6 +301,9 @@ public class PhoenixHBaseAccessor {
         hostDailyTtl));
       stmt.executeUpdate(String.format(ALTER_SQL,
         METRICS_CLUSTER_AGGREGATE_TABLE_NAME,
+        clusterSecTtl));
+      stmt.executeUpdate(String.format(ALTER_SQL,
+        METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME,
         clusterMinTtl));
       stmt.executeUpdate(String.format(ALTER_SQL,
         METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME,
@@ -664,7 +670,8 @@ public class PhoenixHBaseAccessor {
     for (Function aggregateFunction : functions) {
       SingleValuedTimelineMetric metric;
 
-      if (condition.getPrecision() == Precision.HOURS
+      if (condition.getPrecision() == Precision.MINUTES
+          || condition.getPrecision() == Precision.HOURS
           || condition.getPrecision() == Precision.DAYS) {
         metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction, false);
       } else {

+ 13 - 1
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java

@@ -64,6 +64,9 @@ public class TimelineMetricConfiguration {
   public static final String HOST_HOUR_TABLE_TTL =
     "timeline.metrics.host.aggregator.hourly.ttl";
 
+  public static final String CLUSTER_SECOND_TABLE_TTL =
+    "timeline.metrics.cluster.aggregator.second.ttl";
+
   public static final String CLUSTER_MINUTE_TABLE_TTL =
     "timeline.metrics.cluster.aggregator.minute.ttl";
 
@@ -74,7 +77,7 @@ public class TimelineMetricConfiguration {
     "timeline.metrics.cluster.aggregator.daily.ttl";
 
   public static final String CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL =
-    "timeline.metrics.cluster.aggregator.minute.timeslice.interval";
+    "timeline.metrics.cluster.aggregator.second.timeslice.interval";
 
   public static final String AGGREGATOR_CHECKPOINT_DELAY =
     "timeline.metrics.service.checkpointDelay";
@@ -91,6 +94,9 @@ public class TimelineMetricConfiguration {
   public static final String HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL =
     "timeline.metrics.host.aggregator.daily.interval";
 
+  public static final String CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL =
+    "timeline.metrics.cluster.aggregator.second.interval";
+
   public static final String CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL =
     "timeline.metrics.cluster.aggregator.minute.interval";
 
@@ -109,6 +115,9 @@ public class TimelineMetricConfiguration {
   public static final String HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER =
     "timeline.metrics.host.aggregator.daily.checkpointCutOffMultiplier";
 
+  public static final String CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER =
+    "timeline.metrics.cluster.aggregator.second.checkpointCutOffMultiplier";
+
   public static final String CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER =
     "timeline.metrics.cluster.aggregator.minute.checkpointCutOffMultiplier";
 
@@ -136,6 +145,9 @@ public class TimelineMetricConfiguration {
   public static final String HOST_AGGREGATOR_DAILY_DISABLED =
     "timeline.metrics.host.aggregator.hourly.disabled";
 
+  public static final String CLUSTER_AGGREGATOR_SECOND_DISABLED =
+    "timeline.metrics.cluster.aggregator.second.disabled";
+
   public static final String CLUSTER_AGGREGATOR_MINUTE_DISABLED =
     "timeline.metrics.cluster.aggregator.minute.disabled";
 

+ 70 - 9
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java

@@ -28,8 +28,11 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
@@ -49,6 +52,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
 
@@ -65,6 +69,8 @@ public class TimelineMetricAggregatorFactory {
     "timeline-metrics-host-aggregator-daily-checkpoint";
   private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE =
     "timeline-metrics-cluster-aggregator-checkpoint";
+  private static final String CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_FILE =
+    "timeline-metrics-cluster-aggregator-minute-checkpoint";
   private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE =
     "timeline-metrics-cluster-aggregator-hourly-checkpoint";
   private static final String CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE =
@@ -76,6 +82,7 @@ public class TimelineMetricAggregatorFactory {
 
   /**
    * Minute based aggregation for hosts.
+   * Interval : 5 mins
    */
   public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
     (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
@@ -119,6 +126,7 @@ public class TimelineMetricAggregatorFactory {
 
   /**
    * Hourly aggregation for hosts.
+   * Interval : 1 hour
    */
   public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
     (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
@@ -162,6 +170,7 @@ public class TimelineMetricAggregatorFactory {
 
   /**
    * Daily aggregation for hosts.
+   * Interval : 1 day
    */
   public static TimelineMetricAggregator createTimelineMetricAggregatorDaily
     (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
@@ -204,10 +213,12 @@ public class TimelineMetricAggregatorFactory {
   }
 
   /**
-   * Minute based aggregation for cluster.
+   * Second aggregation for cluster.
+   * Interval : 2 mins
+   * Timeslice : 30 sec
    */
-  public static TimelineMetricAggregator createTimelineClusterAggregatorMinute(
-      PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+  public static TimelineMetricAggregator createTimelineClusterAggregatorSecond(
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -216,20 +227,20 @@ public class TimelineMetricAggregatorFactory {
       CLUSTER_AGGREGATOR_CHECKPOINT_FILE);
 
     long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l));
+      (CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120l));
 
     long timeSliceIntervalMillis = SECONDS.toMillis(metricsConf.getInt
-      (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15));
+      (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30));
 
     int checkpointCutOffMultiplier =
-      metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+      metricsConf.getInt(CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
 
     String inputTableName = METRICS_RECORD_TABLE_NAME;
     String outputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_SECOND_DISABLED;
 
-    // Minute based aggregation have added responsibility of time slicing
-    return new TimelineMetricClusterAggregatorMinute(
+    // Second based aggregation have added responsibility of time slicing
+    return new TimelineMetricClusterAggregatorSecond(
       hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -242,8 +253,57 @@ public class TimelineMetricAggregatorFactory {
     );
   }
 
+  /**
+   * Minute aggregation for cluster.
+   * Interval : 5 mins
+   */
+  public static TimelineMetricAggregator createTimelineClusterAggregatorMinute(
+    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+    String checkpointDir = metricsConf.get(
+      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+
+    String checkpointLocation = FilenameUtils.concat(checkpointDir,
+      CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_FILE);
+
+    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+      (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l));
+
+    int checkpointCutOffMultiplier = metricsConf.getInt
+      (CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+
+    String inputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+    String outputTableName = METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
+    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_MINUTE_DISABLED;
+
+    if (useGroupByAggregator(metricsConf)) {
+      return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
+        hBaseAccessor, metricsConf,
+        checkpointLocation,
+        sleepIntervalMillis,
+        checkpointCutOffMultiplier,
+        aggregatorDisabledParam,
+        inputTableName,
+        outputTableName,
+        120000l
+      );
+    }
+
+    return new TimelineMetricClusterAggregator(
+      hBaseAccessor, metricsConf,
+      checkpointLocation,
+      sleepIntervalMillis,
+      checkpointCutOffMultiplier,
+      aggregatorDisabledParam,
+      inputTableName,
+      outputTableName,
+      120000l
+    );
+  }
+
   /**
    * Hourly aggregation for cluster.
+   * Interval : 1 hour
    */
   public static TimelineMetricAggregator createTimelineClusterAggregatorHourly(
     PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
@@ -291,6 +351,7 @@ public class TimelineMetricAggregatorFactory {
 
   /**
    * Daily aggregation for cluster.
+   * Interval : 1 day
    */
   public static TimelineMetricAggregator createTimelineClusterAggregatorDaily(
     PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {

+ 3 - 3
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java → ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java

@@ -42,8 +42,8 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
  * Aggregates a metric across all hosts in the cluster. Reads metrics from
  * the precision table and saves into the aggregate.
  */
-public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggregator {
-  private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregatorMinute.class);
+public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggregator {
+  private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregatorSecond.class);
   public Long timeSliceIntervalMillis;
   private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true);
   // Aggregator to perform app-level aggregates for host metrics
@@ -51,7 +51,7 @@ public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggre
   // 1 minute client side buffering adjustment
   private final Long serverTimeShiftAdjustment;
 
-  public TimelineMetricClusterAggregatorMinute(PhoenixHBaseAccessor hBaseAccessor,
+  public TimelineMetricClusterAggregatorSecond(PhoenixHBaseAccessor hBaseAccessor,
                                                Configuration metricsConf,
                                                String checkpointLocation,
                                                Long sleepIntervalMillis,

+ 7 - 2
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java

@@ -86,7 +86,7 @@ public class PhoenixTransactSQL {
       "TTL=%s, COMPRESSION='%s'";
 
   // HOSTS_COUNT vs METRIC_COUNT
-  public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL =
+  public static final String CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL =
     "CREATE TABLE IF NOT EXISTS %s " +
       "(METRIC_NAME VARCHAR, " +
       "APP_ID VARCHAR, " +
@@ -248,6 +248,8 @@ public class PhoenixTransactSQL {
     "METRIC_RECORD_DAILY";
   public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME =
     "METRIC_AGGREGATE";
+  public static final String METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME =
+    "METRIC_AGGREGATE_MINUTE";
   public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME =
     "METRIC_AGGREGATE_HOURLY";
   public static final String METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME =
@@ -555,7 +557,10 @@ public class PhoenixTransactSQL {
         metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
         queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
         break;
-      //TODO : Include MINUTE case after introducing CLUSTER_AGGREGATOR_MINUTE
+      case MINUTES:
+        metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
+        queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
+        break;
       default:
         metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
         queryStmt = GET_CLUSTER_AGGREGATE_SQL;

+ 79 - 13
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java

@@ -42,15 +42,14 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
 import static junit.framework.Assert.fail;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
@@ -101,7 +100,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testShouldAggregateClusterProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -111,7 +110,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
       "disk_free", 1));
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
       "disk_free", 2));
-    ctime += minute;
+    ctime += 2*minute;
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
       "disk_free", 2));
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
@@ -153,7 +152,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   public void testShouldAggregateClusterIgnoringInstance() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -214,21 +213,21 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
       if ("disk_free".equals(currentMetric.getMetricName())) {
         System.out.println("OUTPUT: " + currentMetric + " - " + currentHostAggregate);
         assertEquals(2, currentHostAggregate.getNumberOfHosts());
-        assertEquals(5.0, currentHostAggregate.getSum());
+        assertEquals(5.0, Math.floor(currentHostAggregate.getSum()));
         recordCount++;
       } else {
         fail("Unexpected entry");
       }
     }
 
-    Assert.assertEquals(8, recordCount);
+    Assert.assertEquals(5, recordCount);
   }
 
   @Test
   public void testShouldAggregateDifferentMetricsOnClusterProperly() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     // here we put some metrics tha will be aggregated
@@ -242,7 +241,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
       "disk_used", 1));
 
-    ctime += minute;
+    ctime += 2*minute;
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
       "disk_free", 2));
     hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
@@ -333,6 +332,73 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     assertEquals("Day aggregated row expected ", 1, count);
   }
 
+  @Test
+  public void testShouldAggregateClusterOnMinuteProperly() throws Exception {
+
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long second = 1000;
+    long minute = 60*second;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> records =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+    records.put(createEmptyTimelineClusterMetric(ctime),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+
+    hdb.saveClusterAggregateRecords(records);
+    agg.doWork(startTime, ctime + second);
+    long oldCtime = ctime + second;
+
+    //Next minute
+    ctime = startTime + minute;
+
+    records.put(createEmptyTimelineClusterMetric(ctime),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    records.put(createEmptyTimelineClusterMetric(ctime += second),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+
+    hdb.saveClusterAggregateRecords(records);
+    agg.doWork(oldCtime, ctime + second);
+
+    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_MINUTE");
+    int count = 0;
+    long diff = 0 ;
+    while (rs.next()) {
+      assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME"));
+      assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+      assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+      assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+      assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+      assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+      if (count == 0) {
+        diff+=rs.getLong("SERVER_TIME");
+      } else {
+        diff-=rs.getLong("SERVER_TIME");
+        if (diff < 0) {
+          diff*=-1;
+        }
+        assertTrue(diff == minute);
+      }
+      count++;
+    }
+
+    assertEquals("One hourly aggregated row expected ", 2, count);
+  }
+
   @Test
   public void testShouldAggregateClusterOnHourProperly() throws Exception {
     // GIVEN
@@ -444,7 +510,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
     Configuration conf = getConfigurationForTest(false);
     conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1");
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, conf);
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, conf);
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     long startTime = System.currentTimeMillis();
@@ -483,7 +549,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
       currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs);
       recordCount++;
     }
-    assertEquals(4, recordCount);
+    assertEquals(3, recordCount);
     assertNotNull(currentMetric);
     assertEquals("cpu_user", currentMetric.getMetricName());
     assertEquals("app1", currentMetric.getAppId());
@@ -495,7 +561,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
   @Test
   public void testClusterAggregateMetricNormalization() throws Exception {
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false));
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false));
     TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
     // Sample data
@@ -565,7 +631,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
         fail("Unexpected entry");
       }
     }
-    Assert.assertEquals(9, recordCount);
+    Assert.assertEquals(5, recordCount);
   }
 
   @Test

+ 3 - 4
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java

@@ -48,7 +48,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createMetricHostAggregate;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 
 
 public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
@@ -205,7 +204,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
   public void testGetClusterMetricRecordsSeconds() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, new Configuration());
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime + 1;
@@ -236,7 +235,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
     TimelineMetric metric = timelineMetrics.getMetrics().get(0);
 
     assertEquals("disk_free", metric.getMetricName());
-    assertEquals(8, metric.getMetricValues().size());
+    assertEquals(5, metric.getMetricValues().size());
     assertEquals(1.5, metric.getMetricValues().values().iterator().next(), 0.00001);
   }
 
@@ -244,7 +243,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
   public void testGetClusterMetricRecordLatestWithFunction() throws Exception {
     // GIVEN
     TimelineMetricAggregator agg =
-      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration());
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, new Configuration());
 
     long startTime = System.currentTimeMillis();
     long ctime = startTime + 1;

+ 6 - 13
ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java

@@ -40,6 +40,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.TreeMap;
 
 @Singleton
@@ -250,24 +251,16 @@ public class TimelineMetricCacheEntryFactory implements UpdatingCacheEntryFactor
       Long requestedStartTime, Long requestedEndTime, boolean removeAll) {
 
     for (TimelineMetric existingMetric : existingMetrics.getMetrics()) {
-      if(removeAll) {
+      if (removeAll) {
         existingMetric.setMetricValues(new TreeMap<Long, Double>());
       } else {
-        Map<Long, Double> existingMetricValues = existingMetric.getMetricValues();
+        TreeMap<Long, Double> existingMetricValues = existingMetric.getMetricValues();
         LOG.trace("Existing metric: " + existingMetric.getMetricName() +
           " # " + existingMetricValues.size());
 
-        Iterator<Map.Entry<Long, Double>> valueIterator = existingMetricValues.entrySet().iterator();
-
-        // Remove old values
-        // Assumption: All return value are millis
-        while (valueIterator.hasNext()) {
-          Map.Entry<Long, Double> metricEntry = valueIterator.next();
-          if (metricEntry.getKey() < requestedStartTime
-            || metricEntry.getKey() > requestedEndTime) {
-            valueIterator.remove();
-          }
-        }
+        // Retain only the values that are within the [requestedStartTime, requestedEndTime] window
+        existingMetricValues.headMap(requestedStartTime,false).clear();
+        existingMetricValues.tailMap(requestedEndTime, false).clear();
       }
     }
   }

+ 22 - 0
ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java

@@ -85,6 +85,7 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
   private static final String KAFKA_BROKER = "kafka-broker";
   private static final String AMS_ENV = "ams-env";
   private static final String AMS_HBASE_ENV = "ams-hbase-env";
+  private static final String AMS_SITE = "ams-site";
   private static final String HBASE_ENV_CONFIG = "hbase-env";
   private static final String HIVE_SITE_CONFIG = "hive-site";
   private static final String RANGER_ENV_CONFIG = "ranger-env";
@@ -834,6 +835,27 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
             newProperties.put("content", updateAmsHbaseEnvContent(content));
             updateConfigurationPropertiesForCluster(cluster, AMS_HBASE_ENV, newProperties, true, true);
           }
+          Config amsSite = cluster.getDesiredConfigByType(AMS_SITE);
+          if (amsSite != null) {
+            Map<String, String> newProperties = new HashMap<>();
+
+            //Interval
+            newProperties.put("timeline.metrics.cluster.aggregator.second.interval",String.valueOf(120));
+            newProperties.put("timeline.metrics.cluster.aggregator.minute.interval",String.valueOf(300));
+            newProperties.put("timeline.metrics.host.aggregator.minute.interval",String.valueOf(300));
+
+            //ttl
+            newProperties.put("timeline.metrics.cluster.aggregator.second.ttl", String.valueOf(2592000));
+            newProperties.put("timeline.metrics.cluster.aggregator.minute.ttl", String.valueOf(7776000));
+
+            //checkpoint
+            newProperties.put("timeline.metrics.cluster.aggregator.second.checkpointCutOffMultiplier", String.valueOf(2));
+
+            //disabled
+            newProperties.put("timeline.metrics.cluster.aggregator.second.disabled", String.valueOf(false));
+
+            updateConfigurationPropertiesForCluster(cluster, AMS_SITE, newProperties, true, true);
+          }
         }
       }
     }

+ 47 - 7
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml

@@ -59,11 +59,11 @@
   </property>
   <property>
     <name>timeline.metrics.host.aggregator.minute.interval</name>
-    <value>120</value>
+    <value>300</value>
     <display-name>Minute host aggregator interval</display-name>
     <description>
       Time in seconds to sleep for the minute resolution host based
-      aggregator. Default resolution is 2 minutes.
+      aggregator. Default resolution is 5 minutes.
     </description>
     <value-attributes>
       <type>int</type>
@@ -111,10 +111,22 @@
   </property>
   <property>
     <name>timeline.metrics.cluster.aggregator.minute.interval</name>
-    <value>120</value>
+    <value>300</value>
     <display-name>Minute cluster aggregator interval</display-name>
     <description>
       Time in seconds to sleep for the minute resolution cluster wide
+      aggregator. Default resolution is 5 minutes.
+    </description>
+    <value-attributes>
+      <type>int</type>
+    </value-attributes>
+  </property>
+  <property>
+    <name>timeline.metrics.cluster.aggregator.second.interval</name>
+    <value>120</value>
+    <display-name>Second cluster aggregator interval</display-name>
+    <description>
+      Time in seconds to sleep for the second resolution cluster wide
       aggregator. Default resolution is 2 minutes.
     </description>
     <value-attributes>
@@ -169,6 +181,19 @@
       <type>int</type>
     </value-attributes>
   </property>
+  <property>
+    <name>timeline.metrics.cluster.aggregator.second.checkpointCutOffMultiplier</name>
+    <value>2</value>
+    <display-name>Second cluster aggregator checkpoint cutOff multiplier</display-name>
+    <description>
+      Multiplier value * interval = Max allowed checkpoint lag. Effectively
+      if aggregator checkpoint is greater than max allowed checkpoint delay,
+      the checkpoint will be discarded by the aggregator.
+    </description>
+    <value-attributes>
+      <type>int</type>
+    </value-attributes>
+  </property>
   <property>
     <name>timeline.metrics.cluster.aggregator.minute.checkpointCutOffMultiplier</name>
     <value>2</value>
@@ -238,11 +263,19 @@
     </description>
   </property>
   <property>
-    <name>timeline.metrics.cluster.aggregator.minute.timeslice.interval</name>
+    <name>timeline.metrics.cluster.aggregator.second.disabled</name>
+    <value>false</value>
+    <display-name>Disable second cluster aggregator</display-name>
+    <description>
+      Disable cluster based second aggregations.
+    </description>
+  </property>
+  <property>
+    <name>timeline.metrics.cluster.aggregator.second.timeslice.interval</name>
     <value>30</value>
-    <display-name>Minute cluster aggregator timeslice interval</display-name>
+    <display-name>Second cluster aggregator timeslice interval</display-name>
     <description>
-      Lowest resolution of desired data for cluster level minute aggregates.
+      Lowest resolution of desired data for cluster level second aggregates.
     </description>
     <value-attributes>
       <type>int</type>
@@ -270,8 +303,15 @@
     </description>
   </property>
   <property>
-    <name>timeline.metrics.cluster.aggregator.minute.ttl</name>
+    <name>timeline.metrics.cluster.aggregator.second.ttl</name>
     <value>2592000</value>
+    <description>
+      Cluster wide second resolution data purge interval. Default is 30 days.
+    </description>
+  </property>
+  <property>
+    <name>timeline.metrics.cluster.aggregator.minute.ttl</name>
+    <value>7776000</value>
     <description>
       Cluster wide minute resolution data purge interval. Default is 30 days.
     </description>

+ 1 - 1
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/split_points.py

@@ -27,7 +27,7 @@ import ast
 
 metric_filename_ext = '.txt'
 # 5 regions for higher order aggregate tables
-other_region_static_count = 5
+other_region_static_count = 6
 # Max equidistant points to return per service
 max_equidistant_points = 50
 

+ 76 - 0
ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java

@@ -18,6 +18,8 @@
 
 package org.apache.ambari.server.upgrade;
 
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
 import com.google.inject.Guice;
@@ -26,9 +28,15 @@ import com.google.inject.Module;
 import com.google.inject.Provider;
 import com.google.inject.persist.PersistService;
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.AmbariManagementControllerImpl;
+import org.apache.ambari.server.controller.ConfigurationRequest;
+import org.apache.ambari.server.controller.ConfigurationResponse;
+import org.apache.ambari.server.controller.KerberosHelper;
+import org.apache.ambari.server.controller.MaintenanceStateHelper;
 import org.apache.ambari.server.orm.DBAccessor;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -76,6 +84,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.easymock.EasyMock.anyLong;
 import static org.easymock.EasyMock.anyObject;
@@ -89,6 +98,8 @@ import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * {@link org.apache.ambari.server.upgrade.UpgradeCatalog213} unit tests.
@@ -592,6 +603,71 @@ public class UpgradeCatalog213Test {
     easyMockSupport.verifyAll();
   }
 
+  @Test
+  public void testAmsSiteUpdateConfigs() throws Exception{
+
+    Map<String, String> oldPropertiesAmsSite = new HashMap<String, String>() {
+      {
+        //Including only those properties that might be present in an older version.
+        put("timeline.metrics.cluster.aggregator.minute.interval",String.valueOf(1000));
+        put("timeline.metrics.host.aggregator.minute.interval",String.valueOf(1000));
+        put("timeline.metrics.cluster.aggregator.minute.ttl", String.valueOf(1000));
+      }
+    };
+    Map<String, String> newPropertiesAmsSite = new HashMap<String, String>() {
+      {
+        put("timeline.metrics.cluster.aggregator.second.interval",String.valueOf(120));
+        put("timeline.metrics.cluster.aggregator.minute.interval",String.valueOf(300));
+        put("timeline.metrics.host.aggregator.minute.interval",String.valueOf(300));
+        put("timeline.metrics.cluster.aggregator.second.ttl", String.valueOf(2592000));
+        put("timeline.metrics.cluster.aggregator.minute.ttl", String.valueOf(7776000));
+        put("timeline.metrics.cluster.aggregator.second.checkpointCutOffMultiplier", String.valueOf(2));
+        put("timeline.metrics.cluster.aggregator.second.disabled", String.valueOf(false));
+      }
+    };
+    EasyMockSupport easyMockSupport = new EasyMockSupport();
+
+    Clusters clusters = easyMockSupport.createNiceMock(Clusters.class);
+    final Cluster cluster = easyMockSupport.createNiceMock(Cluster.class);
+    Config mockAmsSite = easyMockSupport.createNiceMock(Config.class);
+
+    expect(clusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{
+      put("normal", cluster);
+    }}).once();
+    expect(cluster.getDesiredConfigByType("ams-site")).andReturn(mockAmsSite).atLeastOnce();
+    expect(mockAmsSite.getProperties()).andReturn(oldPropertiesAmsSite).times(1);
+
+    Injector injector = easyMockSupport.createNiceMock(Injector.class);
+    expect(injector.getInstance(Gson.class)).andReturn(null).anyTimes();
+    expect(injector.getInstance(MaintenanceStateHelper.class)).andReturn(null).anyTimes();
+    expect(injector.getInstance(KerberosHelper.class)).andReturn(createNiceMock(KerberosHelper.class)).anyTimes();
+
+    replay(injector, clusters, mockAmsSite, cluster);
+
+    AmbariManagementControllerImpl controller = createMockBuilder(AmbariManagementControllerImpl.class)
+      .addMockedMethod("createConfiguration")
+      .addMockedMethod("getClusters", new Class[] { })
+      .withConstructor(createNiceMock(ActionManager.class), clusters, injector)
+      .createNiceMock();
+
+    Injector injector2 = easyMockSupport.createNiceMock(Injector.class);
+    Capture<ConfigurationRequest> configurationRequestCapture = EasyMock.newCapture();
+    ConfigurationResponse configurationResponseMock = easyMockSupport.createMock(ConfigurationResponse.class);
+
+    expect(injector2.getInstance(AmbariManagementController.class)).andReturn(controller).anyTimes();
+    expect(controller.getClusters()).andReturn(clusters).anyTimes();
+    expect(controller.createConfiguration(capture(configurationRequestCapture))).andReturn(configurationResponseMock).once();
+
+    replay(controller, injector2, configurationResponseMock);
+    new UpgradeCatalog213(injector2).updateAMSConfigs();
+    easyMockSupport.verifyAll();
+
+    ConfigurationRequest configurationRequest = configurationRequestCapture.getValue();
+    Map<String, String> updatedProperties = configurationRequest.getProperties();
+    assertTrue(Maps.difference(newPropertiesAmsSite, updatedProperties).areEqual());
+
+  }
+
   @Test
   public void testUpdateAlertDefinitions() {
     EasyMockSupport easyMockSupport = new EasyMockSupport();