Просмотр исходного кода

AMBARI-20758 Aggregate local metrics for minute aggregation time window (dsen)

Dmytro Sen 8 лет назад
Родитель
Сommit
041d353b0d
96 измененных файлов с 1594 добавлено и 47 удалено
  1. 11 1
      ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
  2. 20 0
      ambari-metrics/ambari-metrics-assembly/pom.xml
  3. 7 0
      ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
  4. 8 1
      ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml
  5. 22 2
      ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
  6. 60 0
      ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java
  7. 1 1
      ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java
  8. 1 1
      ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java
  9. 2 2
      ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java
  10. 3 3
      ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
  11. 65 0
      ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java
  12. 10 0
      ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
  13. 10 0
      ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
  14. 10 0
      ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
  15. 16 0
      ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
  16. 17 3
      ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
  17. 31 0
      ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties
  18. 29 0
      ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties
  19. 120 0
      ambari-metrics/ambari-metrics-host-aggregator/pom.xml
  20. 134 0
      ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
  21. 101 0
      ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java
  22. 180 0
      ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
  23. 56 0
      ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
  24. 60 0
      ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java
  25. 98 0
      ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
  26. 1 1
      ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
  27. 110 0
      ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
  28. 33 2
      ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
  29. 28 0
      ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
  30. 7 1
      ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
  31. 2 1
      ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
  32. 5 1
      ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py
  33. 17 0
      ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
  34. 14 0
      ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
  35. 14 0
      ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
  36. 16 0
      ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
  37. 16 0
      ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
  38. 25 4
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
  39. 2 2
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
  40. 2 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
  41. 2 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
  42. 2 2
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
  43. 1 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
  44. 2 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
  45. 1 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
  46. 1 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
  47. 2 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
  48. 31 0
      ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
  49. 2 2
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
  50. 1 1
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
  51. 2 2
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
  52. 4 4
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
  53. 6 0
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
  54. 2 2
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
  55. 2 2
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
  56. 7 6
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
  57. 1 0
      ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
  58. 1 0
      ambari-metrics/pom.xml
  59. 10 0
      ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java
  60. 2 0
      ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py
  61. 3 0
      ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2
  62. 8 0
      ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml
  63. 11 0
      ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml
  64. 3 0
      ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml
  65. 30 0
      ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py
  66. 5 0
      ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
  67. 3 0
      ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2
  68. 7 0
      ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2
  69. 3 0
      ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
  70. 2 0
      ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2
  71. 3 0
      ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/scripts/params_linux.py
  72. 2 0
      ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
  73. 2 0
      ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2
  74. 2 0
      ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/configuration/hadoop-metrics2.properties.xml
  75. 2 0
      ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params_linux.py
  76. 2 0
      ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2
  77. 2 0
      ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2
  78. 2 0
      ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llapdaemon.j2
  79. 2 0
      ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2
  80. 3 0
      ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/scripts/params_linux.py
  81. 2 0
      ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2
  82. 2 0
      ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2
  83. 2 0
      ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llapdaemon.j2
  84. 2 0
      ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2
  85. 11 0
      ambari-server/src/main/resources/common-services/KAFKA/0.8.1/configuration/kafka-broker.xml
  86. 3 0
      ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/params.py
  87. 2 0
      ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
  88. 2 0
      ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/config.yaml.j2
  89. 2 0
      ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2
  90. 3 0
      ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
  91. 2 0
      ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
  92. 2 0
      ambari-server/src/main/resources/stacks/HDP/2.6/services/HDFS/configuration/hadoop-metrics2.properties.xml
  93. 2 0
      ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/scripts/params.py
  94. 2 0
      ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/templates/hadoop-metrics2.properties.j2
  95. 10 0
      ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java
  96. 2 0
      contrib/management-packs/hdf-ambari-mpack/src/main/resources/stacks/HDF/2.0/hooks/before-START/scripts/params.py

+ 11 - 1
ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java

@@ -88,6 +88,16 @@ public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
     return null;
   }
 
+  @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return false;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return 0;
+  }
+
   @Override
   protected boolean emitMetrics(TimelineMetrics metrics) {
     return super.emitMetrics(metrics);
@@ -103,4 +113,4 @@ public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
     return collectorPort;
   }
 
-}
+}

+ 20 - 0
ambari-metrics/ambari-metrics-assembly/pom.xml

@@ -35,6 +35,7 @@
   <properties>
     <collector.dir>${project.basedir}/../ambari-metrics-timelineservice</collector.dir>
     <monitor.dir>${project.basedir}/../ambari-metrics-host-monitoring</monitor.dir>
+    <aggregator.dir>${project.basedir}/../ambari-metrics-host-aggregator</aggregator.dir>
     <grafana.dir>${project.basedir}/../ambari-metrics-grafana</grafana.dir>
     <hadoop-sink.dir>${project.basedir}/../ambari-metrics-hadoop-sink</hadoop-sink.dir>
     <storm-sink.dir>${project.basedir}/../ambari-metrics-storm-sink</storm-sink.dir>
@@ -598,6 +599,19 @@
                         </source>
                       </sources>
                     </mapping>
+                    <mapping>
+                      <directory>/var/lib/ambari-metrics-monitor/lib</directory>
+                      <sources>
+                        <source>
+                          <location>
+                            ${aggregator.dir}/target/
+                          </location>
+                          <includes>
+                            <include>ambari-metrics-host-aggregator-${project.version}.jar</include>
+                          </includes>
+                        </source>
+                      </sources>
+                    </mapping>
                     <mapping>
                       <directory>/etc/ambari-metrics-monitor/conf</directory>
                       <configuration>true</configuration>
@@ -744,6 +758,7 @@
                     <path>/var/run/ambari-metrics-grafana</path>
                     <path>/var/log/ambari-metrics-grafana</path>
                     <path>/var/lib/ambari-metrics-collector</path>
+                    <path>/var/lib/ambari-metrics-monitor/lib</path>
                     <path>/var/lib/ambari-metrics-grafana</path>
                     <path>/usr/lib/ambari-metrics-hadoop-sink</path>
                     <path>/usr/lib/ambari-metrics-kafka-sink</path>
@@ -1331,6 +1346,11 @@
       <type>pom</type>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-metrics-host-aggregator</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
 

+ 7 - 0
ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml

@@ -63,6 +63,13 @@
         <include>metric_monitor.ini</include>
       </includes>
     </fileSet>
+    <fileSet>
+      <directory>${aggregator.dir}/conf/windows</directory>
+      <outputDirectory>conf</outputDirectory>
+      <includes>
+        <include>log4j.properties</include>
+      </includes>
+    </fileSet>
     <fileSet>
       <directory>${monitor.dir}/conf/windows</directory>
       <outputDirectory>/</outputDirectory>

+ 8 - 1
ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml

@@ -45,6 +45,13 @@
         <include>metric_monitor.ini</include>
       </includes>
     </fileSet>
+    <fileSet>
+      <directory>${aggregator.dir}/conf/unix</directory>
+      <outputDirectory>conf</outputDirectory>
+      <includes>
+        <include>log4j.properties</include>
+      </includes>
+    </fileSet>
     <fileSet>
       <directory>${monitor.dir}/conf/unix</directory>
       <outputDirectory>bin</outputDirectory>
@@ -68,4 +75,4 @@
 
 
 
-</assembly>
+</assembly>

+ 22 - 2
ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java

@@ -78,6 +78,8 @@ public abstract class AbstractTimelineMetricsSink {
   public static final String SSL_KEYSTORE_PATH_PROPERTY = "truststore.path";
   public static final String SSL_KEYSTORE_TYPE_PROPERTY = "truststore.type";
   public static final String SSL_KEYSTORE_PASSWORD_PROPERTY = "truststore.password";
+  public static final String HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = "host_in_memory_aggregation";
+  public static final String HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = "host_in_memory_aggregation_port";
   public static final String COLLECTOR_LIVE_NODES_PATH = "/ws/v1/timeline/metrics/livenodes";
   public static final String INSTANCE_ID_PROPERTY = "instanceId";
   public static final String SET_INSTANCE_ID_PROPERTY = "set.instanceId";
@@ -241,8 +243,14 @@ public abstract class AbstractTimelineMetricsSink {
   }
 
   protected boolean emitMetrics(TimelineMetrics metrics) {
-    String collectorHost = getCurrentCollectorHost();
-    String connectUrl = getCollectorUri(collectorHost);
+    String connectUrl;
+    if (isHostInMemoryAggregationEnabled()) {
+      connectUrl = constructTimelineMetricUri("http", "localhost", String.valueOf(getHostInMemoryAggregationPort()));
+    } else {
+      String collectorHost  = getCurrentCollectorHost();
+      connectUrl = getCollectorUri(collectorHost);
+    }
+
     String jsonData = null;
     LOG.debug("EmitMetrics connectUrl = "  + connectUrl);
     try {
@@ -562,4 +570,16 @@ public abstract class AbstractTimelineMetricsSink {
    * @return String "host1"
    */
   abstract protected String getHostname();
+
+  /**
+   * Check if host in-memory aggregation is enabled
+   * @return
+   */
+  abstract protected boolean isHostInMemoryAggregationEnabled();
+
+  /**
+   * In memory aggregation port
+   * @return
+   */
+  abstract protected int getHostInMemoryAggregationPort();
 }

+ 60 - 0
ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java

@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Set;
+
+@XmlRootElement(name="AggregationResult")
+public class AggregationResult {
+    protected Set<TimelineMetricWithAggregatedValues> result;
+    protected Long timeInMilis;
+
+    @Override
+    public String toString() {
+        return "AggregationResult{" +
+                "result=" + result +
+                ", timeInMilis=" + timeInMilis +
+                '}';
+    }
+
+    public AggregationResult() {
+    }
+
+    public AggregationResult(Set<TimelineMetricWithAggregatedValues> result, Long timeInMilis) {
+        this.result = result;
+        this.timeInMilis = timeInMilis;
+    }
+    @XmlElement
+    public Set<TimelineMetricWithAggregatedValues> getResult() {
+        return result;
+    }
+
+    public void setResult(Set<TimelineMetricWithAggregatedValues> result) {
+        this.result = result;
+    }
+    @XmlElement
+    public Long getTimeInMilis() {
+        return timeInMilis;
+    }
+
+    public void setTimeInMilis(Long timeInMilis) {
+        this.timeInMilis = timeInMilis;
+    }
+}

+ 1 - 1
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java → ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+package org.apache.hadoop.metrics2.sink.timeline;
 
 
 import org.apache.hadoop.classification.InterfaceAudience;

+ 1 - 1
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java → ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+package org.apache.hadoop.metrics2.sink.timeline;
 
 
 import org.codehaus.jackson.annotate.JsonCreator;

+ 2 - 2
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java → ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+package org.apache.hadoop.metrics2.sink.timeline;
 
 
 import org.codehaus.jackson.annotate.JsonCreator;
@@ -54,7 +54,7 @@ public class MetricHostAggregate extends MetricAggregate {
     this.numberOfSamples = numberOfSamples;
   }
 
-  public double getAvg() {
+  public double calculateAverage() {
     return sum / numberOfSamples;
   }
 

+ 3 - 3
ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java

@@ -45,7 +45,7 @@ public class TimelineMetric implements Comparable<TimelineMetric> {
   private String type;
   private String units;
   private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
-  private Map<String, String> metadata = new HashMap<>();
+  private HashMap<String, String> metadata = new HashMap<>();
 
   // default
   public TimelineMetric() {
@@ -151,11 +151,11 @@ public class TimelineMetric implements Comparable<TimelineMetric> {
   }
 
   @XmlElement(name = "metadata")
-  public Map<String,String> getMetadata () {
+  public HashMap<String,String> getMetadata () {
     return metadata;
   }
 
-  public void setMetadata (Map<String,String> metadata) {
+  public void setMetadata (HashMap<String,String> metadata) {
     this.metadata = metadata;
   }
 

+ 65 - 0
ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java

@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+
+@XmlRootElement(name = "TimelineMetricWithAggregatedValues")
+@XmlAccessorType(XmlAccessType.NONE)
+public class TimelineMetricWithAggregatedValues {
+    private TimelineMetric timelineMetric;
+    private MetricHostAggregate metricAggregate;
+
+    public TimelineMetricWithAggregatedValues() {
+    }
+
+    public TimelineMetricWithAggregatedValues(TimelineMetric metric, MetricHostAggregate metricAggregate) {
+        timelineMetric = metric;
+        this.metricAggregate = metricAggregate;
+    }
+
+    @XmlElement
+    public MetricHostAggregate getMetricAggregate() {
+        return metricAggregate;
+    }
+    @XmlElement
+    public TimelineMetric getTimelineMetric() {
+        return timelineMetric;
+    }
+
+    public void setTimelineMetric(TimelineMetric timelineMetric) {
+        this.timelineMetric = timelineMetric;
+    }
+
+    public void setMetricAggregate(MetricHostAggregate metricAggregate) {
+        this.metricAggregate = metricAggregate;
+    }
+
+    @Override
+    public String toString() {
+        return "TimelineMetricWithAggregatedValues{" +
+                "timelineMetric=" + timelineMetric +
+                ", metricAggregate=" + metricAggregate +
+                '}';
+    }
+}

+ 10 - 0
ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java

@@ -89,6 +89,16 @@ public class AbstractTimelineMetricSinkTest {
       return "h1";
     }
 
+    @Override
+    protected boolean isHostInMemoryAggregationEnabled() {
+      return true;
+    }
+
+    @Override
+    protected int getHostInMemoryAggregationPort() {
+      return 61888;
+    }
+
     @Override
     public boolean emitMetrics(TimelineMetrics metrics) {
       super.init();

+ 10 - 0
ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java

@@ -192,5 +192,15 @@ public class MetricCollectorHATest {
     protected String getHostname() {
       return "h1";
     }
+
+    @Override
+    protected boolean isHostInMemoryAggregationEnabled() {
+      return true;
+    }
+
+    @Override
+    protected int getHostInMemoryAggregationPort() {
+      return 61888;
+    }
   }
 }

+ 10 - 0
ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java

@@ -124,6 +124,16 @@ public class HandleConnectExceptionTest {
       return "h1";
     }
 
+    @Override
+    protected boolean isHostInMemoryAggregationEnabled() {
+      return false;
+    }
+
+    @Override
+    protected int getHostInMemoryAggregationPort() {
+      return 61888;
+    }
+
     @Override
     public boolean emitMetrics(TimelineMetrics metrics) {
       super.init();

+ 16 - 0
ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java

@@ -63,6 +63,9 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
   private int timeoutSeconds = 10;
   private boolean setInstanceId;
   private String instanceId;
+  private boolean hostInMemoryAggregationEnabled;
+  private int hostInMemoryAggregationPort;
+
 
   @Override
   public void start() {
@@ -110,6 +113,9 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     port = configuration.getProperty(COLLECTOR_PORT, "6188");
     setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
     instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, "");
+
+    hostInMemoryAggregationEnabled = Boolean.getBoolean(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY));
+    hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY));
     // Initialize the collector write strategy
     super.init();
 
@@ -162,6 +168,16 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     return hostname;
   }
 
+  @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
   public void setPollFrequency(long pollFrequency) {
     this.pollFrequency = pollFrequency;
   }

+ 17 - 3
ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java

@@ -75,6 +75,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
       return t;
     }
   });
+  private int hostInMemoryAggregationPort;
+  private boolean hostInMemoryAggregationEnabled;
 
   @Override
   public void init(SubsetConfiguration conf) {
@@ -107,7 +109,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
     protocol = conf.getString(COLLECTOR_PROTOCOL, "http");
     collectorHosts = parseHostsStringArrayIntoCollection(conf.getStringArray(COLLECTOR_HOSTS_PROPERTY));
     port = conf.getString(COLLECTOR_PORT, "6188");
-
+    hostInMemoryAggregationEnabled = conf.getBoolean(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY);
+    hostInMemoryAggregationPort = conf.getInt(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY);
     if (collectorHosts.isEmpty()) {
       LOG.error("No Metric collector configured.");
     } else {
@@ -248,6 +251,16 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
     return hostName;
   }
 
+  @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
   @Override
   public void putMetrics(MetricsRecord record) {
     try {
@@ -308,9 +321,10 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
 
       int sbBaseLen = sb.length();
       List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
-      Map<String, String> metadata = null;
+      HashMap<String, String> metadata = null;
       if (skipAggregation) {
-        metadata = Collections.singletonMap("skipAggregation", "true");
+        metadata = new HashMap<>();
+        metadata.put("skipAggregation", "true");
       }
       long startTime = record.timestamp();
 

+ 31 - 0
ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties

@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Define some default values that can be overridden by system properties
+# Root logger option
+log4j.rootLogger=INFO,file
+
+# Direct log messages to a log file
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=/var/log/ambari-metrics-monitor/ambari-metrics-aggregator.log
+log4j.appender.file.MaxFileSize=80MB
+log4j.appender.file.MaxBackupIndex=60
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c{1}:%L - %m%n
+
+

+ 29 - 0
ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties

@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Define some default values that can be overridden by system properties
+# Root logger option
+log4j.rootLogger=INFO,file
+
+# Direct log messages to a log file
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=\\var\\log\\ambari-metrics-monitor\\ambari-metrics-aggregator.log
+log4j.appender.file.MaxFileSize=80MB
+log4j.appender.file.MaxBackupIndex=60
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c{1}:%L - %m%n

+ 120 - 0
ambari-metrics/ambari-metrics-host-aggregator/pom.xml

@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>ambari-metrics</artifactId>
+        <groupId>org.apache.ambari</groupId>
+        <version>2.0.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>ambari-metrics-host-aggregator</artifactId>
+    <packaging>jar</packaging>
+
+    <name>ambari-metrics-host-aggregator</name>
+    <url>http://maven.apache.org</url>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>3.8.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>14.0.1</version>
+        </dependency>
+        <dependency>
+              <groupId>org.apache.ambari</groupId>
+              <artifactId>ambari-metrics-common</artifactId>
+              <version>2.0.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.servlet</groupId>
+            <artifactId>servlet-api</artifactId>
+            <version>2.5</version>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-json</artifactId>
+            <version>1.11</version>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-server</artifactId>
+            <version>1.11</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.xml.bind</groupId>
+            <artifactId>jaxb-api</artifactId>
+            <version>2.2.2</version>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-core</artifactId>
+            <version>1.11</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>2.7.1.2.3.4.0-3347</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>1.6</version>
+                <configuration>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                    <filters>
+                        <filter>
+                            <artifact>*:*</artifact>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
+                </configuration>
+
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

+ 134 - 0
ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java

@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Map;
+
+/**
+ * Abstract class that runs a thread that publishes metrics data to AMS collector in specified intervals.
+ */
+public abstract class AbstractMetricPublisherThread extends Thread {
+    protected int publishIntervalInSeconds;
+    protected String publishURL;
+    protected ObjectMapper objectMapper;
+    private Log LOG;
+    protected TimelineMetricsHolder timelineMetricsHolder;
+
+    public AbstractMetricPublisherThread(TimelineMetricsHolder timelineMetricsHolder, String publishURL, int publishIntervalInSeconds) {
+        LOG = LogFactory.getLog(this.getClass());
+        this.publishURL = publishURL;
+        this.publishIntervalInSeconds = publishIntervalInSeconds;
+        this.timelineMetricsHolder = timelineMetricsHolder;
+        objectMapper = new ObjectMapper();
+        AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+        objectMapper.setAnnotationIntrospector(introspector);
+        objectMapper.getSerializationConfig()
+                .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+    }
+
+    /**
+     * Publishes metrics to collector in specified intervals while not interrupted.
+     */
+    @Override
+    public void run() {
+        while (!isInterrupted()) {
+            try {
+                sleep(this.publishIntervalInSeconds * 1000);
+            } catch (InterruptedException e) {
+                //Ignore
+            }
+            try {
+                processAndPublishMetrics(getMetricsFromCache());
+            } catch (Exception e) {
+                LOG.error("Couldn't process and send metrics : ",e);
+            }
+        }
+    }
+
+    /**
+     * Processes and sends metrics to collector.
+     * @param metricsFromCache
+     * @throws Exception
+     */
+    protected void processAndPublishMetrics(Map<Long, TimelineMetrics> metricsFromCache) throws Exception {
+        if (metricsFromCache.size()==0) return;
+
+        LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size()));
+        publishMetricsJson(processMetrics(metricsFromCache));
+    }
+
+    /**
+     * Returns metrics map. Source is based on implementation.
+     * @return
+     */
+    protected abstract Map<Long,TimelineMetrics> getMetricsFromCache();
+
+    /**
+     * Processes given metrics (aggregates or merges them) and converts them into json string that will be send to collector
+     * @param metricValues
+     * @return
+     */
+    protected abstract String processMetrics(Map<Long, TimelineMetrics> metricValues);
+
+    protected void publishMetricsJson(String jsonData) throws Exception {
+        int timeout = 5 * 1000;
+        HttpURLConnection connection = null;
+        if (this.publishURL == null) {
+            throw new IOException("Unknown URL. Unable to connect to metrics collector.");
+        }
+        LOG.info("Collector URL : " + publishURL);
+        connection = (HttpURLConnection) new URL(this.publishURL).openConnection();
+
+        connection.setRequestMethod("POST");
+        connection.setRequestProperty("Content-Type", "application/json");
+        connection.setRequestProperty("Connection", "Keep-Alive");
+        connection.setConnectTimeout(timeout);
+        connection.setReadTimeout(timeout);
+        connection.setDoOutput(true);
+
+        if (jsonData != null) {
+            try (OutputStream os = connection.getOutputStream()) {
+                os.write(jsonData.getBytes("UTF-8"));
+            }
+        }
+        int responseCode = connection.getResponseCode();
+        if (responseCode != 200) {
+            throw new Exception("responseCode is " + responseCode);
+        }
+        LOG.info("Successfully sent metrics.");
+    }
+
+    /**
+     * Interrupts the thread.
+     */
+    protected void stopPublisher() {
+        this.interrupt();
+    }
+}

+ 101 - 0
ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java

@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Thread that aggregates and publishes metrics to collector on specified interval.
+ */
+public class AggregatedMetricsPublisher extends AbstractMetricPublisherThread {
+
+    private Log LOG;
+
+    public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) {
+        super(timelineMetricsHolder, collectorURL, interval);
+        LOG = LogFactory.getLog(this.getClass());
+    }
+
+    /**
+     * get metrics map form @TimelineMetricsHolder
+     * @return
+     */
+    @Override
+    protected Map<Long, TimelineMetrics> getMetricsFromCache() {
+        return timelineMetricsHolder.extractMetricsForAggregationPublishing();
+    }
+
+    /**
+     * Aggregates given metrics and converts them into json string that will be send to collector
+     * @param metricForAggregationValues
+     * @return
+     */
+    @Override
+    protected String processMetrics(Map<Long, TimelineMetrics> metricForAggregationValues) {
+        HashMap<String, TimelineMetrics> nameToMetricMap = new HashMap<>();
+        for (TimelineMetrics timelineMetrics : metricForAggregationValues.values()) {
+            for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
+                if (!nameToMetricMap.containsKey(timelineMetric.getMetricName())) {
+                    nameToMetricMap.put(timelineMetric.getMetricName(), new TimelineMetrics());
+                }
+                nameToMetricMap.get(timelineMetric.getMetricName()).addOrMergeTimelineMetric(timelineMetric);
+            }
+        }
+        Set<TimelineMetricWithAggregatedValues> metricAggregateMap = new HashSet<>();
+        for (TimelineMetrics metrics : nameToMetricMap.values()) {
+            double sum = 0;
+            double max = Integer.MIN_VALUE;
+            double min = Integer.MAX_VALUE;
+            int count = 0;
+            for (TimelineMetric metric : metrics.getMetrics()) {
+                for (Double value : metric.getMetricValues().values()) {
+                    sum+=value;
+                    max = Math.max(max, value);
+                    min = Math.min(min, value);
+                    count++;
+                }
+            }
+            TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0));
+            tmpMetric.setMetricValues(new TreeMap<Long, Double>());
+            metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min)));
+        }
+        String json = null;
+        try {
+            json = objectMapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis()));
+            LOG.debug(json);
+        } catch (Exception e) {
+            LOG.error("Failed to convert result into json", e);
+        }
+
+        return json;
+    }
+}

+ 180 - 0
ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java

@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import com.sun.jersey.api.container.httpserver.HttpServerFactory;
+import com.sun.jersey.api.core.PackagesResourceConfig;
+import com.sun.jersey.api.core.ResourceConfig;
+import com.sun.net.httpserver.HttpServer;
+
+import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * WEB application with 2 publisher threads that processes received metrics and submits results to the collector
+ */
+public class AggregatorApplication
+{
+    private static final int STOP_SECONDS_DELAY = 0;
+    private static final int JOIN_SECONDS_TIMEOUT = 2;
+    private static String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics";
+    private static String AGGREGATED_POST_PREFIX = "/aggregated";
+    private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
+    private static Log LOG = LogFactory.getLog("AggregatorApplication.class");
+    private final int webApplicationPort;
+    private final int rawPublishingInterval;
+    private final int aggregationInterval;
+    private Configuration configuration;
+    private String [] collectorHosts;
+    private AggregatedMetricsPublisher aggregatePublisher;
+    private RawMetricsPublisher rawPublisher;
+    private TimelineMetricsHolder timelineMetricsHolder;
+    private HttpServer httpServer;
+
+    public AggregatorApplication(String collectorHosts) {
+        initConfiguration();
+        this.collectorHosts = collectorHosts.split(",");
+        this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300);
+        this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60);
+        this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888);
+        this.timelineMetricsHolder = TimelineMetricsHolder.getInstance(rawPublishingInterval, aggregationInterval);
+        try {
+            this.httpServer = createHttpServer();
+        } catch (IOException e) {
+            LOG.error("Exception while starting HTTP server. Exiting", e);
+            System.exit(1);
+        }
+    }
+
+    private void initConfiguration() {
+        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        if (classLoader == null) {
+            classLoader = getClass().getClassLoader();
+        }
+
+        URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
+        LOG.info("Found metric service configuration: " + amsResUrl);
+        if (amsResUrl == null) {
+            throw new IllegalStateException("Unable to initialize the metrics " +
+                    "subsystem. No ams-site present in the classpath.");
+        }
+        configuration = new Configuration(true);
+        try {
+            configuration.addResource(amsResUrl.toURI().toURL());
+        } catch (Exception e) {
+            LOG.error("Couldn't init configuration. ", e);
+            System.exit(1);
+        }
+    }
+
+    private String getHostName() {
+        String hostName = "localhost";
+        try {
+            hostName = InetAddress.getLocalHost().getCanonicalHostName();
+        } catch (UnknownHostException e) {
+            LOG.error(e);
+        }
+        return hostName;
+    }
+
+    private URI getURI() {
+        URI uri = UriBuilder.fromUri("http://" + getHostName() + "/").port(this.webApplicationPort).build();
+        LOG.info(String.format("Web server at %s", uri));
+        return uri;
+    }
+
+    private HttpServer createHttpServer() throws IOException {
+        ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator");
+        HashMap<String, Object> params = new HashMap();
+        params.put("com.sun.jersey.api.json.POJOMappingFeature", "true");
+        resourceConfig.setPropertiesAndFeatures(params);
+        return HttpServerFactory.create(getURI(), resourceConfig);
+    }
+
+    private void startWebServer() {
+        LOG.info("Starting web server.");
+        this.httpServer.start();
+    }
+
+    private void startAggregatePublisherThread() {
+        LOG.info("Starting aggregated metrics publisher.");
+        String collectorURL = buildBasicCollectorURL(collectorHosts[0]) + AGGREGATED_POST_PREFIX;
+        aggregatePublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, collectorURL, aggregationInterval);
+        aggregatePublisher.start();
+    }
+
+    private void startRawPublisherThread() {
+        LOG.info("Starting raw metrics publisher.");
+        String collectorURL = buildBasicCollectorURL(collectorHosts[0]);
+        rawPublisher = new RawMetricsPublisher(timelineMetricsHolder, collectorURL, rawPublishingInterval);
+        rawPublisher.start();
+    }
+
+
+
+    private void stop() {
+        aggregatePublisher.stopPublisher();
+        rawPublisher.stopPublisher();
+        httpServer.stop(STOP_SECONDS_DELAY);
+        LOG.info("Stopped web server.");
+        try {
+            LOG.info("Waiting for threads to join.");
+            aggregatePublisher.join(JOIN_SECONDS_TIMEOUT * 1000);
+            rawPublisher.join(JOIN_SECONDS_TIMEOUT * 1000);
+            LOG.info("Gracefully stopped Aggregator Application.");
+        } catch (InterruptedException e) {
+            LOG.error("Received exception during stop : ", e);
+
+        }
+
+    }
+
+    private String buildBasicCollectorURL(String host) {
+        String port = configuration.get("timeline.metrics.service.webapp.address", "0.0.0.0:6188").split(":")[1];
+        String protocol = configuration.get("timeline.metrics.service.http.policy", "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
+        return String.format(BASE_POST_URL, protocol, host, port);
+    }
+
+    public static void main( String[] args ) throws Exception {
+        LOG.info("Starting aggregator application");
+        if (args.length != 1) {
+            throw new Exception("This jar should be run with 1 argument - collector hosts separated with coma");
+        }
+
+        final AggregatorApplication app = new AggregatorApplication(args[0]);
+        app.startAggregatePublisherThread();
+        app.startRawPublisherThread();
+        app.startWebServer();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            public void run() {
+                LOG.info("Stopping aggregator application");
+                app.stop();
+            }
+        });
+    }
+}

+ 56 - 0
ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+
+import com.sun.jersey.spi.resource.Singleton;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+
+@Singleton
+@Path("/ws/v1/timeline")
+public class AggregatorWebService {
+    TimelineMetricsHolder metricsHolder = TimelineMetricsHolder.getInstance();
+
+    @GET
+    @Produces("text/json")
+    @Path("/metrics")
+    public Response helloWorld() throws IOException {
+        return Response.ok().build();
+    }
+
+    @POST
+    @Produces(MediaType.TEXT_PLAIN)
+    @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+    @Path("/metrics")
+    public Response postMetrics(
+            TimelineMetrics metrics) {
+        metricsHolder.putMetricsForAggregationPublishing(metrics);
+        metricsHolder.putMetricsForRawPublishing(metrics);
+        return Response.ok().build();
+    }
+}

+ 60 - 0
ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java

@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.util.Map;
+
+public class RawMetricsPublisher extends AbstractMetricPublisherThread {
+    private final Log LOG;
+
+    public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) {
+        super(timelineMetricsHolder, collectorURL, interval);
+        LOG = LogFactory.getLog(this.getClass());
+    }
+
+
+    @Override
+    protected Map<Long, TimelineMetrics> getMetricsFromCache() {
+        return timelineMetricsHolder.extractMetricsForRawPublishing();
+    }
+
+    @Override
+    protected String processMetrics(Map<Long, TimelineMetrics> metricValues) {
+        //merge everything in one TimelineMetrics object
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        for (TimelineMetrics metrics : metricValues.values()) {
+            for (TimelineMetric timelineMetric : metrics.getMetrics())
+                timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+        }
+        //map TimelineMetrics to json string
+        String json = null;
+        try {
+            json = objectMapper.writeValueAsString(timelineMetrics);
+            LOG.debug(json);
+        } catch (Exception e) {
+            LOG.error("Failed to convert result into json", e);
+        }
+        return json;
+    }
+}

+ 98 - 0
ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java

@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Singleton class with 2 guava caches for raw and aggregated metrics storing
+ */
+public class TimelineMetricsHolder {
+    private static final int DEFAULT_RAW_CACHE_EXPIRE_TIME = 60;
+    private static final int DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME = 300;
+    private Cache<Long, TimelineMetrics> aggregationMetricsCache;
+    private Cache<Long, TimelineMetrics> rawMetricsCache;
+    private static TimelineMetricsHolder instance = null;
+    //to ensure no metric values are expired
+    private static int EXPIRE_DELAY = 30;
+    ReadWriteLock aggregationCacheLock = new ReentrantReadWriteLock();
+    ReadWriteLock rawCacheLock = new ReentrantReadWriteLock();
+
+    private TimelineMetricsHolder(int rawCacheExpireTime, int aggregationCacheExpireTime) {
+        this.rawMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(rawCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build();
+        this.aggregationMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(aggregationCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build();
+    }
+
+    public static TimelineMetricsHolder getInstance(int rawCacheExpireTime, int aggregationCacheExpireTime) {
+        if (instance == null) {
+            instance = new TimelineMetricsHolder(rawCacheExpireTime, aggregationCacheExpireTime);
+        }
+        return instance;
+    }
+
+    /**
+     * Uses default expiration time for caches initialization if they are not initialized yet.
+     * @return
+     */
+    public static TimelineMetricsHolder getInstance() {
+        return getInstance(DEFAULT_RAW_CACHE_EXPIRE_TIME, DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME);
+    }
+
+    public void putMetricsForAggregationPublishing(TimelineMetrics timelineMetrics) {
+        aggregationCacheLock.writeLock().lock();
+        aggregationMetricsCache.put(System.currentTimeMillis(), timelineMetrics);
+        aggregationCacheLock.writeLock().unlock();
+    }
+
+    public Map<Long, TimelineMetrics> extractMetricsForAggregationPublishing() {
+        return extractMetricsFromCacheWithLock(aggregationMetricsCache, aggregationCacheLock);
+    }
+
+    public void putMetricsForRawPublishing(TimelineMetrics metrics) {
+        rawCacheLock.writeLock().lock();
+        rawMetricsCache.put(System.currentTimeMillis(), metrics);
+        rawCacheLock.writeLock().unlock();
+    }
+
+    public Map<Long, TimelineMetrics> extractMetricsForRawPublishing() {
+        return extractMetricsFromCacheWithLock(rawMetricsCache, rawCacheLock);
+    }
+
+    /**
+     * Returns values from cache and clears the cache
+     * @param cache
+     * @param lock
+     * @return
+     */
+    private Map<Long, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<Long, TimelineMetrics> cache, ReadWriteLock lock) {
+        lock.writeLock().lock();
+        Map<Long, TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap());
+        cache.invalidateAll();
+        lock.writeLock().unlock();
+        return metricsMap;
+    }
+
+}

+ 1 - 1
ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor

@@ -24,7 +24,7 @@ METRIC_MONITOR_PY_SCRIPT=${RESOURCE_MONITORING_DIR}/main.py
 PIDFILE=/var/run/ambari-metrics-monitor/ambari-metrics-monitor.pid
 OUTFILE=/var/log/ambari-metrics-monitor/ambari-metrics-monitor.out
 
-STOP_TIMEOUT=5
+STOP_TIMEOUT=10
 
 OK=0
 NOTOK=1

+ 110 - 0
ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py

@@ -0,0 +1,110 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import threading
+import subprocess
+import logging
+import urllib2
+
+logger = logging.getLogger()
+class Aggregator(threading.Thread):
+  def __init__(self, config, stop_handler):
+    threading.Thread.__init__(self)
+    self._config = config
+    self._stop_handler = stop_handler
+    self._aggregator_process = None
+    self._sleep_interval = config.get_collector_sleep_interval()
+    self.stopped = False
+
+  def run(self):
+    java_home = self._config.get_java_home()
+    collector_hosts = self._config.get_metrics_collector_hosts_as_string()
+    jvm_agrs = self._config.get_aggregator_jvm_agrs()
+    config_dir = self._config.get_config_dir()
+    class_name = "org.apache.hadoop.metrics2.host.aggregator.AggregatorApplication"
+    ams_log_file = "ambari-metrics-aggregator.log"
+    additional_classpath = ':{0}'.format(config_dir)
+    ams_log_dir = self._config.ams_monitor_log_dir()
+    logger.info('Starting Aggregator thread.')
+    cmd = "{0}/bin/java {1} -Dams.log.dir={2} -Dams.log.file={3} -cp /var/lib/ambari-metrics-monitor/lib/*{4} {5} {6}"\
+      .format(java_home, jvm_agrs, ams_log_dir, ams_log_file, additional_classpath, class_name, collector_hosts)
+
+    logger.info("Executing : {0}".format(cmd))
+
+    self._aggregator_process = subprocess.Popen([cmd], stdout = None, stderr = None, shell = True)
+    while not self.stopped:
+      if 0 == self._stop_handler.wait(self._sleep_interval):
+        break
+    pass
+    self.stop()
+
+  def stop(self):
+    self.stopped = True
+    if self._aggregator_process :
+      logger.info('Stopping Aggregator thread.')
+      self._aggregator_process.terminate()
+
+class AggregatorWatchdog(threading.Thread):
+  SLEEP_TIME = 30
+  CONNECTION_TIMEOUT = 5
+  AMS_AGGREGATOR_METRICS_CHECK_URL = "/ws/v1/timeline/metrics/"
+  def __init__(self, config, stop_handler):
+    threading.Thread.__init__(self)
+    self._config = config
+    self._stop_handler = stop_handler
+    self.URL = 'http://localhost:' + self._config.get_inmemory_aggregation_port() + self.AMS_AGGREGATOR_METRICS_CHECK_URL
+    self._is_ok = threading.Event()
+    self.set_is_ok(True)
+    self.stopped = False
+
+  def run(self):
+    logger.info('Starting Aggregator Watchdog thread.')
+    while not self.stopped:
+      if 0 == self._stop_handler.wait(self.SLEEP_TIME):
+        break
+      try:
+        conn = urllib2.urlopen(self.URL, timeout=self.CONNECTION_TIMEOUT)
+        self.set_is_ok(True)
+      except (KeyboardInterrupt, SystemExit):
+        raise
+      except Exception, e:
+        self.set_is_ok(False)
+        continue
+      if conn.code != 200:
+        self.set_is_ok(False)
+        continue
+      conn.close()
+
+  def is_ok(self):
+    return self._is_ok.is_set()
+
+  def set_is_ok(self, value):
+    if value == False and self.is_ok() != value:
+      logger.warning("Watcher couldn't connect to aggregator.")
+      self._is_ok.clear()
+    else:
+      self._is_ok.set()
+
+
+  def stop(self):
+    logger.info('Stopping watcher thread.')
+    self.stopped = True
+
+

+ 33 - 2
ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py

@@ -30,6 +30,8 @@ from ambari_commons.os_family_impl import OsFamilyImpl
 # Abstraction for OS-dependent configuration defaults
 #
 class ConfigDefaults(object):
+  def get_config_dir(self):
+    pass
   def get_config_file_path(self):
     pass
   def get_metric_file_path(self):
@@ -40,11 +42,14 @@ class ConfigDefaults(object):
 @OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
 class ConfigDefaultsWindows(ConfigDefaults):
   def __init__(self):
+    self._CONFIG_DIR = "conf"
     self._CONFIG_FILE_PATH = "conf\\metric_monitor.ini"
     self._METRIC_FILE_PATH = "conf\\metric_groups.conf"
     self._METRIC_FILE_PATH = "conf\\ca.pem"
     pass
 
+  def get_config_dir(self):
+    return self._CONFIG_DIR
   def get_config_file_path(self):
     return self._CONFIG_FILE_PATH
   def get_metric_file_path(self):
@@ -55,11 +60,13 @@ class ConfigDefaultsWindows(ConfigDefaults):
 @OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
 class ConfigDefaultsLinux(ConfigDefaults):
   def __init__(self):
+    self._CONFIG_DIR = "/etc/ambari-metrics-monitor/conf/"
     self._CONFIG_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_monitor.ini"
     self._METRIC_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_groups.conf"
     self._CA_CERTS_FILE_PATH = "/etc/ambari-metrics-monitor/conf/ca.pem"
     pass
-
+  def get_config_dir(self):
+    return self._CONFIG_DIR
   def get_config_file_path(self):
     return self._CONFIG_FILE_PATH
   def get_metric_file_path(self):
@@ -71,6 +78,7 @@ configDefaults = ConfigDefaults()
 
 config = ConfigParser.RawConfigParser()
 
+CONFIG_DIR = configDefaults.get_config_dir()
 CONFIG_FILE_PATH = configDefaults.get_config_file_path()
 METRIC_FILE_PATH = configDefaults.get_metric_file_path()
 CA_CERTS_FILE_PATH = configDefaults.get_ca_certs_file_path()
@@ -191,6 +199,8 @@ class Configuration:
         # No hostname script identified in the ambari agent conf
         pass
     pass
+  def get_config_dir(self):
+    return CONFIG_DIR
 
   def getConfig(self):
     return self.config
@@ -214,10 +224,14 @@ class Configuration:
   def get_hostname_config(self):
     return self.get("default", "hostname", None)
 
-  def get_metrics_collector_hosts(self):
+  def get_metrics_collector_hosts_as_list(self):
     hosts = self.get("default", "metrics_servers", "localhost")
     return hosts.split(",")
 
+  def get_metrics_collector_hosts_as_string(self):
+    hosts = self.get("default", "metrics_servers", "localhost")
+    return hosts
+
   def get_failover_strategy(self):
     return self.get("collector", "failover_strategy", ROUND_ROBIN_FAILOVER_STRATEGY)
 
@@ -239,6 +253,23 @@ class Configuration:
   def is_server_https_enabled(self):
     return "true" == str(self.get("collector", "https_enabled")).lower()
 
+  def get_java_home(self):
+    return self.get("aggregation", "java_home")
+
+  def is_inmemory_aggregation_enabled(self):
+    return "true" == str(self.get("aggregation", "host_in_memory_aggregation")).lower()
+
+  def get_inmemory_aggregation_port(self):
+    return self.get("aggregation", "host_in_memory_aggregation_port")
+
+  def get_aggregator_jvm_agrs(self):
+    hosts = self.get("aggregation", "jvm_arguments", "-Xmx256m -Xms128m -XX:PermSize=68m")
+    return hosts
+
+  def ams_monitor_log_dir(self):
+    hosts = self.get("aggregation", "ams_monitor_log_dir", "/var/log/ambari-metrics-monitor")
+    return hosts
+
   def is_set_instanceid(self):
     return "true" == str(self.get("default", "set.instanceId", 'false')).lower()
 

+ 28 - 0
ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py

@@ -27,6 +27,9 @@ from event_definition import HostMetricCollectEvent, ProcessMetricCollectEvent
 from metric_collector import MetricsCollector
 from emitter import Emitter
 from host_info import HostInfo
+from aggregator import Aggregator
+from aggregator import AggregatorWatchdog
+
 
 logger = logging.getLogger()
 
@@ -50,11 +53,15 @@ class Controller(threading.Thread):
     self.initialize_events_cache()
     self.emitter = Emitter(self.config, self.application_metric_map, stop_handler)
     self._t = None
+    self.aggregator = None
+    self.aggregator_watchdog = None
 
   def run(self):
     logger.info('Running Controller thread: %s' % threading.currentThread().getName())
 
     self.start_emitter()
+    if self.config.is_inmemory_aggregation_enabled():
+      self.start_aggregator_with_watchdog()
 
     # Wake every 5 seconds to push events to the queue
     while True:
@@ -62,6 +69,10 @@ class Controller(threading.Thread):
         logger.warn('Event Queue full!! Suspending further collections.')
       else:
         self.enqueque_events()
+      # restart aggregator if needed
+      if self.config.is_inmemory_aggregation_enabled() and not self.aggregator_watchdog.is_ok():
+        logger.warning("Aggregator is not available. Restarting aggregator.")
+        self.start_aggregator_with_watchdog()
       pass
       # Wait for the service stop event instead of sleeping blindly
       if 0 == self._stop_handler.wait(self.sleep_interval):
@@ -75,6 +86,12 @@ class Controller(threading.Thread):
     # The emitter thread should have stopped by now, just ensure it has shut
     # down properly
     self.emitter.join(5)
+
+    if self.config.is_inmemory_aggregation_enabled():
+      self.aggregator.stop()
+      self.aggregator_watchdog.stop()
+      self.aggregator.join(5)
+      self.aggregator_watchdog.join(5)
     pass
 
   # TODO: Optimize to not use Timer class and use the Queue instead
@@ -115,3 +132,14 @@ class Controller(threading.Thread):
 
   def start_emitter(self):
     self.emitter.start()
+
+  # Start aggregator and watcher threads
+  def start_aggregator_with_watchdog(self):
+    if self.aggregator:
+      self.aggregator.stop()
+    if self.aggregator_watchdog:
+      self.aggregator.stop()
+    self.aggregator = Aggregator(self.config, self._stop_handler)
+    self.aggregator_watchdog = AggregatorWatchdog(self.config, self._stop_handler)
+    self.aggregator.start()
+    self.aggregator_watchdog.start()

+ 7 - 1
ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py

@@ -44,10 +44,16 @@ class Emitter(threading.Thread):
     self._stop_handler = stop_handler
     self.application_metric_map = application_metric_map
     self.collector_port = config.get_server_port()
-    self.all_metrics_collector_hosts = config.get_metrics_collector_hosts()
+    self.all_metrics_collector_hosts = config.get_metrics_collector_hosts_as_list()
     self.is_server_https_enabled = config.is_server_https_enabled()
     self.set_instanceid = config.is_set_instanceid()
     self.instanceid = config.get_instanceid()
+    self.is_inmemory_aggregation_enabled = config.is_inmemory_aggregation_enabled()
+
+    if self.is_inmemory_aggregation_enabled:
+      self.collector_port = config.get_inmemory_aggregation_port()
+      self.all_metrics_collector_hosts = ['localhost']
+      self.is_server_https_enabled = False
 
     if self.is_server_https_enabled:
       self.ca_certs = config.get_ca_certs()

+ 2 - 1
ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py

@@ -117,7 +117,8 @@ class StopHandlerLinux(StopHandler):
 
   def wait(self, timeout=None):
     # Stop process when stop event received
-    if self.stop_event.wait(timeout):
+    self.stop_event.wait(timeout)
+    if self.stop_event.isSet():
       logger.info("Stop event received")
       return 0
     # Timeout

+ 5 - 1
ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py

@@ -21,7 +21,7 @@ limitations under the License.
 import logging
 import os
 import sys
-
+import signal
 from ambari_commons.os_utils import remove_file
 
 from core.controller import Controller
@@ -73,6 +73,10 @@ def server_process_main(stop_handler, scmStatus=None):
   if scmStatus is not None:
     scmStatus.reportStarted()
 
+  # For some reason this is needed to catch system signals like SIGTERM
+  # TODO fix if possible
+  signal.pause()
+
   #The controller thread finishes when the stop event is signaled
   controller.join()
 

+ 17 - 0
ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java

@@ -72,6 +72,8 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
   private static final String TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_PASSWORD_PROPERTY;
   private static final String TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + INSTANCE_ID_PROPERTY;
   private static final String TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SET_INSTANCE_ID_PROPERTY;
+  private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY;
+  private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY;
   private static final String TIMELINE_DEFAULT_HOST = "localhost";
   private static final String TIMELINE_DEFAULT_PORT = "6188";
   private static final String TIMELINE_DEFAULT_PROTOCOL = "http";
@@ -96,6 +98,8 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
   private String[] includedMetricsPrefixes;
   // Local cache to avoid prefix matching everytime
   private Set<String> excludedMetrics = new HashSet<>();
+  private boolean hostInMemoryAggregationEnabled;
+  private int hostInMemoryAggregationPort;
 
   @Override
   protected String getCollectorUri(String host) {
@@ -132,6 +136,17 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
     return hostname;
   }
 
+
+  @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
   public void setMetricsCache(TimelineMetricsCache metricsCache) {
     this.metricsCache = metricsCache;
   }
@@ -169,6 +184,8 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
         instanceId = props.getString(TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY);
         setInstanceId = props.getBoolean(TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY);
 
+        hostInMemoryAggregationEnabled = props.getBoolean(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, false);
+        hostInMemoryAggregationPort = props.getInt(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, 61888);
         setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
 
         if (metricCollectorProtocol.contains("https")) {

+ 14 - 0
ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java

@@ -55,6 +55,8 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
   private NimbusClient nimbusClient;
   private String applicationId;
   private int timeoutSeconds;
+  private boolean hostInMemoryAggregationEnabled;
+  private int hostInMemoryAggregationPort;
 
   public StormTimelineMetricsReporter() {
 
@@ -95,6 +97,16 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
     return hostname;
   }
 
+  @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
   @Override
   public void prepare(Map conf) {
     LOG.info("Preparing Storm Metrics Reporter");
@@ -130,6 +142,8 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
       applicationId = cf.get(APP_ID).toString();
       setInstanceId = Boolean.getBoolean(cf.get(SET_INSTANCE_ID_PROPERTY).toString());
       instanceId = cf.get(INSTANCE_ID_PROPERTY).toString();
+      hostInMemoryAggregationEnabled = Boolean.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY).toString());
+      hostInMemoryAggregationPort = Integer.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY).toString());
 
       collectorUri = constructTimelineMetricUri(protocol, findPreferredCollectHost(), port);
       if (protocol.contains("https")) {

+ 14 - 0
ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java

@@ -61,6 +61,8 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
   private String applicationId;
   private boolean setInstanceId;
   private String instanceId;
+  private boolean hostInMemoryAggregationEnabled;
+  private int hostInMemoryAggregationPort;
 
   @Override
   protected String getCollectorUri(String host) {
@@ -97,6 +99,16 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     return hostname;
   }
 
+  @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
   @Override
   public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
     LOG.info("Preparing Storm Metrics Sink");
@@ -126,6 +138,8 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
 
     instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
     setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
+    hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY));
+    hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY));
     // Initialize the collector write strategy
     super.init();
 

+ 16 - 0
ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java

@@ -50,6 +50,8 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
   private String instanceId;
   private String applicationId;
   private int timeoutSeconds;
+  private boolean hostInMemoryAggregationEnabled;
+  private int hostInMemoryAggregationPort;
 
   public StormTimelineMetricsReporter() {
 
@@ -90,6 +92,16 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
     return hostname;
   }
 
+  @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
   @Override
   public void prepare(Object registrationArgument) {
     LOG.info("Preparing Storm Metrics Reporter");
@@ -119,6 +131,10 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
       applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID);
       setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY));
       instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
+
+      hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY));
+      hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY));
+
       if (protocol.contains("https")) {
         String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
         String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();

+ 16 - 0
ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java

@@ -70,6 +70,8 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
   private String applicationId;
   private String instanceId;
   private boolean setInstanceId;
+  private boolean hostInMemoryAggregationEnabled;
+  private int hostInMemoryAggregationPort;
 
   @Override
   protected String getCollectorUri(String host) {
@@ -106,6 +108,16 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     return hostname;
   }
 
+  @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
   @Override
   public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
     LOG.info("Preparing Storm Metrics Sink");
@@ -137,6 +149,10 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     port = configuration.getProperty(COLLECTOR_PORT, "6188");
     instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
     setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
+
+    hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY));
+    hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY));
+
     // Initialize the collector write strategy
     super.init();
 

+ 25 - 4
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java

@@ -24,10 +24,13 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
 import org.apache.hadoop.service.AbstractService;
@@ -41,6 +44,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunction;
@@ -62,6 +66,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
@@ -152,10 +157,14 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
       scheduleAggregatorThread(dailyClusterAggregator);
 
       // Start the minute host aggregator
-      TimelineMetricAggregator minuteHostAggregator =
-        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
-          hBaseAccessor, metricsConf, haController);
-      scheduleAggregatorThread(minuteHostAggregator);
+      if (Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION, "true"))) {
+        LOG.info("timeline.metrics.host.inmemory.aggregation is set to True, disabling host minute aggregation on collector");
+      } else {
+        TimelineMetricAggregator minuteHostAggregator =
+          TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
+            hBaseAccessor, metricsConf, haController);
+        scheduleAggregatorThread(minuteHostAggregator);
+      }
 
       // Start the hourly host aggregator
       TimelineMetricAggregator hourlyHostAggregator =
@@ -389,6 +398,18 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
     return metricMetadataManager.getHostedAppsCache();
   }
 
+  @Override
+  public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
+    Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
+    for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) {
+      aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate());
+    }
+    hBaseAccessor.saveHostAggregateRecords(aggregateMap, PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+
+
+    return new TimelinePutResponse();
+  }
+
   @Override
   public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId)
           throws SQLException, IOException {

+ 2 - 2
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java

@@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -40,8 +42,6 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;

+ 2 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java

@@ -296,6 +296,8 @@ public class TimelineMetricConfiguration {
 
   public static final String AMSHBASE_METRICS_WHITESLIST_FILE = "amshbase_metrics_whitelist";
 
+  public static final String TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION = "timeline.metrics.host.inmemory.aggregation";
+
   private Configuration hbaseConf;
   private Configuration metricsConf;
   private Configuration amsEnvConf;

+ 2 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -80,6 +81,7 @@ public interface TimelineMetricStore {
    */
   Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String query) throws SQLException, IOException;
 
+  TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException;
   /**
    * Returns all hosts that have written metrics with the apps on the host
    * @return { hostname : [ appIds ] }

+ 2 - 2
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java

@@ -19,10 +19,10 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 
 import java.util.Map;
 
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 
 /**

+ 1 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java

@@ -21,6 +21,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsFilter;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;

+ 2 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java

@@ -18,6 +18,8 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;

+ 1 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java

@@ -38,6 +38,7 @@ import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;

+ 1 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;

+ 2 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java

@@ -18,6 +18,8 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;

+ 31 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java

@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
 import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
@@ -285,6 +286,36 @@ public class TimelineWebServices {
     }
   }
 
+  /**
+   * Store the given metrics into the timeline store, and return errors that
+   * happened during storing.
+   */
+  @Path("/metrics/aggregated")
+  @POST
+  @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public TimelinePutResponse postAggregatedMetrics(
+    @Context HttpServletRequest req,
+    @Context HttpServletResponse res,
+    AggregationResult metrics) {
+
+    init(res);
+    if (metrics == null) {
+      return new TimelinePutResponse();
+    }
+
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Storing aggregated metrics: " +
+                TimelineUtils.dumpTimelineRecordtoJSON(metrics, true));
+      }
+
+      return timelineMetricStore.putHostAggregatedMetrics(metrics);
+    } catch (Exception e) {
+      LOG.error("Error saving metrics.", e);
+      throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
   @Path("/containermetrics")
   @POST
   @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})

+ 2 - 2
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java

@@ -26,12 +26,12 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;

+ 1 - 1
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java

@@ -17,9 +17,9 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 
 import java.util.Arrays;

+ 2 - 2
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java

@@ -22,11 +22,11 @@ import com.google.common.collect.Multimap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;

+ 4 - 4
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java

@@ -18,7 +18,7 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
   .timeline;
 
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -34,7 +34,7 @@ public class TestMetricHostAggregate {
     assertThat(aggregate.getSum()).isEqualTo(3.0);
     assertThat(aggregate.getMin()).isEqualTo(1.0);
     assertThat(aggregate.getMax()).isEqualTo(2.0);
-    assertThat(aggregate.getAvg()).isEqualTo(3.0 / 2);
+    assertThat(aggregate.calculateAverage()).isEqualTo(3.0 / 2);
   }
 
   @Test
@@ -50,7 +50,7 @@ public class TestMetricHostAggregate {
     assertThat(aggregate.getSum()).isEqualTo(12.0);
     assertThat(aggregate.getMin()).isEqualTo(0.5);
     assertThat(aggregate.getMax()).isEqualTo(7.5);
-    assertThat(aggregate.getAvg()).isEqualTo((3.0 + 8.0 + 1.0) / 5);
+    assertThat(aggregate.calculateAverage()).isEqualTo((3.0 + 8.0 + 1.0) / 5);
   }
 
   static MetricHostAggregate createAggregate (Double sum, Double min,
@@ -63,4 +63,4 @@ public class TestMetricHostAggregate {
     aggregate.setNumberOfSamples(samplesCount);
     return aggregate;
   }
-}
+}

+ 6 - 0
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -91,6 +92,11 @@ public class TestTimelineMetricStore implements TimelineMetricStore {
     return null;
   }
 
+  @Override
+  public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
+    return null;
+  }
+
   @Override
   public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException {
     return Collections.emptyMap();

+ 2 - 2
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java

@@ -17,10 +17,10 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 
 import java.util.Collections;

+ 2 - 2
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java

@@ -20,13 +20,13 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 
 import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;

+ 7 - 6
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
@@ -124,14 +125,14 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(20, currentHostAggregate.getNumberOfSamples());
         assertEquals(15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
         count++;
       } else if ("mem_free".equals(currentMetric.getMetricName())) {
         assertEquals(2.0, currentHostAggregate.getMax());
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(20, currentHostAggregate.getNumberOfSamples());
         assertEquals(15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
         count++;
       } else {
         fail("Unexpected entry");
@@ -198,7 +199,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
         assertEquals(12 * 15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
       }
     }
   }
@@ -260,7 +261,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
         assertEquals(12 * 15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
       }
     }
   }
@@ -309,14 +310,14 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(20, currentHostAggregate.getNumberOfSamples());
         assertEquals(15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
         count++;
       } else if ("mem_free".equals(currentMetric.getMetricName())) {
         assertEquals(2.0, currentHostAggregate.getMax());
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(20, currentHostAggregate.getNumberOfSamples());
         assertEquals(15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
         count++;
       } else {
         fail("Unexpected entry");

+ 1 - 0
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java

@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;

+ 1 - 0
ambari-metrics/pom.xml

@@ -33,6 +33,7 @@
     <module>ambari-metrics-host-monitoring</module>
     <module>ambari-metrics-grafana</module>
     <module>ambari-metrics-assembly</module>
+    <module>ambari-metrics-host-aggregator</module>
   </modules>
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

+ 10 - 0
ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java

@@ -300,6 +300,16 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
     return hostName;
   }
 
+  @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return false;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return 0;
+  }
+
   private List<TimelineMetric> getFilteredMetricList(List<SingleMetric> metrics) {
     final List<TimelineMetric> metricList = new ArrayList<>();
     for (SingleMetric metric : metrics) {

+ 2 - 0
ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/scripts/params.py

@@ -153,6 +153,8 @@ if has_metric_collector:
   pass
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
 
 # if accumulo is selected accumulo_tserver_hosts should not be empty, but still default just in case
 if 'slave_hosts' in config['clusterHostInfo']:

+ 3 - 0
ambari-server/src/main/resources/common-services/ACCUMULO/1.6.1.2.2.0/package/templates/hadoop-metrics2-accumulo.properties.j2

@@ -16,6 +16,9 @@
 # Poll collectors every {{metrics_report_interval}} seconds
 *.period={{metrics_collection_period}}
 
+*.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
+
 {% if has_metric_collector %}
 
 *.timeline.plugin.urls=file:///usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar

+ 8 - 0
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml

@@ -100,6 +100,14 @@
     </description>
     <on-ambari-upgrade add="true"/>
   </property>
+  <property>
+    <name>timeline.metrics.host.inmemory.aggregation.jvm.arguments</name>
+    <value>-Xmx256m -Xms128m -XX:PermSize=68m</value>
+    <description>
+      Local aggregator jvm extra arguments separated with spaces
+    </description>
+    <on-ambari-upgrade add="true"/>
+  </property>
   <property>
     <name>timeline.metrics.skip.network.interfaces.patterns</name>
     <value>None</value>

+ 11 - 0
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml

@@ -787,4 +787,15 @@
     <value>{{cluster_zookeeper_clientPort}}</value>
     <on-ambari-upgrade add="true"/>
   </property>
+  <property>
+    <name>timeline.metrics.host.inmemory.aggregation</name>
+    <value>false</value>
+    <description>if set to "true" host metrics will be aggregated in memory on each host</description>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>timeline.metrics.host.inmemory.aggregation.port</name>
+    <value>61888</value>
+    <on-ambari-upgrade add="true"/>
+  </property>
 </configuration>

+ 3 - 0
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/metainfo.xml

@@ -93,6 +93,9 @@
               <primary>true</primary>
             </log>
           </logs>
+          <configuration-dependencies>
+            <config-type>ams-site</config-type>
+          </configuration-dependencies>
         </component>
 
         <component>

+ 30 - 0
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/ams.py

@@ -163,6 +163,20 @@ def ams(name=None):
               create_parents = True
     )
 
+    if params.host_in_memory_aggregation and params.log4j_props is not None:
+      File(os.path.join(params.ams_monitor_conf_dir, "log4j.properties"),
+           owner=params.ams_user,
+           content=params.log4j_props
+           )
+
+    XmlConfig("ams-site.xml",
+              conf_dir=params.ams_monitor_conf_dir,
+              configurations=params.config['configurations']['ams-site'],
+              configuration_attributes=params.config['configuration_attributes']['ams-site'],
+              owner=params.ams_user,
+              group=params.user_group
+              )
+
     TemplateConfig(
       os.path.join(params.ams_monitor_conf_dir, "metric_monitor.ini"),
       owner=params.ams_user,
@@ -366,6 +380,22 @@ def ams(name=None, action=None):
               create_parents = True
     )
 
+    if params.host_in_memory_aggregation and params.log4j_props is not None:
+      File(format("{params.ams_monitor_conf_dir}/log4j.properties"),
+           mode=0644,
+           group=params.user_group,
+           owner=params.ams_user,
+           content=InlineTemplate(params.log4j_props)
+           )
+
+    XmlConfig("ams-site.xml",
+              conf_dir=params.ams_monitor_conf_dir,
+              configurations=params.config['configurations']['ams-site'],
+              configuration_attributes=params.config['configuration_attributes']['ams-site'],
+              owner=params.ams_user,
+              group=params.user_group
+              )
+
     Execute(format("{sudo} chown -R {ams_user}:{user_group} {ams_monitor_log_dir}")
             )
 

+ 5 - 0
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py

@@ -224,6 +224,11 @@ metrics_collector_heapsize = check_append_heap_property(str(metrics_collector_he
 master_heapsize = check_append_heap_property(str(master_heapsize), "m")
 regionserver_heapsize = check_append_heap_property(str(regionserver_heapsize), "m")
 
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+host_in_memory_aggregation_jvm_arguments = default("/configurations/ams-env/timeline.metrics.host.inmemory.aggregation.jvm.arguments",
+                                                   "-Xmx256m -Xms128m -XX:PermSize=68m")
+
 regionserver_xmn_max = default('/configurations/ams-hbase-env/hbase_regionserver_xmn_max', None)
 if regionserver_xmn_max:
   regionserver_xmn_max = int(trim_heap_property(str(regionserver_xmn_max), "m"))

+ 3 - 0
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/hadoop-metrics2-hbase.properties.j2

@@ -58,6 +58,9 @@ rpc.protocol={{metric_collector_protocol}}
 
 *.timeline.plugin.urls=file:///usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar
 *.sink.timeline.slave.host.name={{hostname}}
+*.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
+
 hbase.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
 hbase.sink.timeline.period={{metrics_collection_period}}
 hbase.sink.timeline.sendInterval={{metrics_report_interval}}000

+ 7 - 0
ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2

@@ -38,3 +38,10 @@ failover_strategy = {{failover_strategy}}
 failover_strategy_blacklisted_interval_seconds = {{failover_strategy_blacklisted_interval_seconds}}
 port = {{metric_collector_port}}
 https_enabled = {{metric_collector_https_enabled}}
+
+[aggregation]
+host_in_memory_aggregation = {{host_in_memory_aggregation}}
+host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
+java_home = {{java64_home}}
+jvm_arguments = {{host_in_memory_aggregation_jvm_arguments}}
+ams_monitor_log_dir = {{ams_monitor_log_dir}}

+ 3 - 0
ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py

@@ -124,6 +124,9 @@ if has_metric_collector:
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
 
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+
 # Cluster Zookeeper quorum
 zookeeper_quorum = None
 if not len(default("/clusterHostInfo/zookeeper_hosts", [])) == 0:

+ 2 - 0
ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2

@@ -23,6 +23,8 @@ port={{metric_collector_port}}
 collectionFrequency={{metrics_collection_period}}000
 maxRowCacheSize=10000
 sendInterval={{metrics_report_interval}}000
+host_in_memory_aggregation = {{host_in_memory_aggregation}}
+host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 instanceId={{cluster_name}}
 set.instanceId={{set_instanceId}}

+ 3 - 0
ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/scripts/params_linux.py

@@ -184,6 +184,9 @@ if has_metric_collector:
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
 
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+
 # if hbase is selected the hbase_rs_hosts, should not be empty, but still default just in case
 if 'slave_hosts' in config['clusterHostInfo']:
   rs_hosts = default('/clusterHostInfo/hbase_rs_hosts', '/clusterHostInfo/slave_hosts') #if hbase_rs_hosts not given it is assumed that region servers on same nodes as slaves

+ 2 - 0
ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2

@@ -78,6 +78,8 @@ hbase.sink.timeline.protocol={{metric_collector_protocol}}
 hbase.sink.timeline.port={{metric_collector_port}}
 hbase.sink.timeline.instanceId={{cluster_name}}
 hbase.sink.timeline.set.instanceId={{set_instanceId}}
+hbase.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+hbase.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 # HTTPS properties
 hbase.sink.timeline.truststore.path = {{metric_truststore_path}}

+ 2 - 0
ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-RS.j2

@@ -76,6 +76,8 @@ hbase.sink.timeline.protocol={{metric_collector_protocol}}
 hbase.sink.timeline.port={{metric_collector_port}}
 hbase.sink.timeline.instanceId={{cluster_name}}
 hbase.sink.timeline.set.instanceId={{set_instanceId}}
+hbase.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+hbase.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 # HTTPS properties
 hbase.sink.timeline.truststore.path = {{metric_truststore_path}}

+ 2 - 0
ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/configuration/hadoop-metrics2.properties.xml

@@ -88,6 +88,8 @@ resourcemanager.sink.ganglia.tagsForPrefix.yarn=Queue
 *.sink.timeline.port={{metric_collector_port}}
 *.sink.timeline.instanceId={{cluster_name}}
 *.sink.timeline.set.instanceId={{set_instanceId}}
+*.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 # HTTPS properties
 *.sink.timeline.truststore.path = {{metric_truststore_path}}

+ 2 - 0
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/scripts/params_linux.py

@@ -566,6 +566,8 @@ if has_metric_collector:
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
 
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
 ########################################################
 ############# Atlas related params #####################
 ########################################################

+ 2 - 0
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2

@@ -53,6 +53,8 @@
   hivemetastore.sink.timeline.collector.hosts={{ams_collector_hosts}}
   hivemetastore.sink.timeline.port={{metric_collector_port}}
   hivemetastore.sink.timeline.protocol={{metric_collector_protocol}}
+  hivemetastore.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+  hivemetastore.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 
 {% endif %}

+ 2 - 0
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2

@@ -53,5 +53,7 @@
   hiveserver2.sink.timeline.collector.hosts={{ams_collector_hosts}}
   hiveserver2.sink.timeline.port={{metric_collector_port}}
   hiveserver2.sink.timeline.protocol={{metric_collector_protocol}}
+  hiveserver2.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+  hiveserver2.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 {% endif %}

+ 2 - 0
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llapdaemon.j2

@@ -52,5 +52,7 @@
   llapdaemon.sink.timeline.collector.hosts={{ams_collector_hosts}}
   llapdaemon.sink.timeline.port={{metric_collector_port}}
   llapdaemon.sink.timeline.protocol={{metric_collector_protocol}}
+  llapdaemon.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+  llapdaemon.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 {% endif %}

+ 2 - 0
ambari-server/src/main/resources/common-services/HIVE/0.12.0.2.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2

@@ -52,5 +52,7 @@
   llaptaskscheduler.sink.timeline.collector.hosts={{ams_collector_hosts}}
   llaptaskscheduler.sink.timeline.port={{metric_collector_port}}
   llaptaskscheduler.sink.timeline.protocol={{metric_collector_protocol}}
+  llaptaskscheduler.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+  llaptaskscheduler.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 {% endif %}

+ 3 - 0
ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/scripts/params_linux.py

@@ -565,6 +565,9 @@ if has_metric_collector:
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
 
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+
 ########################################################
 ############# Atlas related params #####################
 ########################################################

+ 2 - 0
ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hivemetastore.properties.j2

@@ -53,6 +53,8 @@
   hivemetastore.sink.timeline.collector.hosts={{ams_collector_hosts}}
   hivemetastore.sink.timeline.port={{metric_collector_port}}
   hivemetastore.sink.timeline.protocol={{metric_collector_protocol}}
+  hivemetastore.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+  hivemetastore.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 
 {% endif %}

+ 2 - 0
ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-hiveserver2.properties.j2

@@ -53,5 +53,7 @@
   hiveserver2.sink.timeline.collector.hosts={{ams_collector_hosts}}
   hiveserver2.sink.timeline.port={{metric_collector_port}}
   hiveserver2.sink.timeline.protocol={{metric_collector_protocol}}
+  hiveserver2.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+  hiveserver2.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 {% endif %}

+ 2 - 0
ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llapdaemon.j2

@@ -52,5 +52,7 @@
   llapdaemon.sink.timeline.collector.hosts={{ams_collector_hosts}}
   llapdaemon.sink.timeline.port={{metric_collector_port}}
   llapdaemon.sink.timeline.protocol={{metric_collector_protocol}}
+  llapdaemon.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+  llapdaemon.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 {% endif %}

+ 2 - 0
ambari-server/src/main/resources/common-services/HIVE/2.1.0.3.0/package/templates/hadoop-metrics2-llaptaskscheduler.j2

@@ -52,5 +52,7 @@
   llaptaskscheduler.sink.timeline.collector.hosts={{ams_collector_hosts}}
   llaptaskscheduler.sink.timeline.port={{metric_collector_port}}
   llaptaskscheduler.sink.timeline.protocol={{metric_collector_protocol}}
+  llaptaskscheduler.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+  llaptaskscheduler.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 {% endif %}

+ 11 - 0
ambari-server/src/main/resources/common-services/KAFKA/0.8.1/configuration/kafka-broker.xml

@@ -422,4 +422,15 @@
     <description>Timeline metrics reporter send interval</description>
     <on-ambari-upgrade add="true"/>
   </property>
+  <property>
+    <name>kafka.timeline.metrics.host_in_memory_aggregation</name>
+    <value>{{host_in_memory_aggregation}}</value>
+    <description>if set to "true" host metrics will be aggregated in memory on each host</description>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>kafka.timeline.metrics.host_in_memory_aggregation_port</name>
+    <value>{{host_in_memory_aggregation_port}}</value>
+    <on-ambari-upgrade add="true"/>
+  </property>
 </configuration>

+ 3 - 0
ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/params.py

@@ -156,6 +156,9 @@ if has_metric_collector:
     metric_collector_protocol = 'https'
   else:
     metric_collector_protocol = 'http'
+
+  host_in_memory_aggregation = str(default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)).lower()
+  host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
   pass
 
 # Security-related params

+ 2 - 0
ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py

@@ -208,6 +208,8 @@ metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sin
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
 metric_collector_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-with-common-*.jar"
 metric_collector_legacy_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar"
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
 
 
 # Cluster Zookeeper quorum

+ 2 - 0
ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/config.yaml.j2

@@ -61,6 +61,8 @@ metrics_collector:
   protocol: "{{metric_collector_protocol}}"
   port: "{{metric_collector_port}}"
   appId: "{{metric_collector_app_id}}"
+  host_in_memory_aggregation = {{host_in_memory_aggregation}}
+  host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
   # HTTPS settings
   truststore.path : "{{metric_truststore_path}}"

+ 2 - 0
ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2

@@ -25,6 +25,8 @@ sendInterval={{metrics_report_interval}}000
 clusterReporterAppId=nimbus
 instanceId={{cluster_name}}
 set.instanceId={{set_instanceId}}
+host_in_memory_aggregation = {{host_in_memory_aggregation}}
+host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 # HTTPS properties
 truststore.path = {{metric_truststore_path}}

+ 3 - 0
ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py

@@ -164,6 +164,9 @@ if has_metric_collector:
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
 
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
+
 # Cluster Zookeeper quorum
 zookeeper_quorum = None
 if has_zk_host:

+ 2 - 0
ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2

@@ -77,6 +77,8 @@ resourcemanager.sink.ganglia.tagsForPrefix.yarn=Queue
 *.sink.timeline.port={{metric_collector_port}}
 *.sink.timeline.instanceId={{cluster_name}}
 *.sink.timeline.set.instanceId={{set_instanceId}}
+*.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 # HTTPS properties
 *.sink.timeline.truststore.path = {{metric_truststore_path}}

+ 2 - 0
ambari-server/src/main/resources/stacks/HDP/2.6/services/HDFS/configuration/hadoop-metrics2.properties.xml

@@ -88,6 +88,8 @@ resourcemanager.sink.ganglia.tagsForPrefix.yarn=Queue
 *.sink.timeline.port={{metric_collector_port}}
 *.sink.timeline.instanceId={{cluster_name}}
 *.sink.timeline.set.instanceId={{set_instanceId}}
+*.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 # HTTPS properties
 *.sink.timeline.truststore.path = {{metric_truststore_path}}

+ 2 - 0
ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/scripts/params.py

@@ -158,6 +158,8 @@ if has_metric_collector:
   pass
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
 
 # Cluster Zookeeper quorum
 zookeeper_quorum = None

+ 2 - 0
ambari-server/src/main/resources/stacks/HDP/3.0/hooks/before-START/templates/hadoop-metrics2.properties.j2

@@ -77,6 +77,8 @@ resourcemanager.sink.ganglia.tagsForPrefix.yarn=Queue
 *.sink.timeline.port={{metric_collector_port}}
 *.sink.timeline.instanceId={{cluster_name}}
 *.sink.timeline.set.instanceId={{set_instanceId}}
+*.sink.timeline.host_in_memory_aggregation = {{host_in_memory_aggregation}}
+*.sink.timeline.host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
 
 # HTTPS properties
 *.sink.timeline.truststore.path = {{metric_truststore_path}}

+ 10 - 0
ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java

@@ -73,6 +73,16 @@ public class TestAmbariMetricsSinkImpl extends AbstractTimelineMetricsSink imple
     return "localhost";
   }
 
+  @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return true;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return 61888;
+  }
+
   @Override
   public void init(MetricsConfiguration configuration) {
 

+ 2 - 0
contrib/management-packs/hdf-ambari-mpack/src/main/resources/stacks/HDF/2.0/hooks/before-START/scripts/params.py

@@ -135,6 +135,8 @@ if has_metric_collector:
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
 
+host_in_memory_aggregation = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation", True)
+host_in_memory_aggregation_port = default("/configurations/ams-site/timeline.metrics.host.inmemory.aggregation.port", 61888)
 #hadoop params
 
 if has_namenode or dfs_type == 'HCFS':