
AMBARI-21128 Add AMS HA support to local metrics aggregator application (dsen)

Dmytro Sen 8 years ago
parent
commit
29f7508947
17 changed files with 994 additions and 212 deletions
  1. + 2 - 2
      ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
  2. + 24 - 6
      ambari-metrics/ambari-metrics-host-aggregator/pom.xml
  3. + 0 - 134
      ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java
  4. + 62 - 36
      ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
  5. + 1 - 1
      ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
  6. + 18 - 8
      ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
  7. + 169 - 0
      ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
  8. + 15 - 13
      ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
  9. + 14 - 9
      ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
  10. + 55 - 0
      ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java
  11. + 135 - 0
      ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java
  12. + 107 - 0
      ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java
  13. + 82 - 0
      ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java
  14. + 154 - 0
      ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java
  15. + 151 - 0
      ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java
  16. + 4 - 2
      ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
  17. + 1 - 1
      ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py

+ 2 - 2
ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java

@@ -88,7 +88,7 @@ public abstract class AbstractTimelineMetricsSink {
   private static final String WWW_AUTHENTICATE = "WWW-Authenticate";
   private static final String NEGOTIATE = "Negotiate";
 
-  protected static final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0);
+  protected final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0);
   public static int NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS = 100;
   protected static final AtomicInteger nullCollectorCounter = new AtomicInteger(0);
   public static int NUMBER_OF_NULL_COLLECTOR_EXCEPTIONS = 20;
@@ -120,7 +120,7 @@ public abstract class AbstractTimelineMetricsSink {
   private volatile boolean isInitializedForHA = false;
 
   @SuppressWarnings("all")
-  private final int RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER = 5;
+  private final int RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER = 3;
 
   private final Gson gson = new Gson();
 

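The two hunks above make the failed-connection counter per sink instance (it was shared across every sink in the JVM as a static) and lower the failover threshold from 5 to 3 retries. A minimal sketch of how such a per-instance counter typically drives collector failover, assuming a simplified host list; the real rotation lives in AbstractTimelineMetricsSink's HA plumbing and is not shown in this diff:

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    // Sketch only: per-instance failure counter that rotates collectors
    // after RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER consecutive failed posts.
    class CollectorFailoverSketch {
        private static final int RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER = 3;
        private final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0);
        private final List<String> collectorHosts;
        private int currentIndex = 0;

        CollectorFailoverSketch(List<String> collectorHosts) {
            this.collectorHosts = collectorHosts;
        }

        // Called after each failed POST; returns the host to use next.
        String onPostFailed() {
            if (failedCollectorConnectionsCounter.incrementAndGet() >= RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER) {
                failedCollectorConnectionsCounter.set(0);
                currentIndex = (currentIndex + 1) % collectorHosts.size();
            }
            return collectorHosts.get(currentIndex);
        }
    }
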
+ 24 - 6
ambari-metrics/ambari-metrics-host-aggregator/pom.xml

@@ -37,12 +37,6 @@
     </properties>
 
     <dependencies>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>3.8.1</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
@@ -83,6 +77,30 @@
             <artifactId>hadoop-common</artifactId>
             <version>2.7.1.2.3.4.0-3347</version>
         </dependency>
+        <dependency>
+            <groupId>com.sun.jersey.jersey-test-framework</groupId>
+            <artifactId>jersey-test-framework-core</artifactId>
+            <version>1.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey.jersey-test-framework</groupId>
+            <artifactId>jersey-test-framework-grizzly2</artifactId>
+            <version>1.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <version>3.4</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.2</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

+ 0 - 134
ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java

@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.host.aggregator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.codehaus.jackson.map.AnnotationIntrospector;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.Map;
-
-/**
- * Abstract class that runs a thread that publishes metrics data to AMS collector in specified intervals.
- */
-public abstract class AbstractMetricPublisherThread extends Thread {
-    protected int publishIntervalInSeconds;
-    protected String publishURL;
-    protected ObjectMapper objectMapper;
-    private Log LOG;
-    protected TimelineMetricsHolder timelineMetricsHolder;
-
-    public AbstractMetricPublisherThread(TimelineMetricsHolder timelineMetricsHolder, String publishURL, int publishIntervalInSeconds) {
-        LOG = LogFactory.getLog(this.getClass());
-        this.publishURL = publishURL;
-        this.publishIntervalInSeconds = publishIntervalInSeconds;
-        this.timelineMetricsHolder = timelineMetricsHolder;
-        objectMapper = new ObjectMapper();
-        AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
-        objectMapper.setAnnotationIntrospector(introspector);
-        objectMapper.getSerializationConfig()
-                .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
-    }
-
-    /**
-     * Publishes metrics to collector in specified intervals while not interrupted.
-     */
-    @Override
-    public void run() {
-        while (!isInterrupted()) {
-            try {
-                sleep(this.publishIntervalInSeconds * 1000);
-            } catch (InterruptedException e) {
-                //Ignore
-            }
-            try {
-                processAndPublishMetrics(getMetricsFromCache());
-            } catch (Exception e) {
-                LOG.error("Couldn't process and send metrics : ",e);
-            }
-        }
-    }
-
-    /**
-     * Processes and sends metrics to collector.
-     * @param metricsFromCache
-     * @throws Exception
-     */
-    protected void processAndPublishMetrics(Map<Long, TimelineMetrics> metricsFromCache) throws Exception {
-        if (metricsFromCache.size()==0) return;
-
-        LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size()));
-        publishMetricsJson(processMetrics(metricsFromCache));
-    }
-
-    /**
-     * Returns metrics map. Source is based on implementation.
-     * @return
-     */
-    protected abstract Map<Long,TimelineMetrics> getMetricsFromCache();
-
-    /**
-     * Processes given metrics (aggregates or merges them) and converts them into json string that will be send to collector
-     * @param metricValues
-     * @return
-     */
-    protected abstract String processMetrics(Map<Long, TimelineMetrics> metricValues);
-
-    protected void publishMetricsJson(String jsonData) throws Exception {
-        int timeout = 5 * 1000;
-        HttpURLConnection connection = null;
-        if (this.publishURL == null) {
-            throw new IOException("Unknown URL. Unable to connect to metrics collector.");
-        }
-        LOG.info("Collector URL : " + publishURL);
-        connection = (HttpURLConnection) new URL(this.publishURL).openConnection();
-
-        connection.setRequestMethod("POST");
-        connection.setRequestProperty("Content-Type", "application/json");
-        connection.setRequestProperty("Connection", "Keep-Alive");
-        connection.setConnectTimeout(timeout);
-        connection.setReadTimeout(timeout);
-        connection.setDoOutput(true);
-
-        if (jsonData != null) {
-            try (OutputStream os = connection.getOutputStream()) {
-                os.write(jsonData.getBytes("UTF-8"));
-            }
-        }
-        int responseCode = connection.getResponseCode();
-        if (responseCode != 200) {
-            throw new Exception("responseCode is " + responseCode);
-        }
-        LOG.info("Successfully sent metrics.");
-    }
-
-    /**
-     * Interrupts the thread.
-     */
-    protected void stopPublisher() {
-        this.interrupt();
-    }
-}

+ 62 - 36
ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java

@@ -33,6 +33,9 @@ import java.util.HashMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractMetricPublisher;
+import org.apache.hadoop.metrics2.sink.timeline.AggregatedMetricsPublisher;
+import org.apache.hadoop.metrics2.sink.timeline.RawMetricsPublisher;
 
 /**
  * WEB application with 2 publisher threads that processes received metrics and submits results to the collector
@@ -40,24 +43,25 @@ import org.apache.hadoop.conf.Configuration;
 public class AggregatorApplication
 {
     private static final int STOP_SECONDS_DELAY = 0;
-    private static final int JOIN_SECONDS_TIMEOUT = 2;
-    private static String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics";
-    private static String AGGREGATED_POST_PREFIX = "/aggregated";
+    private static final int JOIN_SECONDS_TIMEOUT = 5;
     private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
-    private static Log LOG = LogFactory.getLog("AggregatorApplication.class");
+    private Log LOG;
     private final int webApplicationPort;
     private final int rawPublishingInterval;
     private final int aggregationInterval;
     private Configuration configuration;
-    private String [] collectorHosts;
-    private AggregatedMetricsPublisher aggregatePublisher;
-    private RawMetricsPublisher rawPublisher;
+    private Thread aggregatePublisherThread;
+    private Thread rawPublisherThread;
     private TimelineMetricsHolder timelineMetricsHolder;
     private HttpServer httpServer;
 
-    public AggregatorApplication(String collectorHosts) {
+    public AggregatorApplication(String hostname, String collectorHosts) {
+        LOG = LogFactory.getLog(this.getClass());
+        configuration = new Configuration(true);
         initConfiguration();
-        this.collectorHosts = collectorHosts.split(",");
+        configuration.set("timeline.metrics.collector.hosts", collectorHosts);
+        configuration.set("timeline.metrics.hostname", hostname);
+        configuration.set("timeline.metrics.zk.quorum", getZkQuorumFromConfiguration());
         this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300);
         this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60);
         this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888);
@@ -70,7 +74,13 @@ public class AggregatorApplication
         }
     }
 
-    private void initConfiguration() {
+    private String getZkQuorumFromConfiguration() {
+        String zkClientPort = configuration.getTrimmed("cluster.zookeeper.property.clientPort", "2181");
+        String zkServerHosts = configuration.getTrimmed("cluster.zookeeper.quorum", "");
+        return getZkConnectionUrl(zkClientPort, zkServerHosts);
+    }
+
+    protected void initConfiguration() {
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         if (classLoader == null) {
             classLoader = getClass().getClassLoader();
@@ -82,7 +92,7 @@ public class AggregatorApplication
             throw new IllegalStateException("Unable to initialize the metrics " +
                     "subsystem. No ams-site present in the classpath.");
         }
-        configuration = new Configuration(true);
+
         try {
             configuration.addResource(amsResUrl.toURI().toURL());
         } catch (Exception e) {
@@ -91,7 +101,7 @@ public class AggregatorApplication
         }
     }
 
-    private String getHostName() {
+    protected String getHostName() {
         String hostName = "localhost";
         try {
             hostName = InetAddress.getLocalHost().getCanonicalHostName();
@@ -101,13 +111,13 @@ public class AggregatorApplication
         return hostName;
     }
 
-    private URI getURI() {
+    protected URI getURI() {
         URI uri = UriBuilder.fromUri("http://" + getHostName() + "/").port(this.webApplicationPort).build();
         LOG.info(String.format("Web server at %s", uri));
         return uri;
     }
 
-    private HttpServer createHttpServer() throws IOException {
+    protected HttpServer createHttpServer() throws IOException {
         ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator");
         HashMap<String, Object> params = new HashMap();
         params.put("com.sun.jersey.api.json.POJOMappingFeature", "true");
@@ -122,29 +132,30 @@ public class AggregatorApplication
 
     private void startAggregatePublisherThread() {
         LOG.info("Starting aggregated metrics publisher.");
-        String collectorURL = buildBasicCollectorURL(collectorHosts[0]) + AGGREGATED_POST_PREFIX;
-        aggregatePublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, collectorURL, aggregationInterval);
-        aggregatePublisher.start();
+        AbstractMetricPublisher metricPublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, configuration, aggregationInterval);
+        aggregatePublisherThread = new Thread(metricPublisher);
+        aggregatePublisherThread.start();
     }
 
     private void startRawPublisherThread() {
         LOG.info("Starting raw metrics publisher.");
-        String collectorURL = buildBasicCollectorURL(collectorHosts[0]);
-        rawPublisher = new RawMetricsPublisher(timelineMetricsHolder, collectorURL, rawPublishingInterval);
-        rawPublisher.start();
+        AbstractMetricPublisher metricPublisher = new RawMetricsPublisher(timelineMetricsHolder, configuration, rawPublishingInterval);
+        rawPublisherThread = new Thread(metricPublisher);
+        rawPublisherThread.start();
     }
 
 
 
     private void stop() {
-        aggregatePublisher.stopPublisher();
-        rawPublisher.stopPublisher();
+        LOG.info("Stopping aggregator application");
+        aggregatePublisherThread.interrupt();
+        rawPublisherThread.interrupt();
         httpServer.stop(STOP_SECONDS_DELAY);
         LOG.info("Stopped web server.");
         try {
             LOG.info("Waiting for threads to join.");
-            aggregatePublisher.join(JOIN_SECONDS_TIMEOUT * 1000);
-            rawPublisher.join(JOIN_SECONDS_TIMEOUT * 1000);
+            aggregatePublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000);
+            rawPublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000);
             LOG.info("Gracefully stopped Aggregator Application.");
         } catch (InterruptedException e) {
             LOG.error("Received exception during stop : ", e);
@@ -153,28 +164,43 @@ public class AggregatorApplication
 
     }
 
-    private String buildBasicCollectorURL(String host) {
-        String port = configuration.get("timeline.metrics.service.webapp.address", "0.0.0.0:6188").split(":")[1];
-        String protocol = configuration.get("timeline.metrics.service.http.policy", "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
-        return String.format(BASE_POST_URL, protocol, host, port);
+    private String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
+        StringBuilder sb = new StringBuilder();
+        String[] quorumParts = zkQuorum.split(",");
+        String prefix = "";
+        for (String part : quorumParts) {
+            sb.append(prefix);
+            sb.append(part.trim());
+            if (!part.contains(":")) {
+                sb.append(":");
+                sb.append(zkClientPort);
+            }
+            prefix = ",";
+        }
+        return sb.toString();
     }
 
     public static void main( String[] args ) throws Exception {
-        LOG.info("Starting aggregator application");
-        if (args.length != 1) {
-            throw new Exception("This jar should be run with 1 argument - collector hosts separated with coma");
+        if (args.length != 2) {
+            throw new Exception("This jar should be executed with 2 arguments : 1st - current host name, " +
+                    "2nd - collector hosts separated by commas");
         }
 
-        final AggregatorApplication app = new AggregatorApplication(args[0]);
-        app.startAggregatePublisherThread();
-        app.startRawPublisherThread();
-        app.startWebServer();
+        final AggregatorApplication app = new AggregatorApplication(args[0], args[1]);
+
+        app.startWebServerAndPublishersThreads();
 
         Runtime.getRuntime().addShutdownHook(new Thread() {
             public void run() {
-                LOG.info("Stopping aggregator application");
                 app.stop();
             }
         });
     }
+
+    private void startWebServerAndPublishersThreads() {
+        LOG.info("Starting aggregator application");
+        startAggregatePublisherThread();
+        startRawPublisherThread();
+        startWebServer();
+    }
 }
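
getZkConnectionUrl normalizes the quorum string so every host carries a port: entries that already specify one are kept as-is, bare hosts get the configured client port appended. A worked example of the logic above; the wrapper class and main method are illustration only, not part of the patch:

    public class ZkQuorumUrlExample {
        // Same algorithm as AggregatorApplication.getZkConnectionUrl above.
        static String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
            StringBuilder sb = new StringBuilder();
            String prefix = "";
            for (String part : zkQuorum.split(",")) {
                sb.append(prefix).append(part.trim());
                if (!part.contains(":")) {
                    sb.append(":").append(zkClientPort);
                }
                prefix = ",";
            }
            return sb.toString();
        }

        public static void main(String[] args) {
            // Bare hosts get the client port; explicit ports are preserved.
            System.out.println(getZkConnectionUrl("2181", "zk1.example.com, zk2.example.com:2182"));
            // prints: zk1.example.com:2181,zk2.example.com:2182
        }
    }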

+ 1 - 1
ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java

@@ -39,7 +39,7 @@ public class AggregatorWebService {
     @GET
     @Produces("text/json")
     @Path("/metrics")
-    public Response helloWorld() throws IOException {
+    public Response getOkResponse() throws IOException {
         return Response.ok().build();
     }
 

+ 18 - 8
ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java

@@ -19,8 +19,10 @@ package org.apache.hadoop.metrics2.host.aggregator;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
@@ -33,8 +35,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 public class TimelineMetricsHolder {
     private static final int DEFAULT_RAW_CACHE_EXPIRE_TIME = 60;
     private static final int DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME = 300;
-    private Cache<Long, TimelineMetrics> aggregationMetricsCache;
-    private Cache<Long, TimelineMetrics> rawMetricsCache;
+    private Cache<String, TimelineMetrics> aggregationMetricsCache;
+    private Cache<String, TimelineMetrics> rawMetricsCache;
     private static TimelineMetricsHolder instance = null;
     //to ensure no metric values are expired
     private static int EXPIRE_DELAY = 30;
@@ -63,21 +65,29 @@ public class TimelineMetricsHolder {
 
     public void putMetricsForAggregationPublishing(TimelineMetrics timelineMetrics) {
         aggregationCacheLock.writeLock().lock();
-        aggregationMetricsCache.put(System.currentTimeMillis(), timelineMetrics);
+        aggregationMetricsCache.put(calculateCacheKey(timelineMetrics), timelineMetrics);
         aggregationCacheLock.writeLock().unlock();
     }
 
-    public Map<Long, TimelineMetrics> extractMetricsForAggregationPublishing() {
+    private String calculateCacheKey(TimelineMetrics timelineMetrics) {
+        List<TimelineMetric> metrics = timelineMetrics.getMetrics();
+        if (metrics.size() > 0) {
+            return metrics.get(0).getAppId() + System.currentTimeMillis();
+        }
+        return String.valueOf(System.currentTimeMillis());
+    }
+
+    public Map<String, TimelineMetrics> extractMetricsForAggregationPublishing() {
         return extractMetricsFromCacheWithLock(aggregationMetricsCache, aggregationCacheLock);
     }
 
     public void putMetricsForRawPublishing(TimelineMetrics metrics) {
         rawCacheLock.writeLock().lock();
-        rawMetricsCache.put(System.currentTimeMillis(), metrics);
+        rawMetricsCache.put(calculateCacheKey(metrics), metrics);
         rawCacheLock.writeLock().unlock();
     }
 
-    public Map<Long, TimelineMetrics> extractMetricsForRawPublishing() {
+    public Map<String, TimelineMetrics> extractMetricsForRawPublishing() {
         return extractMetricsFromCacheWithLock(rawMetricsCache, rawCacheLock);
     }
 
@@ -87,9 +97,9 @@ public class TimelineMetricsHolder {
      * @param lock
      * @return
      */
-    private Map<Long, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<Long, TimelineMetrics> cache, ReadWriteLock lock) {
+    private Map<String, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<String, TimelineMetrics> cache, ReadWriteLock lock) {
         lock.writeLock().lock();
-        Map<Long, TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap());
+        Map<String, TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap());
         cache.invalidateAll();
         lock.writeLock().unlock();
         return metricsMap;
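
The cache keys change from Long (receive timestamp) to String (appId + receive timestamp): with the old keys, two sinks posting within the same millisecond would silently overwrite each other's entry. A minimal sketch of the keying, with a plain appId list standing in for TimelineMetrics:

    import java.util.Arrays;
    import java.util.List;

    class CacheKeySketch {
        // Mirrors calculateCacheKey above, with appIds standing in for metrics.
        static String calculateCacheKey(List<String> appIds) {
            if (appIds.size() > 0) {
                return appIds.get(0) + System.currentTimeMillis();
            }
            return String.valueOf(System.currentTimeMillis());
        }

        public static void main(String[] args) {
            // Same millisecond, different appIds -> distinct keys,
            // e.g. "datanode1496933000000" vs "nodemanager1496933000000".
            System.out.println(calculateCacheKey(Arrays.asList("datanode")));
            System.out.println(calculateCacheKey(Arrays.asList("nodemanager")));
        }
    }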

+ 169 - 0
ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java

@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Abstract metrics publisher that, when run in a thread, publishes metrics data to the AMS collector at specified intervals.
+ */
+public abstract class AbstractMetricPublisher extends AbstractTimelineMetricsSink implements Runnable {
+
+    private static final String AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY = "ssl.server.truststore.location";
+    private static final String AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY = "ssl.server.truststore.type";
+    private static final String AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY = "ssl.server.truststore.password";
+    private static final String AMS_SITE_HTTP_POLICY_PROPERTY = "timeline.metrics.service.http.policy";
+    private static final String AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY = "timeline.metrics.service.webapp.address";
+    private static final String PUBLISHER_COLLECTOR_HOSTS_PROPERTY = "timeline.metrics.collector.hosts";
+    private static final String PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY = "timeline.metrics.zk.quorum";
+    private static final String PUBLISHER_HOSTNAME_PROPERTY = "timeline.metrics.hostname";
+    protected static String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics";
+    protected int publishIntervalInSeconds;
+    private Log LOG;
+    protected TimelineMetricsHolder timelineMetricsHolder;
+    protected Configuration configuration;
+    private String collectorProtocol;
+    private String collectorPort;
+    private Collection<String> collectorHosts;
+    private String hostname;
+    private String zkQuorum;
+
+    public AbstractMetricPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int publishIntervalInSeconds) {
+        LOG = LogFactory.getLog(this.getClass());
+        this.configuration = configuration;
+        this.publishIntervalInSeconds = publishIntervalInSeconds;
+        this.timelineMetricsHolder = timelineMetricsHolder;
+        configure();
+    }
+
+    protected void configure() {
+        collectorProtocol = configuration.get(AMS_SITE_HTTP_POLICY_PROPERTY, "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
+        collectorPort = configuration.getTrimmed(AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY, "0.0.0.0:6188").split(":")[1];
+        collectorHosts = parseHostsStringIntoCollection(configuration.getTrimmed(PUBLISHER_COLLECTOR_HOSTS_PROPERTY, ""));
+        zkQuorum = configuration.get(PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY, "");
+        hostname = configuration.get(PUBLISHER_HOSTNAME_PROPERTY, "localhost");
+        if (collectorHosts.isEmpty()) {
+            LOG.error("No Metric collector configured.");
+        } else {
+            if (collectorProtocol.contains("https")) {
+                String trustStorePath = configuration.get(AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY).trim();
+                String trustStoreType = configuration.get(AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY).trim();
+                String trustStorePwd = configuration.get(AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
+                loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
+            }
+        }
+    }
+
+    /**
+     * Publishes metrics to collector in specified intervals while not interrupted.
+     */
+    @Override
+    public void run() {
+        while (!Thread.currentThread().isInterrupted()) {
+            try {
+                Thread.sleep(this.publishIntervalInSeconds * 1000);
+            } catch (InterruptedException e) {
+                //Ignore
+            }
+            try {
+                processAndPublishMetrics(getMetricsFromCache());
+            } catch (Exception e) {
+                LOG.error("Couldn't process and publish metrics : ", e);
+            }
+        }
+    }
+
+    /**
+     * Processes and sends metrics to collector.
+     * @param metricsFromCache
+     * @throws Exception
+     */
+    protected void processAndPublishMetrics(Map<String, TimelineMetrics> metricsFromCache) throws Exception {
+        if (metricsFromCache.size()==0) return;
+
+        LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size()));
+        emitMetricsJson(getCollectorUri(getCurrentCollectorHost()), processMetrics(metricsFromCache));
+    }
+
+    /**
+     * Returns metrics map. Source is based on implementation.
+     * @return
+     */
+    protected abstract Map<String,TimelineMetrics> getMetricsFromCache();
+
+    /**
+     * Processes given metrics (aggregates or merges them) and converts them into a json string that will be sent to the collector
+     * @param metricValues
+     * @return
+     */
+    protected abstract String processMetrics(Map<String, TimelineMetrics> metricValues);
+
+    protected abstract String getPostUrl();
+
+    @Override
+    protected String getCollectorUri(String host) {
+        return String.format(getPostUrl(), getCollectorProtocol(), host, getCollectorPort());
+    }
+
+    @Override
+    protected String getCollectorProtocol() {
+        return collectorProtocol;
+    }
+
+    @Override
+    protected String getCollectorPort() {
+        return collectorPort;
+    }
+
+    @Override
+    protected int getTimeoutSeconds() {
+        return DEFAULT_POST_TIMEOUT_SECONDS;
+    }
+
+    @Override
+    protected String getZookeeperQuorum() {
+        return zkQuorum;
+    }
+
+    @Override
+    protected Collection<String> getConfiguredCollectorHosts() {
+        return collectorHosts;
+    }
+
+    @Override
+    protected String getHostname() {
+        return hostname;
+    }
+
+    @Override
+    protected boolean isHostInMemoryAggregationEnabled() {
+        return false;
+    }
+
+    @Override
+    protected int getHostInMemoryAggregationPort() {
+        return 0;
+    }
+}
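
Since the publishers are now Runnables built on the common sink (rather than Thread subclasses), AggregatorApplication controls their lifecycle with plain interrupt/join. A usage sketch mirroring that wiring, assuming the patch's classes are on the classpath; the 60-second interval and default Configuration are placeholder values:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
    import org.apache.hadoop.metrics2.sink.timeline.AbstractMetricPublisher;
    import org.apache.hadoop.metrics2.sink.timeline.RawMetricsPublisher;

    public class PublisherWiringSketch {
        public static void main(String[] args) throws InterruptedException {
            Configuration configuration = new Configuration();
            AbstractMetricPublisher publisher =
                    new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 60);
            // Publisher is a Runnable now, so the application owns the thread.
            Thread rawPublisherThread = new Thread(publisher);
            rawPublisherThread.start();
            // ... later, on shutdown: stop the loop and wait for it to finish.
            rawPublisherThread.interrupt();
            rawPublisherThread.join(5 * 1000);
        }
    }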

+ 15 - 13
ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java → ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java

@@ -15,17 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.metrics2.host.aggregator;
+package org.apache.hadoop.metrics2.sink.timeline;
 
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
-import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -36,12 +33,12 @@ import java.util.TreeMap;
 /**
  * Thread that aggregates and publishes metrics to collector on specified interval.
  */
-public class AggregatedMetricsPublisher extends AbstractMetricPublisherThread {
-
+public class AggregatedMetricsPublisher extends AbstractMetricPublisher {
+    private static String AGGREGATED_POST_PREFIX = "/aggregated";
     private Log LOG;
 
-    public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) {
-        super(timelineMetricsHolder, collectorURL, interval);
+    public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) {
+        super(timelineMetricsHolder, configuration, interval);
         LOG = LogFactory.getLog(this.getClass());
     }
 
@@ -50,7 +47,7 @@ public class AggregatedMetricsPublisher extends AbstractMetricPublisherThread {
      * @return
      */
     @Override
-    protected Map<Long, TimelineMetrics> getMetricsFromCache() {
+    protected Map<String, TimelineMetrics> getMetricsFromCache() {
         return timelineMetricsHolder.extractMetricsForAggregationPublishing();
     }
 
@@ -60,7 +57,7 @@ public class AggregatedMetricsPublisher extends AbstractMetricPublisherThread {
      * @return
      */
     @Override
-    protected String processMetrics(Map<Long, TimelineMetrics> metricForAggregationValues) {
+    protected String processMetrics(Map<String, TimelineMetrics> metricForAggregationValues) {
         HashMap<String, TimelineMetrics> nameToMetricMap = new HashMap<>();
         for (TimelineMetrics timelineMetrics : metricForAggregationValues.values()) {
             for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
@@ -90,7 +87,7 @@ public class AggregatedMetricsPublisher extends AbstractMetricPublisherThread {
         }
         String json = null;
         try {
-            json = objectMapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis()));
+            json = mapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis()));
             LOG.debug(json);
         } catch (Exception e) {
             LOG.error("Failed to convert result into json", e);
@@ -98,4 +95,9 @@ public class AggregatedMetricsPublisher extends AbstractMetricPublisherThread {
 
         return json;
     }
+
+    @Override
+    protected String getPostUrl() {
+        return BASE_POST_URL + AGGREGATED_POST_PREFIX;
+    }
 }
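
processMetrics above groups the cached metrics by name and app, reduces each group's datapoints to a MetricHostAggregate, and wraps the result in an AggregationResult. A back-of-the-envelope check of that reduction, with plain doubles standing in for MetricHostAggregate:

    import java.util.TreeMap;

    public class AggregateSketch {
        public static void main(String[] args) {
            TreeMap<Long, Double> values = new TreeMap<>();
            values.put(1L, 1d);
            values.put(2L, 2d);
            values.put(3L, 3d);

            double sum = 0;
            double max = Double.NEGATIVE_INFINITY;
            double min = Double.POSITIVE_INFINITY;
            for (double v : values.values()) {
                sum += v;
                max = Math.max(max, v);
                min = Math.min(min, v);
            }
            // Matches the expected JSON in AggregatedMetricsPublisherTest below:
            // "sum":6.0,"max":3.0,"min":1.0,"numberOfSamples":3
            System.out.printf("sum=%s max=%s min=%s numberOfSamples=%d%n",
                    sum, max, min, values.size());
        }
    }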

+ 14 - 9
ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java → ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java

@@ -15,32 +15,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.metrics2.host.aggregator;
+package org.apache.hadoop.metrics2.sink.timeline;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
 
 import java.util.Map;
 
-public class RawMetricsPublisher extends AbstractMetricPublisherThread {
+public class RawMetricsPublisher extends AbstractMetricPublisher {
     private final Log LOG;
 
-    public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) {
-        super(timelineMetricsHolder, collectorURL, interval);
+    public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) {
+        super(timelineMetricsHolder, configuration, interval);
         LOG = LogFactory.getLog(this.getClass());
     }
 
 
     @Override
-    protected Map<Long, TimelineMetrics> getMetricsFromCache() {
+    protected Map<String, TimelineMetrics> getMetricsFromCache() {
         return timelineMetricsHolder.extractMetricsForRawPublishing();
     }
 
     @Override
-    protected String processMetrics(Map<Long, TimelineMetrics> metricValues) {
+    protected String processMetrics(Map<String, TimelineMetrics> metricValues) {
         //merge everything in one TimelineMetrics object
         TimelineMetrics timelineMetrics = new TimelineMetrics();
         for (TimelineMetrics metrics : metricValues.values()) {
@@ -50,11 +50,16 @@ public class RawMetricsPublisher extends AbstractMetricPublisherThread {
         //map TimelineMetrics to json string
         String json = null;
         try {
-            json = objectMapper.writeValueAsString(timelineMetrics);
+            json = mapper.writeValueAsString(timelineMetrics);
             LOG.debug(json);
         } catch (Exception e) {
             LOG.error("Failed to convert result into json", e);
         }
         return json;
     }
+
+    @Override
+    protected String getPostUrl() {
+        return BASE_POST_URL;
+    }
 }

+ 55 - 0
ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java

@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.net.URI;
+
+import static org.easymock.EasyMock.createMockBuilder;
+
+
+public class AggregatorApplicationTest {
+    @Test
+    public void testMainNotEnoughArguments() {
+        try {
+            AggregatorApplication.main(new String[0]);
+            Assert.fail("main() should have thrown for 0 arguments");
+        } catch (Exception e) {
+            //expected
+        }
+        try {
+            AggregatorApplication.main(new String[1]);
+            Assert.fail("main() should have thrown for 1 argument");
+        } catch (Exception e) {
+            //expected
+        }
+    }
+
+    @Test
+    public void testGetURI() {
+        AggregatorApplication aggregatorApplicationMock = createMockBuilder(AggregatorApplication.class)
+                .withConstructor("", "")
+                .addMockedMethod("createHttpServer")
+                .addMockedMethod("initConfiguration").createMock();
+
+        URI uri = aggregatorApplicationMock.getURI();
+        Assert.assertEquals("http://" + aggregatorApplicationMock.getHostName() + ":61888/", uri.toString());
+    }
+}

+ 135 - 0
ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java

@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.test.framework.JerseyTest;
+import com.sun.jersey.test.framework.WebAppDescriptor;
+import com.sun.jersey.test.framework.spi.container.TestContainerFactory;
+import com.sun.jersey.test.framework.spi.container.grizzly2.GrizzlyTestContainerFactory;
+import junit.framework.Assert;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.jaxrs.JacksonJaxbJsonProvider;
+import org.junit.Test;
+
+
+import javax.ws.rs.core.MediaType;
+
+import java.util.Collection;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class AggregatorWebServiceTest extends JerseyTest {
+    public AggregatorWebServiceTest() {
+        super(new WebAppDescriptor.Builder(
+                "org.apache.hadoop.metrics2.host.aggregator")
+                .contextPath("jersey-guice-filter")
+                .servletPath("/")
+                .clientConfig(new DefaultClientConfig(JacksonJaxbJsonProvider.class))
+                .build());
+    }
+
+    @Override
+    public TestContainerFactory getTestContainerFactory() {
+        return new GrizzlyTestContainerFactory();
+    }
+
+    @Test
+    public void testOkResponse() {
+        WebResource r = resource();
+        ClientResponse response = r.path("ws").path("v1").path("timeline").path("metrics")
+                .accept("text/json")
+                .get(ClientResponse.class);
+        assertEquals(200, response.getStatus());
+        assertEquals("text/json", response.getType().toString());
+    }
+
+    @Test
+    public void testWrongPath() {
+        WebResource r = resource();
+        ClientResponse response = r.path("ws").path("v1").path("timeline").path("metrics").path("aggregated")
+                .accept("text/json")
+                .get(ClientResponse.class);
+        assertEquals(404, response.getStatus());
+    }
+
+
+    @Test
+    public void testMetricsPost() {
+        TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance();
+
+        timelineMetricsHolder.extractMetricsForAggregationPublishing();
+        timelineMetricsHolder.extractMetricsForRawPublishing();
+
+        TimelineMetrics timelineMetrics = TimelineMetricsHolderTest.getTimelineMetricsWithAppID("appid");
+        WebResource r = resource();
+        ClientResponse response = r.path("ws").path("v1").path("timeline").path("metrics")
+                .accept(MediaType.TEXT_PLAIN)
+                .post(ClientResponse.class, timelineMetrics);
+        assertEquals(200, response.getStatus());
+        assertEquals(MediaType.TEXT_PLAIN, response.getType().toString());
+
+        Map<String, TimelineMetrics> aggregationMap =  timelineMetricsHolder.extractMetricsForAggregationPublishing();
+        Map<String, TimelineMetrics> rawMap =  timelineMetricsHolder.extractMetricsForRawPublishing();
+
+        Assert.assertEquals(1, aggregationMap.size());
+        Assert.assertEquals(1, rawMap.size());
+
+        Collection<TimelineMetrics> aggregationCollection = aggregationMap.values();
+        Collection<TimelineMetrics> rawCollection = rawMap.values();
+
+        Collection<String> aggregationCollectionKeys = aggregationMap.keySet();
+        Collection<String> rawCollectionKeys = rawMap.keySet();
+
+        for (String key : aggregationCollectionKeys) {
+            Assert.assertTrue(key.contains("appid"));
+        }
+
+        for (String key : rawCollectionKeys) {
+            Assert.assertTrue(key.contains("appid"));
+        }
+
+        Assert.assertEquals(1, aggregationCollection.size());
+        Assert.assertEquals(1, rawCollection.size());
+
+        TimelineMetrics aggregationTimelineMetrics = (TimelineMetrics) aggregationCollection.toArray()[0];
+        TimelineMetrics rawTimelineMetrics = (TimelineMetrics) rawCollection.toArray()[0];
+
+
+        Assert.assertEquals(1, aggregationTimelineMetrics.getMetrics().size());
+        Assert.assertEquals(1, rawTimelineMetrics.getMetrics().size());
+
+        Assert.assertEquals("appid", aggregationTimelineMetrics.getMetrics().get(0).getAppId());
+        Assert.assertEquals("appid", rawTimelineMetrics.getMetrics().get(0).getAppId());
+
+        aggregationMap =  timelineMetricsHolder.extractMetricsForAggregationPublishing();
+        rawMap =  timelineMetricsHolder.extractMetricsForRawPublishing();
+
+        //Cache should be empty after extraction
+        Assert.assertEquals(0, aggregationMap.size());
+        Assert.assertEquals(0, rawMap.size());
+    }
+
+
+}

+ 107 - 0
ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java

@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import junit.framework.Assert;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Map;
+
+
+public class TimelineMetricsHolderTest {
+    private TimelineMetricsHolder timelineMetricsHolderInstance;
+
+    public void clearHolderSingleton() throws NoSuchFieldException, IllegalAccessException {
+        Class timelineMetricHolderClass = TimelineMetricsHolder.class;
+        Field field = timelineMetricHolderClass.getDeclaredField("instance");
+        field.setAccessible(true);
+        field.set(null, null);
+    }
+
+    @Test
+    public void testGetInstanceDefaultValues() throws Exception {
+        clearHolderSingleton();
+        Assert.assertNotNull(TimelineMetricsHolder.getInstance());
+    }
+
+    @Test
+    public void testGetInstanceWithParameters() throws Exception {
+        clearHolderSingleton();
+        Assert.assertNotNull(TimelineMetricsHolder.getInstance(1,2));
+    }
+
+    @Test
+    public void testCache() throws Exception {
+        clearHolderSingleton();
+        timelineMetricsHolderInstance = TimelineMetricsHolder.getInstance(4,4);
+        timelineMetricsHolderInstance.putMetricsForAggregationPublishing(getTimelineMetricsWithAppID("aggr"));
+        timelineMetricsHolderInstance.putMetricsForRawPublishing(getTimelineMetricsWithAppID("raw"));
+
+        Map<String, TimelineMetrics> aggregationMap =  timelineMetricsHolderInstance.extractMetricsForAggregationPublishing();
+        Map<String, TimelineMetrics> rawMap =  timelineMetricsHolderInstance.extractMetricsForRawPublishing();
+
+        Assert.assertEquals(1, aggregationMap.size());
+        Assert.assertEquals(1, rawMap.size());
+
+        Collection<TimelineMetrics> aggregationCollection = aggregationMap.values();
+        Collection<TimelineMetrics> rawCollection = rawMap.values();
+
+        Collection<String> aggregationCollectionKeys = aggregationMap.keySet();
+        Collection<String> rawCollectionKeys = rawMap.keySet();
+
+        for (String key : aggregationCollectionKeys) {
+            Assert.assertTrue(key.contains("aggr"));
+        }
+
+        for (String key : rawCollectionKeys) {
+            Assert.assertTrue(key.contains("raw"));
+        }
+
+        Assert.assertEquals(1, aggregationCollection.size());
+        Assert.assertEquals(1, rawCollection.size());
+
+        TimelineMetrics aggregationTimelineMetrics = (TimelineMetrics) aggregationCollection.toArray()[0];
+        TimelineMetrics rawTimelineMetrics = (TimelineMetrics) rawCollection.toArray()[0];
+
+
+        Assert.assertEquals(1, aggregationTimelineMetrics.getMetrics().size());
+        Assert.assertEquals(1, rawTimelineMetrics.getMetrics().size());
+
+        Assert.assertEquals("aggr", aggregationTimelineMetrics.getMetrics().get(0).getAppId());
+        Assert.assertEquals("raw", rawTimelineMetrics.getMetrics().get(0).getAppId());
+
+        aggregationMap =  timelineMetricsHolderInstance.extractMetricsForAggregationPublishing();
+        rawMap =  timelineMetricsHolderInstance.extractMetricsForRawPublishing();
+
+        //Cache should be empty after extraction
+        Assert.assertEquals(0, aggregationMap.size());
+        Assert.assertEquals(0, rawMap.size());
+    }
+
+    public static TimelineMetrics getTimelineMetricsWithAppID(String appId) {
+        TimelineMetric timelineMetric = new TimelineMetric();
+        timelineMetric.setAppId(appId);
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+        return timelineMetrics;
+    }
+}

+ 82 - 0
ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java

@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+public class AbstractMetricPublisherTest {
+    @Test
+    public void testProcessAndPublishMetrics() throws Exception {
+        AbstractMetricPublisher publisherMock =
+                createMockBuilder(RawMetricsPublisher.class)
+                        .withConstructor(TimelineMetricsHolder.getInstance(), new Configuration(), 60)
+                        .addMockedMethod("processMetrics")
+                        .addMockedMethod("getCollectorUri")
+                        .addMockedMethod("emitMetricsJson")
+                        .addMockedMethod("getCurrentCollectorHost").createStrictMock();
+
+        TimelineMetricsHolder.getInstance().putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw"));
+        expect(publisherMock.getCurrentCollectorHost()).andReturn("collectorhost").once();
+        expect(publisherMock.getCollectorUri(anyString())).andReturn("https://collectorhost:11/metrics").once();
+        expect(publisherMock.processMetrics(anyObject(Map.class))).andReturn("{metrics}").once();
+        expect(publisherMock.emitMetricsJson("https://collectorhost:11/metrics", "{metrics}")).andReturn(true).once();
+
+        replay(publisherMock);
+
+        publisherMock.processAndPublishMetrics(TimelineMetricsHolder.getInstance().extractMetricsForRawPublishing());
+
+        verify(publisherMock);
+    }
+
+    @Test
+    public void testRunAndStop() throws Exception {
+        AbstractMetricPublisher publisherMock = createMockBuilder(RawMetricsPublisher.class)
+                .withConstructor(TimelineMetricsHolder.getInstance(), new Configuration(), 1)
+                .addMockedMethod("processAndPublishMetrics").createStrictMock();
+        publisherMock.processAndPublishMetrics(anyObject(Map.class));
+        expectLastCall().times(1);
+
+
+        Thread t = createMockBuilder(Thread.class)
+                .withConstructor(publisherMock)
+                .addMockedMethod("isInterrupted").createStrictMock();
+        expect(t.isInterrupted()).andReturn(false).once();
+        expect(t.isInterrupted()).andReturn(true).once();
+
+        replay(publisherMock, t);
+
+        t.start();
+
+        Thread.sleep(2222);
+
+        verify(publisherMock, t);
+    }
+}

+ 154 - 0
ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java

@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import junit.framework.Assert;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class AggregatedMetricsPublisherTest {
+
+    @Test
+    public void testProcessMetrics() throws Exception {
+        Configuration configuration = new Configuration();
+        TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance();
+        timelineMetricsHolder.extractMetricsForAggregationPublishing();
+        timelineMetricsHolder.extractMetricsForRawPublishing();
+
+        TreeMap<Long, Double> metric1App1Metrics = new TreeMap<>();
+        metric1App1Metrics.put(1L, 1d);
+        metric1App1Metrics.put(2L, 2d);
+        metric1App1Metrics.put(3L, 3d);
+        timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName1", "app1", metric1App1Metrics));
+
+        TreeMap<Long, Double> metric2App2Metrics = new TreeMap<>();
+        metric2App2Metrics.put(1L, 4d);
+        metric2App2Metrics.put(2L, 5d);
+        metric2App2Metrics.put(3L, 6d);
+        timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName2", "app2", metric2App2Metrics));
+
+        TreeMap<Long, Double> metric3App3Metrics = new TreeMap<>();
+        metric3App3Metrics.put(1L, 7d);
+        metric3App3Metrics.put(2L, 8d);
+        metric3App3Metrics.put(3L, 9d);
+
+        timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName3", "app3", metric3App3Metrics));
+
+
+        AggregatedMetricsPublisher aggregatedMetricsPublisher =
+                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 60);
+
+        String aggregatedJson = aggregatedMetricsPublisher.processMetrics(timelineMetricsHolder.extractMetricsForAggregationPublishing());
+        String expectedMetric1App1 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":6.0,\"deviation\":0.0,\"max\":3.0,\"min\":1.0,\"numberOfSamples\":3}}";
+        String expectedMetric2App2 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":15.0,\"deviation\":0.0,\"max\":6.0,\"min\":4.0,\"numberOfSamples\":3}}";
+        String expectedMetric3App3 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":24.0,\"deviation\":0.0,\"max\":9.0,\"min\":7.0,\"numberOfSamples\":3}}";
+        Assert.assertNotNull(aggregatedJson);
+        Assert.assertTrue(aggregatedJson.contains(expectedMetric1App1));
+        Assert.assertTrue(aggregatedJson.contains(expectedMetric3App3));
+        Assert.assertTrue(aggregatedJson.contains(expectedMetric2App2));
+    }
+
+    @Test
+    public void testGetPostUrl() {
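+        // The post URL is a format template; scheme, host, and port are filled in per collector at send time.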
+        Configuration configuration = new Configuration();
+        AggregatedMetricsPublisher aggregatedMetricsPublisher =
+                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        String actualURL = aggregatedMetricsPublisher.getPostUrl();
+        String expectedURL = "%s://%s:%s/ws/v1/timeline/metrics/aggregated";
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+    }
+
+    @Test
+    public void testGetCollectorUri() {
+        // default configuration: HTTP scheme and the default collector port 6188
+        Configuration configuration = new Configuration();
+        AbstractMetricPublisher aggregatedMetricsPublisher =
+                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        String actualURL = aggregatedMetricsPublisher.getCollectorUri("c6401.ambari.apache.org");
+        String expectedURL = "http://c6401.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated";
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+
+        // HTTPS_ONLY policy switches the scheme to https
+        configuration = new Configuration();
+        configuration.set("timeline.metrics.service.http.policy", "HTTPS_ONLY");
+        aggregatedMetricsPublisher =
+                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        actualURL = aggregatedMetricsPublisher.getCollectorUri("c6402.ambari.apache.org");
+        expectedURL = "https://c6402.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated";
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+
+        // a custom webapp address overrides the default port
+        configuration = new Configuration();
+        configuration.set("timeline.metrics.service.webapp.address", "0.0.0.0:8888");
+        aggregatedMetricsPublisher =
+                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        actualURL = aggregatedMetricsPublisher.getCollectorUri("c6403.ambari.apache.org");
+        expectedURL = "http://c6403.ambari.apache.org:8888/ws/v1/timeline/metrics/aggregated";
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+    }
+
+    @Test
+    public void testGetMetricsFromCache() throws InterruptedException {
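+        // getInstance(4, 4) supplies short raw and aggregation cache intervals for the test.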
+        TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance(4, 4);
+        timelineMetricsHolder.extractMetricsForAggregationPublishing();
+        timelineMetricsHolder.extractMetricsForRawPublishing();
+
+        timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr1"));
+        timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw"));
+        timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr2"));
+
+        Configuration configuration = new Configuration();
+        AggregatedMetricsPublisher aggregatedMetricsPublisher =
+                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+
+        Map<String, TimelineMetrics> metricsFromCache = aggregatedMetricsPublisher.getMetricsFromCache();
+        Assert.assertNotNull(metricsFromCache);
+        Collection<TimelineMetrics> actualTimelineMetrics = metricsFromCache.values();
+        Assert.assertNotNull(actualTimelineMetrics);
+        Assert.assertEquals(2, actualTimelineMetrics.size());
+
+        for (TimelineMetrics timelineMetrics : actualTimelineMetrics) {
+            List<TimelineMetric> metrics = timelineMetrics.getMetrics();
+            Assert.assertEquals(1, metrics.size());
+            Assert.assertTrue(metrics.get(0).getAppId().contains("aggr"));
+        }
+    }
+
+    TimelineMetrics getTimelineMetricsForAppId(String metricName, String appId, TreeMap<Long, Double> metricValues) {
+        TimelineMetric timelineMetric = new TimelineMetric();
+        timelineMetric.setMetricName(metricName);
+        timelineMetric.setAppId(appId);
+        timelineMetric.setMetricValues(metricValues);
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+        return timelineMetrics;
+    }
+}

+ 151 - 0
ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java

@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+
+public class RawMetricsPublisherTest {
+    @Test
+    public void testProcessMetrics() throws Exception {
+        Configuration configuration = new Configuration();
+        TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance();
+
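+        // Drain both caches first so the shared singleton holder starts empty for this test.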
+        timelineMetricsHolder.extractMetricsForAggregationPublishing();
+        timelineMetricsHolder.extractMetricsForRawPublishing();
+
+        TreeMap<Long, Double> metric1App1Metrics = new TreeMap<>();
+        metric1App1Metrics.put(1L, 1d);
+        metric1App1Metrics.put(2L, 2d);
+        metric1App1Metrics.put(3L, 3d);
+        timelineMetricsHolder.putMetricsForRawPublishing(getTimelineMetricsForAppId("metricName1", "app1", metric1App1Metrics));
+
+        TreeMap<Long, Double> metric2App2Metrics = new TreeMap<>();
+        metric2App2Metrics.put(1L, 4d);
+        metric2App2Metrics.put(2L, 5d);
+        metric2App2Metrics.put(3L, 6d);
+        timelineMetricsHolder.putMetricsForRawPublishing(getTimelineMetricsForAppId("metricName2", "app2", metric2App2Metrics));
+
+        TreeMap<Long, Double> metric3App3Metrics = new TreeMap<>();
+        metric3App3Metrics.put(1L, 7d);
+        metric3App3Metrics.put(2L, 8d);
+        metric3App3Metrics.put(3L, 9d);
+
+        timelineMetricsHolder.putMetricsForRawPublishing(getTimelineMetricsForAppId("metricName3", "app3", metric3App3Metrics));
+
+        RawMetricsPublisher rawMetricsPublisher =
+                new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 60);
+
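+        // Raw publishing should serialize the cached series verbatim into a single TimelineMetrics JSON document.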
+        String rawJson = rawMetricsPublisher.processMetrics(timelineMetricsHolder.extractMetricsForRawPublishing());
+        String expectedResult = "{\"metrics\":[{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{\"1\":1.0,\"2\":2.0,\"3\":3.0}},{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{\"1\":4.0,\"2\":5.0,\"3\":6.0}},{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{\"1\":7.0,\"2\":8.0,\"3\":9.0}}]}";
+        Assert.assertNotNull(rawJson);
+        Assert.assertEquals(expectedResult, rawJson);
+    }
+
+    @Test
+    public void testGetPostUrl() {
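+        // The post URL is a format template; scheme, host, and port are filled in per collector at send time.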
+        Configuration configuration = new Configuration();
+        RawMetricsPublisher rawMetricsPublisher =
+                new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        String actualURL = rawMetricsPublisher.getPostUrl();
+        String expectedURL = "%s://%s:%s/ws/v1/timeline/metrics";
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+    }
+
+    @Test
+    public void testGetCollectorUri() {
+        // default configuration: HTTP scheme and the default collector port 6188
+        Configuration configuration = new Configuration();
+        AbstractMetricPublisher rawMetricsPublisher =
+                new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        String actualURL = rawMetricsPublisher.getCollectorUri("c6401.ambari.apache.org");
+        String expectedURL = "http://c6401.ambari.apache.org:6188/ws/v1/timeline/metrics";
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+
+        // HTTPS_ONLY policy switches the scheme to https
+        configuration = new Configuration();
+        configuration.set("timeline.metrics.service.http.policy", "HTTPS_ONLY");
+        rawMetricsPublisher =
+                new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        actualURL = rawMetricsPublisher.getCollectorUri("c6402.ambari.apache.org");
+        expectedURL = "https://c6402.ambari.apache.org:6188/ws/v1/timeline/metrics";
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+
+        // a custom webapp address overrides the default port
+        configuration = new Configuration();
+        configuration.set("timeline.metrics.service.webapp.address", "0.0.0.0:8888");
+        rawMetricsPublisher =
+                new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        actualURL = rawMetricsPublisher.getCollectorUri("c6403.ambari.apache.org");
+        expectedURL = "http://c6403.ambari.apache.org:8888/ws/v1/timeline/metrics";
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+    }
+
+    @Test
+    public void testGetMetricsFromCache() throws InterruptedException {
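+        // getInstance(4, 4) supplies short raw and aggregation cache intervals for the test.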
+        TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance(4, 4);
+        timelineMetricsHolder.extractMetricsForAggregationPublishing();
+        timelineMetricsHolder.extractMetricsForRawPublishing();
+
+        timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw1"));
+        timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr"));
+        timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw2"));
+
+        Configuration configuration = new Configuration();
+        RawMetricsPublisher rawMetricsPublisher =
+                new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+
+        Map<String, TimelineMetrics> metricsFromCache = rawMetricsPublisher.getMetricsFromCache();
+        Assert.assertNotNull(metricsFromCache);
+        Collection<TimelineMetrics> actualTimelineMetrics = metricsFromCache.values();
+        Assert.assertNotNull(actualTimelineMetrics);
+        Assert.assertEquals(2, actualTimelineMetrics.size());
+
+        for (TimelineMetrics timelineMetrics : actualTimelineMetrics) {
+            List<TimelineMetric> metrics = timelineMetrics.getMetrics();
+            Assert.assertEquals(1, metrics.size());
+            Assert.assertTrue(metrics.get(0).getAppId().contains("raw"));
+        }
+    }
+
+    TimelineMetrics getTimelineMetricsForAppId(String metricName, String appId, TreeMap<Long, Double> metricValues) {
+        TimelineMetric timelineMetric = new TimelineMetric();
+        timelineMetric.setMetricName(metricName);
+        timelineMetric.setAppId(appId);
+        timelineMetric.setMetricValues(metricValues);
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+        return timelineMetrics;
+    }
+}

+ 4 - 2
ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py

@@ -42,9 +42,10 @@ class Aggregator(threading.Thread):
     ams_log_file = "ambari-metrics-aggregator.log"
     additional_classpath = ':{0}'.format(config_dir)
     ams_log_dir = self._config.ams_monitor_log_dir()
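+    # Pass this host's name to the aggregator JVM along with the collector hosts (AMS HA).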
+    hostname = self._config.get_hostname_config()
     logger.info('Starting Aggregator thread.')
-    cmd = "{0}/bin/java {1} -Dams.log.dir={2} -Dams.log.file={3} -cp /var/lib/ambari-metrics-monitor/lib/*{4} {5} {6}"\
-      .format(java_home, jvm_agrs, ams_log_dir, ams_log_file, additional_classpath, class_name, collector_hosts)
+    cmd = "{0}/bin/java {1} -Dams.log.dir={2} -Dams.log.file={3} -cp /var/lib/ambari-metrics-monitor/lib/*{4} {5} {6} {7}"\
+      .format(java_home, jvm_agrs, ams_log_dir, ams_log_file, additional_classpath, class_name, hostname, collector_hosts)
 
     logger.info("Executing : {0}".format(cmd))
 
@@ -60,6 +61,7 @@ class Aggregator(threading.Thread):
     if self._aggregator_process :
       logger.info('Stopping Aggregator thread.')
       self._aggregator_process.terminate()
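+      # Drop the reference so a stopped process is not terminated again.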
+      self._aggregator_process = None
 
 class AggregatorWatchdog(threading.Thread):
   SLEEP_TIME = 30

+ 1 - 1
ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py

@@ -138,7 +138,7 @@ class Controller(threading.Thread):
     if self.aggregator:
       self.aggregator.stop()
     if self.aggregator_watchdog:
-      self.aggregator.stop()
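+      # stop the watchdog itself; the old code stopped the aggregator a second time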
+      self.aggregator_watchdog.stop()
     self.aggregator = Aggregator(self.config, self._stop_handler)
     self.aggregator_watchdog = AggregatorWatchdog(self.config, self._stop_handler)
     self.aggregator.start()