
AMBARI-16828. Support round-robin scheduling with failover for Sinks with distributed collector. (swagle)

Aravindan Vijayan 9 years ago
Parent commit 954e61ee07
40 changed files with 1223 additions and 145 deletions
  1. + 22 - 1  ambari-logsearch/ambari-logsearch-portal/src/main/java/org/apache/ambari/logsearch/solr/metrics/SolrAmsClient.java
  2. + 22 - 0  ambari-metrics/ambari-metrics-common/pom.xml
  3. + 258 - 14  ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
  4. + 25 - 0  ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricsSinkInitializationException.java
  5. + 96 - 0  ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java
  6. + 24 - 0  ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorUnavailableException.java
  7. + 59 - 0  ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java
  8. + 24 - 0  ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardStrategy.java
  9. + 149 - 0  ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
  10. + 52 - 0  ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/ShardingStrategyTest.java
  11. + 31 - 7  ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
  12. + 35 - 4  ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
  13. + 37 - 13  ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
  14. + 47 - 8  ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
  15. + 41 - 9  ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
  16. + 1 - 0  ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
  17. + 37 - 7  ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
  18. + 35 - 3  ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
  19. + 47 - 12  ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
  20. + 35 - 3  ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
  21. + 12 - 17  ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
  22. + 17 - 5  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
  23. + 2 - 2  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
  24. + 8 - 8  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
  25. + 2 - 2  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
  26. + 2 - 2  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
  27. + 2 - 2  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
  28. + 2 - 2  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
  29. + 2 - 2  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
  30. + 3 - 3  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
  31. + 3 - 3  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
  32. + 8 - 0  ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
  33. + 6 - 6  ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java
  34. + 20 - 0  ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
  35. + 4 - 1  ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2
  36. + 3 - 1  ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
  37. + 15 - 0  ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
  38. + 4 - 1  ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2
  39. + 21 - 0  ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
  40. + 10 - 7  ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2

+ 22 - 1
ambari-logsearch/ambari-logsearch-portal/src/main/java/org/apache/ambari/logsearch/solr/metrics/SolrAmsClient.java

@@ -22,6 +22,7 @@ package org.apache.ambari.logsearch.solr.metrics;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 
+// TODO: Refactor for failover
 public class SolrAmsClient extends AbstractTimelineMetricsSink {
   private final String collectorHost;
 
@@ -30,7 +31,7 @@ public class SolrAmsClient extends AbstractTimelineMetricsSink {
   }
 
   @Override
-  public String getCollectorUri() {
+  public String getCollectorUri(String host) {
     return collectorHost;
   }
 
@@ -39,8 +40,28 @@ public class SolrAmsClient extends AbstractTimelineMetricsSink {
     return 10;
   }
 
+  @Override
+  protected String getZookeeperQuorum() {
+    return null;
+  }
+
+  @Override
+  protected String getConfiguredCollectors() {
+    return null;
+  }
+
+  @Override
+  protected String getHostname() {
+    return null;
+  }
+
   @Override
   protected boolean emitMetrics(TimelineMetrics metrics) {
     return super.emitMetrics(metrics);
   }
+
+  @Override
+  protected String getCollectorProtocol() {
+    return null;
+  }
 }

+ 22 - 0
ambari-metrics/ambari-metrics-common/pom.xml

@@ -62,6 +62,28 @@
       <artifactId>commons-logging</artifactId>
       <version>1.1.1</version>
     </dependency>
+    <!-- TODO: Need to add these as shaded dependencies -->
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>2.4</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>2.2.2</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>14.0.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+      <version>2.7.1</version>
+    </dependency>
+    <!--  END TODO -->
     <dependency>
       <groupId>org.codehaus.jackson</groupId>
       <artifactId>jackson-xc</artifactId>

+ 258 - 14
ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java

@@ -17,8 +17,18 @@
  */
 package org.apache.hadoop.metrics2.sink.timeline;
 
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorHAHelper;
+import org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorUnavailableException;
+import org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardHostnameHashingStrategy;
+import org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardStrategy;
 import org.codehaus.jackson.map.AnnotationIntrospector;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -35,9 +45,17 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
+import java.io.StringWriter;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.security.KeyStore;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public abstract class AbstractTimelineMetricsSink {
@@ -46,20 +64,22 @@ public abstract class AbstractTimelineMetricsSink {
   public static final String METRICS_SEND_INTERVAL = "sendInterval";
   public static final String METRICS_POST_TIMEOUT_SECONDS = "timeout";
   public static final String COLLECTOR_PROPERTY = "collector";
+  public static final String COLLECTOR_PROTOCOL = "protocol";
+  public static final String COLLECTOR_PORT = "port";
+  public static final String ZOOKEEPER_QUORUM = "zookeeper.quorum";
   public static final int DEFAULT_POST_TIMEOUT_SECONDS = 10;
   public static final String SKIP_COUNTER_TRANSFROMATION = "skipCounterDerivative";
   public static final String RPC_METRIC_PREFIX = "metric.rpc";
-  public static final String RPC_METRIC_NAME_SUFFIX = "suffix";
-  public static final String RPC_METRIC_PORT_SUFFIX = "port";
-
   public static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics";
-
   public static final String SSL_KEYSTORE_PATH_PROPERTY = "truststore.path";
   public static final String SSL_KEYSTORE_TYPE_PROPERTY = "truststore.type";
   public static final String SSL_KEYSTORE_PASSWORD_PROPERTY = "truststore.password";
+  public static final String COLLECTOR_LIVE_NODES_PATH = "/ws/v1/timeline/metrics/livenodes";
 
   protected static final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0);
   public static int NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS = 100;
+  public int ZK_CONNECT_TRY_TIME = 10000;
+  public int ZK_SLEEP_BETWEEN_RETRY_TIME = 2000;
 
   private SSLSocketFactory sslSocketFactory;
 
@@ -67,6 +87,28 @@ public abstract class AbstractTimelineMetricsSink {
 
   protected static ObjectMapper mapper;
 
+  protected MetricCollectorHAHelper collectorHAHelper;
+
+  protected MetricSinkWriteShardStrategy metricSinkWriteShardStrategy;
+
+  // Single element cache with fixed expiration - Helps adjacent Sinks as
+  // well as timed refresh
+  protected Supplier<String> targetCollectorHostSupplier;
+
+  protected final List<String> allKnownLiveCollectors = new ArrayList<>();
+
+  private volatile boolean isInitializedForHA = false;
+
+  @SuppressWarnings("all")
+  private final int RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER = 5;
+
+  private final Gson gson = new Gson();
+
+  private final Random rand = new Random();
+
+  private static final int COLLECTOR_HOST_CACHE_MAX_EXPIRATION_MINUTES = 75;
+  private static final int COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES = 60;
+
   static {
     mapper = new ObjectMapper();
     AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
@@ -79,6 +121,16 @@ public abstract class AbstractTimelineMetricsSink {
     LOG = LogFactory.getLog(this.getClass());
   }
 
+  /**
+   * Initialize Sink write strategy with respect to HA Collector
+   */
+  protected void init() {
+    metricSinkWriteShardStrategy = new MetricSinkWriteShardHostnameHashingStrategy(getHostname());
+    collectorHAHelper = new MetricCollectorHAHelper(getZookeeperQuorum(),
+      ZK_CONNECT_TRY_TIME, ZK_SLEEP_BETWEEN_RETRY_TIME);
+    isInitializedForHA = true;
+  }
+
   protected boolean emitMetricsJson(String connectUrl, String jsonData) {
     int timeout = getTimeoutSeconds() * 1000;
     HttpURLConnection connection = null;
@@ -113,7 +165,7 @@ public abstract class AbstractTimelineMetricsSink {
         }
       }
       cleanupInputStream(connection.getInputStream());
-      //reset failedCollectorConnectionsCounter to "0"
+      // reset failedCollectorConnectionsCounter to "0"
       failedCollectorConnectionsCounter.set(0);
       return true;
     } catch (IOException ioe) {
@@ -146,7 +198,20 @@ public abstract class AbstractTimelineMetricsSink {
   }
 
   protected boolean emitMetrics(TimelineMetrics metrics) {
-    String connectUrl = getCollectorUri();
+    String collectorHost;
+    // Get cached target
+    if (targetCollectorHostSupplier != null) {
+      collectorHost = targetCollectorHostSupplier.get();
+      // Last X attempts have failed - force refresh
+      if (failedCollectorConnectionsCounter.get() > RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER) {
+        targetCollectorHostSupplier = null;
+        collectorHost = findPreferredCollectHost();
+      }
+    } else {
+      collectorHost = findPreferredCollectHost();
+    }
+
+    String connectUrl = getCollectorUri(collectorHost);
     String jsonData = null;
     try {
       jsonData = mapper.writeValueAsString(metrics);
@@ -196,8 +261,7 @@ public abstract class AbstractTimelineMetricsSink {
   protected HttpsURLConnection getSSLConnection(String spec)
     throws IOException, IllegalStateException {
 
-    HttpsURLConnection connection = (HttpsURLConnection) (new URL(spec)
-      .openConnection());
+    HttpsURLConnection connection = (HttpsURLConnection) (new URL(spec).openConnection());
 
     connection.setSSLSocketFactory(sslSocketFactory);
 
@@ -208,11 +272,7 @@ public abstract class AbstractTimelineMetricsSink {
                                 String trustStorePassword) {
     if (sslSocketFactory == null) {
       if (trustStorePath == null || trustStorePassword == null) {
-
-        String msg =
-          String.format("Can't load TrustStore. " +
-            "Truststore path or password is not set.");
-
+        String msg = "Can't load TrustStore. Truststore path or password is not set.";
         LOG.error(msg);
         throw new IllegalStateException(msg);
       }
@@ -242,7 +302,191 @@ public abstract class AbstractTimelineMetricsSink {
     }
   }
 
-  abstract protected String getCollectorUri();
+  /**
+   * Find appropriate write shard for this sink using the {@link org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardStrategy}
+   *
+   * 1. Use configured collector(s) to discover available collectors
+   * 2. If configured collector(s) are unresponsive, check Zookeeper to find live hosts
+   * 3. Refresh known collector list using ZK
+   * 4. Default: Return configured collector with no side effect due to discovery.
+   *
+   * Throws {@link MetricsSinkInitializationException} if called before
+   * initialization; no other side effects.
+   *
+   * @return String Collector hostname
+   */
+  protected synchronized String findPreferredCollectHost() {
+    if (!isInitializedForHA) {
+      init();
+    }
+
+    // Auto expire and re-calculate after 1 hour
+    if (targetCollectorHostSupplier != null) {
+      String targetCollector = targetCollectorHostSupplier.get();
+      if (targetCollector != null) {
+        return targetCollector;
+      }
+    }
+
+    // Reach out to all configured collectors before Zookeeper
+    String configuredCollectors = getConfiguredCollectors();
+    if (configuredCollectors != null && !configuredCollectors.isEmpty()) {
+      String[] hosts = configuredCollectors.split(",");
+      for (String hostPortStr : hosts) {
+        if (hostPortStr != null && !hostPortStr.isEmpty()) {
+          String[] hostPortPair = hostPortStr.split(":");
+          if (hostPortPair.length < 2) {
+            LOG.warn("Collector port is missing from the configuration.");
+            continue;
+          }
+          String hostStr = hostPortPair[0].trim();
+          String portStr = hostPortPair[1].trim();
+          // Check liveness and get the known live instances from this collector
+          try {
+            Collection<String> liveHosts = findLiveCollectorHostsFromKnownCollector(hostStr, portStr);
+            // Update live hosts - the current host will already be a part of this
+            allKnownLiveCollectors.addAll(liveHosts);
+          } catch (MetricCollectorUnavailableException e) {
+            allKnownLiveCollectors.remove(hostStr);
+            LOG.info("Collector " + hostStr + " is no longer live. Removing " +
+              "it from the list of known live collector hosts : " + allKnownLiveCollectors);
+          }
+        }
+      }
+    }
+
+    // Lookup Zookeeper for live hosts - max 10 seconds wait time
+    if (allKnownLiveCollectors.size() == 0) {
+      allKnownLiveCollectors.addAll(collectorHAHelper.findLiveCollectorHostsFromZNode());
+    }
+
+    if (allKnownLiveCollectors.size() != 0) {
+      targetCollectorHostSupplier = Suppliers.memoizeWithExpiration(
+        new Supplier<String>() {
+          @Override
+          public String get() {
+            return metricSinkWriteShardStrategy.findCollectorShard(new ArrayList<>(allKnownLiveCollectors));
+          }
+        },  // random.nextInt(max - min + 1) + min => expires in 60 to 75 minutes
+        rand.nextInt(COLLECTOR_HOST_CACHE_MAX_EXPIRATION_MINUTES
+          - COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES + 1)
+          + COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES,
+        TimeUnit.MINUTES
+      );
+
+      return targetCollectorHostSupplier.get();
+    }
+    return null;
+  }
+
+  Collection<String> findLiveCollectorHostsFromKnownCollector(String host, String port) throws MetricCollectorUnavailableException {
+    List<String> collectors = new ArrayList<>();
+    HttpURLConnection connection = null;
+    StringBuilder sb = new StringBuilder(getCollectorProtocol());
+    sb.append("://");
+    sb.append(host);
+    sb.append(":");
+    sb.append(port);
+    sb.append(COLLECTOR_LIVE_NODES_PATH);
+    String connectUrl = sb.toString();
+    LOG.debug("Requesting live collector nodes : " + connectUrl);
+    try {
+      connection = getCollectorProtocol().startsWith("https") ?
+        getSSLConnection(connectUrl) : getConnection(connectUrl);
+
+      connection.setRequestMethod("GET");
+      // 3 second connect plus 2 second read timeout: 5 seconds is plenty of wait time for this op
+      connection.setConnectTimeout(3000);
+      connection.setReadTimeout(2000);
+
+      int responseCode = connection.getResponseCode();
+      if (responseCode == 200) {
+        try (InputStream in = connection.getInputStream()) {
+          StringWriter writer = new StringWriter();
+          IOUtils.copy(in, writer);
+          try {
+            collectors = gson.fromJson(writer.toString(), new TypeToken<List<String>>(){}.getType());
+          } catch (JsonSyntaxException jse) {
+            // Swallow this so the sink can still attempt the POST
+            LOG.debug("Exception deserializing the json data on live " +
+              "collector nodes.", jse);
+          }
+        }
+      }
+
+    } catch (IOException ioe) {
+      StringBuilder errorMessage =
+        new StringBuilder("Unable to connect to collector, " + connectUrl);
+      try {
+        if (connection != null) {
+          errorMessage.append(cleanupInputStream(connection.getErrorStream()));
+        }
+      } catch (IOException e) {
+        // NOP
+      }
+      LOG.debug(errorMessage);
+      LOG.debug(ioe);
+      String warnMsg = "Unable to connect to collector to find live nodes.";
+      LOG.warn(warnMsg, ioe);
+      throw new MetricCollectorUnavailableException(warnMsg);
+    }
+    return collectors;
+  }
+
+  // Constructing without UriBuilder to avoid unfavorable httpclient
+  // dependencies
+  protected String constructTimelineMetricUri(String protocol, String host, String port) {
+    StringBuilder sb = new StringBuilder(protocol);
+    sb.append("://");
+    sb.append(host);
+    sb.append(":");
+    sb.append(port);
+    sb.append(WS_V1_TIMELINE_METRICS);
+    return sb.toString();
+  }
+
+  protected String constructContainerMetricUri(String protocol, String host, String port) {
+    StringBuilder sb = new StringBuilder(protocol);
+    sb.append("://");
+    sb.append(host);
+    sb.append(":");
+    sb.append(port);
+    // Container metrics have their own endpoint, distinct from WS_V1_TIMELINE_METRICS
+    sb.append("/ws/v1/timeline/containermetrics");
+    return sb.toString();
+  }
 
+  /**
+   * Get a pre-formatted URI for the collector
+   */
+  abstract protected String getCollectorUri(String host);
+
+  abstract protected String getCollectorProtocol();
+
+  /**
+   * Timeout in seconds for the metrics emit calls.
+   */
   abstract protected int getTimeoutSeconds();
+
+  /**
+   * Get the zookeeper quorum for the cluster, used to discover live collectors.
+   * @return String "host1:port1,host2:port2"
+   */
+  abstract protected String getZookeeperQuorum();
+
+  /**
+   * Get pre-configured list of collectors available
+   * @return String "host1:port,host2:port"
+   */
+  abstract protected String getConfiguredCollectors();
+
+  /**
+   * Get hostname used for calculating write shard.
+   * @return String "host1"
+   */
+  abstract protected String getHostname();
 }

+ 25 - 0
ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricsSinkInitializationException.java

@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+public class MetricsSinkInitializationException extends RuntimeException {
+  // Thrown when a sink operation is attempted before initialization
+  public MetricsSinkInitializationException(String message) {
+    super(message);
+  }
+}

+ 96 - 0
ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java

@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.availability;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.RetryLoop;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.retry.RetryUntilElapsed;
+import org.apache.zookeeper.ZooKeeper;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+/**
+ * Find a live Collector instance from Zookeeper
+ * This class allows connect to ZK on-demand and
+ * does not add a watcher on the znode.
+ */
+public class MetricCollectorHAHelper {
+  private final String zookeeperQuorum;
+  private final int tryTime;
+  private final int sleepMsBetweenRetries;
+
+  private static final int CONNECTION_TIMEOUT = 2000;
+  private static final int SESSION_TIMEOUT = 5000;
+  private static final String ZK_PATH = "/ambari-metrics-cluster/LIVEINSTANCES";
+  private static final String INSTANCE_NAME_DELIMITER = "_";
+
+
+  private static final Log LOG = LogFactory.getLog(MetricCollectorHAHelper.class);
+
+  public MetricCollectorHAHelper(String zookeeperQuorum, int tryTime, int sleepMsBetweenRetries) {
+    this.zookeeperQuorum = zookeeperQuorum;
+    this.tryTime = tryTime;
+    this.sleepMsBetweenRetries = sleepMsBetweenRetries;
+  }
+
+  /**
+   * Connect to Zookeeper to find live instances of metrics collector
+   * @return {@link Collection} of hostnames
+   */
+  public Collection<String> findLiveCollectorHostsFromZNode() {
+    Set<String> collectors = new HashSet<>();
+
+    RetryPolicy retryPolicy = new RetryUntilElapsed(tryTime, sleepMsBetweenRetries);
+    final CuratorZookeeperClient client = new CuratorZookeeperClient(zookeeperQuorum,
+      SESSION_TIMEOUT, CONNECTION_TIMEOUT, null, retryPolicy);
+
+    String liveInstances = null;
+
+    try {
+      liveInstances = RetryLoop.callWithRetry(client, new Callable<String>() {
+        @Override
+        public String call() throws Exception {
+          ZooKeeper zookeeper = client.getZooKeeper();
+          byte[] data = zookeeper.getData(ZK_PATH, null, null);
+          return data != null ? new String(data) : null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.warn("Unable to connect to zookeeper.", e);
+      LOG.debug(e);
+    }
+
+    // Example payload: [ambari-sid-3.c.pramod-thangali.internal_12001]
+    if (liveInstances != null && !liveInstances.isEmpty()) {
+      for (String instanceStr : liveInstances.split(",")) {
+        collectors.add(instanceStr.substring(0, instanceStr.indexOf(INSTANCE_NAME_DELIMITER)));
+      }
+    }
+
+    return collectors;
+  }
+}
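
To illustrate the znode payload handling above, a small sketch (hypothetical class name; the sample entry is the one from the comment in the code) that reduces LIVEINSTANCES entries of the form "hostname_port" to bare hostnames:

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

public class LiveInstanceParsingSketch {
  private static final String INSTANCE_NAME_DELIMITER = "_";

  // Same reduction as findLiveCollectorHostsFromZNode: keep only the
  // hostname portion of each comma-separated instance entry
  static Collection<String> parse(String liveInstances) {
    Set<String> collectors = new HashSet<>();
    if (liveInstances != null && !liveInstances.isEmpty()) {
      for (String instanceStr : liveInstances.split(",")) {
        collectors.add(instanceStr.substring(0, instanceStr.indexOf(INSTANCE_NAME_DELIMITER)));
      }
    }
    return collectors;
  }

  public static void main(String[] args) {
    // Prints: [ambari-sid-3.c.pramod-thangali.internal]
    System.out.println(parse("ambari-sid-3.c.pramod-thangali.internal_12001"));
  }
}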

+ 24 - 0
ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorUnavailableException.java

@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.availability;
+
+public class MetricCollectorUnavailableException extends Exception {
+  public MetricCollectorUnavailableException(String message) {
+    super(message);
+  }
+}

+ 59 - 0
ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java

@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.availability;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import java.util.List;
+
+/**
+ * Provides sharding based on hostname
+ */
+public class MetricSinkWriteShardHostnameHashingStrategy implements MetricSinkWriteShardStrategy {
+  private final String hostname;
+  private final long hostnameHash;
+  private static final Log LOG = LogFactory.getLog(MetricSinkWriteShardHostnameHashingStrategy.class);
+
+  public MetricSinkWriteShardHostnameHashingStrategy(String hostname) {
+    this.hostname = hostname;
+    this.hostnameHash = hostname != null ? computeHash(hostname) : 1000; // arbitrary fallback constant
+  }
+
+  @Override
+  public String findCollectorShard(List<String> collectorHosts) {
+    int index = (int) Math.abs(hostnameHash % collectorHosts.size()); // abs() guards against an overflowed negative hash
+    String collectorHost = collectorHosts.get(index);
+    LOG.info(String.format("Calculated collector shard %s based on hostname: %s", collectorHost, hostname));
+    return collectorHost;
+  }
+
+  /**
+   * Compute a consistent hash based on the hostname, which should give a
+   * decently uniform distribution, assuming hostnames generally have a
+   * sequential numeric suffix.
+   */
+  long computeHash(String hostname) {
+    long h = 11987L; // prime
+    int len = hostname.length();
+
+    for (int i = 0; i < len; i++) {
+      h = 31 * h + hostname.charAt(i);
+    }
+    return h;
+  }
+}
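
A short usage sketch for the strategy (collector and sink hostnames are placeholders): every sink on a given host hashes to the same collector, so write load is partitioned deterministically without any coordination between sinks.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardHostnameHashingStrategy;
import org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardStrategy;

public class ShardingUsageSketch {
  public static void main(String[] args) {
    List<String> collectors = Arrays.asList(
      "mycollector-1.hostname.domain", "mycollector-2.hostname.domain");

    for (String sinkHost : Arrays.asList("node-1.example.com", "node-2.example.com")) {
      MetricSinkWriteShardStrategy strategy =
        new MetricSinkWriteShardHostnameHashingStrategy(sinkHost);
      // The same sink host always lands on the same collector shard
      System.out.println(sinkHost + " -> " + strategy.findCollectorShard(collectors));
    }
  }
}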

+ 24 - 0
ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardStrategy.java

@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.availability;
+
+import java.util.List;
+
+public interface MetricSinkWriteShardStrategy {
+  String findCollectorShard(List<String> collectorHosts);
+}

+ 149 - 0
ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java

@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.availability;
+
+import com.google.gson.Gson;
+import junit.framework.Assert;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import static org.easymock.EasyMock.expect;
+import static org.powermock.api.easymock.PowerMock.createNiceMock;
+import static org.powermock.api.easymock.PowerMock.expectNew;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({AbstractTimelineMetricsSink.class, URL.class, HttpURLConnection.class})
+public class MetricCollectorHATest {
+
+  @Test
+  public void findCollectorUsingZKTest() throws Exception {
+    InputStream is = createNiceMock(InputStream.class);
+    HttpURLConnection connection = createNiceMock(HttpURLConnection.class);
+    URL url = createNiceMock(URL.class);
+    MetricCollectorHAHelper haHelper = createNiceMock(MetricCollectorHAHelper.class);
+
+    expectNew(URL.class, "http://localhost:2181/ws/v1/timeline/metrics/livenodes").andReturn(url).anyTimes();
+    expect(url.openConnection()).andReturn(connection).anyTimes();
+    expect(connection.getInputStream()).andReturn(is).anyTimes();
+    expect(connection.getResponseCode()).andThrow(new IOException()).anyTimes();
+    expect(haHelper.findLiveCollectorHostsFromZNode()).andReturn(
+      new ArrayList<String>() {{ add("h2"); add("h3"); }});
+
+    replayAll();
+    TestTimelineMetricsSink sink = new TestTimelineMetricsSink(haHelper);
+    sink.init();
+
+    String host = sink.findPreferredCollectHost();
+
+    verifyAll();
+
+    Assert.assertNotNull(host);
+    Assert.assertEquals("h2", host);
+  }
+
+  @Test
+  public void findCollectorUsingKnownCollectorTest() throws Exception {
+    HttpURLConnection connection = createNiceMock(HttpURLConnection.class);
+    URL url = createNiceMock(URL.class);
+    MetricCollectorHAHelper haHelper = createNiceMock(MetricCollectorHAHelper.class);
+
+    Gson gson = new Gson();
+    ArrayList<String> output = new ArrayList<>();
+    output.add("h1");
+    output.add("h2");
+    output.add("h3");
+    InputStream is = IOUtils.toInputStream(gson.toJson(output));
+
+    expectNew(URL.class, "http://localhost:2181/ws/v1/timeline/metrics/livenodes").andReturn(url).anyTimes();
+    expect(url.openConnection()).andReturn(connection).anyTimes();
+    expect(connection.getInputStream()).andReturn(is).anyTimes();
+    expect(connection.getResponseCode()).andReturn(200).anyTimes();
+
+    replayAll();
+    TestTimelineMetricsSink sink = new TestTimelineMetricsSink(haHelper);
+    sink.init();
+
+    String host = sink.findPreferredCollectHost();
+    Assert.assertNotNull(host);
+    Assert.assertEquals("h3", host);
+
+    verifyAll();
+  }
+
+  private class TestTimelineMetricsSink extends AbstractTimelineMetricsSink {
+    MetricCollectorHAHelper testHelper;
+
+    TestTimelineMetricsSink(MetricCollectorHAHelper haHelper) {
+      testHelper = haHelper;
+    }
+
+    @Override
+    protected void init() {
+      super.init();
+      this.collectorHAHelper = testHelper;
+    }
+
+    @Override
+    protected synchronized String findPreferredCollectHost() {
+      return super.findPreferredCollectHost();
+    }
+
+    @Override
+    protected String getCollectorUri(String host) {
+      return null;
+    }
+
+    @Override
+    protected String getCollectorProtocol() {
+      return "http";
+    }
+
+    @Override
+    protected int getTimeoutSeconds() {
+      return 10;
+    }
+
+    @Override
+    protected String getZookeeperQuorum() {
+      return "localhost:2181";
+    }
+
+    @Override
+    protected String getConfiguredCollectors() {
+      return "localhost:2181";
+    }
+
+    @Override
+    protected String getHostname() {
+      return "h1";
+    }
+  }
+}

+ 52 - 0
ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/ShardingStrategyTest.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.availability;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ShardingStrategyTest {
+  @Test
+  public void testHostnameShardingStrategy() throws Exception {
+    List<String> collectorHosts = new ArrayList<String>() {{
+      add("mycollector-1.hostname.domain");
+      add("mycollector-2.hostname.domain");
+    }};
+
+    String hostname1 = "some-very-long-hostname-with-a-trailing-number-identifier-10.mylocalhost.domain";
+
+    // Consistency check
+    String collectorShard1 = null;
+    for (int i = 0; i < 100; i++) {
+      MetricSinkWriteShardStrategy strategy = new MetricSinkWriteShardHostnameHashingStrategy(hostname1);
+      collectorShard1 = strategy.findCollectorShard(collectorHosts);
+      Assert.assertEquals(collectorShard1, strategy.findCollectorShard(collectorHosts));
+    }
+
+    // Shard 2 hosts
+    String hostname2 = "some-very-long-hostname-with-a-trailing-number-identifier-20.mylocalhost.domain";
+    MetricSinkWriteShardStrategy strategy = new MetricSinkWriteShardHostnameHashingStrategy(hostname2);
+    String collectorShard2 = strategy.findCollectorShard(collectorHosts);
+
+    Assert.assertEquals("mycollector-1.hostname.domain", collectorShard1);
+    Assert.assertEquals("mycollector-2.hostname.domain", collectorShard2);
+  }
+}

+ 31 - 7
ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java

@@ -33,14 +33,14 @@ import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import static org.easymock.EasyMock.anyString;
 import static org.easymock.EasyMock.expect;
 import static org.powermock.api.easymock.PowerMock.createNiceMock;
 import static org.powermock.api.easymock.PowerMock.expectNew;
 import static org.powermock.api.easymock.PowerMock.replayAll;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({AbstractTimelineMetricsSink.class, URL.class,
-  HttpURLConnection.class})
+@PrepareForTest({AbstractTimelineMetricsSink.class, URL.class, HttpURLConnection.class})
 public class HandleConnectExceptionTest {
   private static final String COLLECTOR_URL = "collector";
   private TestTimelineMetricsSink sink;
@@ -53,7 +53,7 @@ public class HandleConnectExceptionTest {
     URL url = createNiceMock(URL.class);
     AbstractTimelineMetricsSink.NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS = 2;
     try {
-      expectNew(URL.class, "collector").andReturn(url).anyTimes();
+      expectNew(URL.class, anyString()).andReturn(url).anyTimes();
       expect(url.openConnection()).andReturn(connection).anyTimes();
       expect(connection.getOutputStream()).andReturn(os).anyTimes();
       expect(connection.getResponseCode()).andThrow(new IOException()).anyTimes();
@@ -79,27 +79,51 @@ public class HandleConnectExceptionTest {
     try{
       sink.emitMetrics(timelineMetrics);
       Assert.fail();
-    }catch(UnableToConnectException e){
+    } catch (UnableToConnectException e){
       Assert.assertEquals(COLLECTOR_URL, e.getConnectUrl());
-    }catch(Exception e){
+    } catch (Exception e){
+      e.printStackTrace();
       Assert.fail(e.getMessage());
     }
   }
 
-  class TestTimelineMetricsSink extends AbstractTimelineMetricsSink{
+  private class TestTimelineMetricsSink extends AbstractTimelineMetricsSink{
     @Override
-    protected String getCollectorUri() {
+    protected String getCollectorUri(String host) {
       return COLLECTOR_URL;
     }
 
+    @Override
+    protected String getCollectorProtocol() {
+      return "http";
+    }
+
     @Override
     protected int getTimeoutSeconds() {
       return 10;
     }
 
+    @Override
+    protected String getZookeeperQuorum() {
+      return "localhost:2181";
+    }
+
+    @Override
+    protected String getConfiguredCollectors() {
+      return "localhost:2181";
+    }
+
+    @Override
+    protected String getHostname() {
+      return "h1";
+    }
+
     @Override
     public boolean emitMetrics(TimelineMetrics metrics) {
+      super.init();
       return super.emitMetrics(metrics);
     }
   }
 }

+ 35 - 4
ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java

@@ -46,6 +46,7 @@ import java.util.concurrent.TimeUnit;
 
 public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implements MonitorService {
   private String collectorUri;
+  private String protocol;
   // Key - component(instance_id)
   private Map<String, TimelineMetricsCache> metricsCaches;
   private int maxRowCacheSize;
@@ -53,6 +54,9 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
   private ScheduledExecutorService scheduledExecutorService;
   private long pollFrequency;
   private String hostname;
+  private String port;
+  private String collectors;
+  private String zookeeperQuorum;
   private final static String COUNTER_METRICS_PROPERTY = "counters";
   private final Set<String> counterMetrics = new HashSet<String>();
   private int timeoutSeconds = 10;
@@ -95,8 +99,15 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
         String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
     metricsCaches = new HashMap<String, TimelineMetricsCache>();
-    collectorUri = configuration.getProperty(COLLECTOR_PROPERTY) + WS_V1_TIMELINE_METRICS;
-    if (collectorUri.toLowerCase().startsWith("https://")) {
+    collectors = configuration.getProperty(COLLECTOR_PROPERTY);
+    zookeeperQuorum = configuration.getProperty(ZOOKEEPER_QUORUM);
+    protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
+    port = configuration.getProperty(COLLECTOR_PORT, "6188");
+    // Initialize the collector write strategy
+    super.init();
+
+    collectorUri = constructTimelineMetricUri(protocol, findPreferredCollectHost(), port);
+    if (protocol.contains("https")) {
       String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
       String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
       String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
@@ -109,8 +120,13 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
   }
 
   @Override
-  public String getCollectorUri() {
-    return collectorUri;
+  public String getCollectorUri(String host) {
+    return constructTimelineMetricUri(protocol, host, port);
+  }
+
+  @Override
+  protected String getCollectorProtocol() {
+    return protocol;
   }
 
   @Override
@@ -118,6 +134,21 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     return timeoutSeconds;
   }
 
+  @Override
+  protected String getZookeeperQuorum() {
+    return zookeeperQuorum;
+  }
+
+  @Override
+  protected String getConfiguredCollectors() {
+    return collectors;
+  }
+
+  @Override
+  protected String getHostname() {
+    return hostname;
+  }
+
   public void setPollFrequency(long pollFrequency) {
     this.pollFrequency = pollFrequency;
   }

+ 37 - 13
ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java

@@ -28,14 +28,9 @@ import org.apache.hadoop.metrics2.MetricsSink;
 import org.apache.hadoop.metrics2.MetricsTag;
 import org.apache.hadoop.metrics2.impl.MsInfo;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.apache.hadoop.metrics2.util.Servers;
 import org.apache.hadoop.net.DNS;
-
 import java.io.Closeable;
 import java.io.IOException;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -57,9 +52,11 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
   private TimelineMetricsCache metricsCache;
   private String hostName = "UNKNOWN.example.com";
   private String serviceName = "";
-  private List<? extends SocketAddress> metricsServers;
+  private String collectors;
   private String collectorUri;
   private String containerMetricsUri;
+  private String protocol;
+  private String port;
   public static final String WS_V1_CONTAINER_METRICS = "/ws/v1/timeline/containermetrics";
 
   private static final String SERVICE_NAME_PREFIX = "serviceName-prefix";
@@ -99,16 +96,21 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
     serviceName = getServiceName(conf);
 
     LOG.info("Identified hostname = " + hostName + ", serviceName = " + serviceName);
+    // Initialize the collector write strategy
+    super.init();
 
     // Load collector configs
-    metricsServers = Servers.parse(conf.getString(COLLECTOR_PROPERTY), 6188);
+    protocol = conf.getString(COLLECTOR_PROTOCOL, "http");
+    collectors = conf.getString(COLLECTOR_PROPERTY, "").trim();
+    port = conf.getString(COLLECTOR_PORT, "6188");
 
-    if (metricsServers == null || metricsServers.isEmpty()) {
+    if (StringUtils.isEmpty(collectors)) {
       LOG.error("No Metric collector configured.");
     } else {
-      collectorUri = conf.getString(COLLECTOR_PROPERTY).trim() + WS_V1_TIMELINE_METRICS;
-      containerMetricsUri = conf.getString(COLLECTOR_PROPERTY).trim() + WS_V1_CONTAINER_METRICS;
-      if (collectorUri.toLowerCase().startsWith("https://")) {
+      String preferredCollectorHost = findPreferredCollectHost();
+      collectorUri = constructTimelineMetricUri(protocol, preferredCollectorHost, port);
+      containerMetricsUri = constructContainerMetricUri(protocol, preferredCollectorHost, port);
+      if (protocol.contains("https")) {
         String trustStorePath = conf.getString(SSL_KEYSTORE_PATH_PROPERTY).trim();
         String trustStoreType = conf.getString(SSL_KEYSTORE_TYPE_PROPERTY).trim();
         String trustStorePwd = conf.getString(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
@@ -163,6 +165,7 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
         }
       }
     }
+
     if (!rpcPortSuffixes.isEmpty()) {
       LOG.info("RPC port properties configured: " + rpcPortSuffixes);
     }
@@ -190,8 +193,13 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
   }
 
   @Override
-  protected String getCollectorUri() {
-    return collectorUri;
+  protected String getCollectorUri(String host) {
+    return constructTimelineMetricUri(protocol, host, port);
+  }
+
+  @Override
+  protected String getCollectorProtocol() {
+    return protocol;
   }
 
   @Override
@@ -199,6 +207,21 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
     return timeoutSeconds;
   }
 
+  @Override
+  protected String getZookeeperQuorum() {
+    return conf.getString(ZOOKEEPER_QUORUM);
+  }
+
+  @Override
+  protected String getConfiguredCollectors() {
+    return collectors;
+  }
+
+  @Override
+  protected String getHostname() {
+    return hostName;
+  }
+
   @Override
   public void putMetrics(MetricsRecord record) {
     try {
@@ -384,6 +407,7 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
       LOG.error("Unable to parse container metrics ", e);
     }
     if (jsonData != null) {
+      // TODO: Container metrics should be able to utilize failover mechanism
       emitMetricsJson(containerMetricsUri, jsonData);
     }
   }

+ 47 - 8
ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java

@@ -18,12 +18,15 @@
 
 package org.apache.hadoop.metrics2.sink.timeline;
 
+import com.google.gson.Gson;
 import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricType;
 import org.apache.hadoop.metrics2.MetricsInfo;
 import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorHAHelper;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.easymock.EasyMock;
@@ -32,10 +35,13 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -46,11 +52,14 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_PORT;
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_PROPERTY;
+import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_PROTOCOL;
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.MAX_METRIC_ROW_CACHE_SIZE;
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.METRICS_SEND_INTERVAL;
 import static org.easymock.EasyMock.anyInt;
 import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
 import static org.easymock.EasyMock.createMockBuilder;
 import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.eq;
@@ -58,9 +67,14 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
+import static org.powermock.api.easymock.PowerMock.expectNew;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
 
 @RunWith(PowerMockRunner.class)
+@PrepareForTest({AbstractTimelineMetricsSink.class, HttpURLConnection.class})
 public class HadoopTimelineMetricsSinkTest {
+  Gson gson = new Gson();
 
   @Before
   public void setup() {
@@ -68,16 +82,28 @@ public class HadoopTimelineMetricsSinkTest {
   }
 
   @Test
-  @PrepareForTest({URL.class, OutputStream.class})
+  @PrepareForTest({URL.class, OutputStream.class, AbstractTimelineMetricsSink.class, HttpURLConnection.class})
   public void testPutMetrics() throws Exception {
     HadoopTimelineMetricsSink sink = new HadoopTimelineMetricsSink();
 
+    HttpURLConnection connection = PowerMock.createNiceMock(HttpURLConnection.class);
+    URL url = PowerMock.createNiceMock(URL.class);
+    InputStream is = IOUtils.toInputStream(gson.toJson(Collections.singletonList("localhost")));
+    expectNew(URL.class, anyString()).andReturn(url).anyTimes();
+    expect(url.openConnection()).andReturn(connection).anyTimes();
+    expect(connection.getInputStream()).andReturn(is).anyTimes();
+    expect(connection.getResponseCode()).andReturn(200).anyTimes();
+    OutputStream os = PowerMock.createNiceMock(OutputStream.class);
+    expect(connection.getOutputStream()).andReturn(os).anyTimes();
+
     SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class);
-    expect(conf.getString(eq("slave.host.name"))).andReturn("testhost").anyTimes();
+    expect(conf.getString("slave.host.name")).andReturn("localhost").anyTimes();
     expect(conf.getParent()).andReturn(null).anyTimes();
     expect(conf.getPrefix()).andReturn("service").anyTimes();
-    expect(conf.getString(eq(COLLECTOR_PROPERTY))).andReturn("localhost:63188").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_PROPERTY), eq(""))).andReturn("localhost:6188").anyTimes();
     expect(conf.getString(eq("serviceName-prefix"), eq(""))).andReturn("").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_PROTOCOL), eq("http"))).andReturn("http").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_PORT), eq("6188"))).andReturn("6188").anyTimes();
 
     expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes();
     expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(1000).anyTimes();
@@ -121,6 +147,7 @@ public class HadoopTimelineMetricsSinkTest {
     expect(record.metrics()).andReturn(Arrays.asList(metric)).anyTimes();
 
     replay(conf, record, metric);
+    replayAll();
 
     sink.init(conf);
 
@@ -130,7 +157,7 @@ public class HadoopTimelineMetricsSinkTest {
 
     sink.putMetrics(record);
 
-    verify(conf, record, metric);
+    verifyAll();
   }
 
   @Test
@@ -138,20 +165,26 @@ public class HadoopTimelineMetricsSinkTest {
     HadoopTimelineMetricsSink sink =
       createMockBuilder(HadoopTimelineMetricsSink.class)
         .withConstructor().addMockedMethod("appendPrefix")
+        .addMockedMethod("findLiveCollectorHostsFromKnownCollector")
         .addMockedMethod("emitMetrics").createNiceMock();
 
     SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class);
-    expect(conf.getString(eq("slave.host.name"))).andReturn("testhost").anyTimes();
+    expect(conf.getString("slave.host.name")).andReturn("localhost").anyTimes();
     expect(conf.getParent()).andReturn(null).anyTimes();
     expect(conf.getPrefix()).andReturn("service").anyTimes();
-    expect(conf.getString(eq(COLLECTOR_PROPERTY))).andReturn("localhost:63188").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_PROPERTY), eq(""))).andReturn("localhost:6188").anyTimes();
     expect(conf.getString(eq("serviceName-prefix"), eq(""))).andReturn("").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_PROTOCOL), eq("http"))).andReturn("http").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_PORT), eq("6188"))).andReturn("6188").anyTimes();
 
     expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes();
     // Return eviction time smaller than time diff for first 3 entries
     // Third entry will result in eviction
     expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(10).anyTimes();
 
+    expect(sink.findLiveCollectorHostsFromKnownCollector("localhost", "6188"))
+      .andReturn(Collections.singletonList("localhost")).anyTimes();
+
     conf.setListDelimiter(eq(','));
     expectLastCall().anyTimes();
 
@@ -260,14 +293,20 @@ public class HadoopTimelineMetricsSinkTest {
     HadoopTimelineMetricsSink sink =
       createMockBuilder(HadoopTimelineMetricsSink.class)
         .withConstructor().addMockedMethod("appendPrefix")
+        .addMockedMethod("findLiveCollectorHostsFromKnownCollector")
         .addMockedMethod("emitMetrics").createNiceMock();
 
     SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class);
-    expect(conf.getString(eq("slave.host.name"))).andReturn("testhost").anyTimes();
+    expect(conf.getString("slave.host.name")).andReturn("localhost").anyTimes();
     expect(conf.getParent()).andReturn(null).anyTimes();
     expect(conf.getPrefix()).andReturn("service").anyTimes();
-    expect(conf.getString(eq(COLLECTOR_PROPERTY))).andReturn("localhost:63188").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_PROPERTY), eq(""))).andReturn("localhost:6188").anyTimes();
     expect(conf.getString(eq("serviceName-prefix"), eq(""))).andReturn("").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_PROTOCOL), eq("http"))).andReturn("http").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_PORT), eq("6188"))).andReturn("6188").anyTimes();
+
+    expect(sink.findLiveCollectorHostsFromKnownCollector("localhost", "6188"))
+      .andReturn(Collections.singletonList("localhost")).anyTimes();
 
     expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes();
     expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(10).anyTimes();

+ 41 - 9
ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java

@@ -71,14 +71,18 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
   private static final String TIMELINE_DEFAULT_PORT = "6188";
   private static final String TIMELINE_DEFAULT_PROTOCOL = "http";
 
-  private boolean initialized = false;
+  private volatile boolean initialized = false;
   private boolean running = false;
   private final Object lock = new Object();
   private String collectorUri;
   private String hostname;
+  private String metricCollectorPort;
+  private String collectors;
+  private String metricCollectorProtocol;
   private TimelineScheduledReporter reporter;
   private TimelineMetricsCache metricsCache;
   private int timeoutSeconds = 10;
+  private String zookeeperQuorum;
 
   private String[] excludedMetricsPrefixes;
   private String[] includedMetricsPrefixes;
@@ -86,8 +90,13 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
   private Set<String> excludedMetrics = new HashSet<>();
 
   @Override
-  protected String getCollectorUri() {
-    return collectorUri;
+  protected String getCollectorUri(String host) {
+    return constructTimelineMetricUri(metricCollectorProtocol, host, metricCollectorPort);
+  }
+
+  @Override
+  protected String getCollectorProtocol() {
+    return metricCollectorProtocol;
   }
 
   @Override
@@ -95,10 +104,26 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
     return timeoutSeconds;
   }
 
+  @Override
+  protected String getZookeeperQuorum() {
+    return zookeeperQuorum;
+  }
+
+  @Override
+  protected String getConfiguredCollectors() {
+    return collectors;
+  }
+
+  @Override
+  protected String getHostname() {
+    return hostname;
+  }
+
   public void setMetricsCache(TimelineMetricsCache metricsCache) {
     this.metricsCache = metricsCache;
   }
 
+  @Override
   public void init(VerifiableProperties props) {
     synchronized (lock) {
       if (!initialized) {
@@ -113,26 +138,33 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
           LOG.error("Could not identify hostname.");
           throw new RuntimeException("Could not identify hostname.", e);
         }
+        // Initialize the collector write strategy
+        super.init();
+
         KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(props);
         timeoutSeconds = props.getInt(METRICS_POST_TIMEOUT_SECONDS, DEFAULT_POST_TIMEOUT_SECONDS);
         int metricsSendInterval = props.getInt(TIMELINE_METRICS_SEND_INTERVAL_PROPERTY, MAX_EVICTION_TIME_MILLIS);
         int maxRowCacheSize = props.getInt(TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY, MAX_RECS_PER_NAME_DEFAULT);
+
+        zookeeperQuorum = props.getString("zookeeper.connect");
+        collectors = props.getString(TIMELINE_HOST_PROPERTY, TIMELINE_DEFAULT_HOST);
+        metricCollectorProtocol = props.getString(TIMELINE_PROTOCOL_PROPERTY, TIMELINE_DEFAULT_PROTOCOL);
+
         String metricCollectorHost = props.getString(TIMELINE_HOST_PROPERTY, TIMELINE_DEFAULT_HOST);
-        String metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT);
-        String metricCollectorProtocol = props.getString(TIMELINE_PROTOCOL_PROPERTY, TIMELINE_DEFAULT_PROTOCOL);
+        metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT);
+
         setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
 
-        collectorUri = metricCollectorProtocol + "://" + metricCollectorHost +
-                       ":" + metricCollectorPort + WS_V1_TIMELINE_METRICS;
+        collectorUri = constructTimelineMetricUri(metricCollectorProtocol,
+          metricCollectorHost, metricCollectorPort);
 
-        if (collectorUri.toLowerCase().startsWith("https://")) {
+        if (metricCollectorProtocol.contains("https")) {
           String trustStorePath = props.getString(SSL_KEYSTORE_PATH_PROPERTY).trim();
           String trustStoreType = props.getString(SSL_KEYSTORE_TYPE_PROPERTY).trim();
           String trustStorePwd = props.getString(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
           loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
         }
 
-
         // Exclusion policy
         String excludedMetricsStr = props.getString(EXCLUDED_METRICS_PROPERTY, "");
         if (!StringUtils.isEmpty(excludedMetricsStr.trim())) {

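With getCollectorUri(String host) the sinks now build a URI per candidate host instead of caching one collector address. A minimal stand-alone sketch of what constructTimelineMetricUri amounts to, assuming WS_V1_TIMELINE_METRICS resolves to the "/ws/v1/timeline/metrics" path the old concatenation appended:

    // Sketch only: per-host timeline metrics URI, mirroring
    // protocol + "://" + host + ":" + port + WS_V1_TIMELINE_METRICS
    static String timelineMetricUri(String protocol, String host, String port) {
      return protocol + "://" + host + ":" + port + "/ws/v1/timeline/metrics";
    }

    // e.g. timelineMetricUri("http", "localhost", "6188")
    //   -> "http://localhost:6188/ws/v1/timeline/metrics"
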
+ 1 - 0
ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java

@@ -76,6 +76,7 @@ public class KafkaTimelineMetricsReporterTest {
     list.add(meter);
     list.add(timer);
     Properties properties = new Properties();
+    properties.setProperty("zookeeper.connect", "localhost:2181");
     properties.setProperty("kafka.timeline.metrics.sendInterval", "5900");
     properties.setProperty("kafka.timeline.metrics.maxRowCacheSize", "10000");
     properties.setProperty("kafka.timeline.metrics.host", "localhost");

+ 37 - 7
ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java

@@ -47,21 +47,45 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
   private NimbusClient nimbusClient;
   private String applicationId;
   private int timeoutSeconds;
+  private String port;
+  private String collectors;
+  private String zkQuorum;
+  private String protocol;
 
   public StormTimelineMetricsReporter() {
 
   }
 
   @Override
-  protected String getCollectorUri() {
+  protected String getCollectorUri(String host) {
-    return this.collectorUri;
+    return constructTimelineMetricUri(protocol, host, port);
   }
 
+  @Override
+  protected String getCollectorProtocol() {
+    return protocol;
+  }
+
   @Override
   protected int getTimeoutSeconds() {
     return timeoutSeconds;
   }
 
+  @Override
+  protected String getZookeeperQuorum() {
+    return zkQuorum;
+  }
+
+  @Override
+  protected String getConfiguredCollectors() {
+    return collectors;
+  }
+
+  @Override
+  protected String getHostname() {
+    return hostname;
+  }
+
   @Override
   public void prepare(Map conf) {
     LOG.info("Preparing Storm Metrics Reporter");
@@ -80,18 +104,24 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
       Map cf = (Map) conf.get(METRICS_COLLECTOR_CATEGORY);
       Map stormConf = Utils.readStormConfig();
       this.nimbusClient = NimbusClient.getConfiguredClient(stormConf);
-      String collector = cf.get(COLLECTOR_PROPERTY).toString();
-      timeoutSeconds = cf.get(METRICS_POST_TIMEOUT_SECONDS) != null ?
-        Integer.parseInt(cf.get(METRICS_POST_TIMEOUT_SECONDS).toString()) :
-        DEFAULT_POST_TIMEOUT_SECONDS;
-      applicationId = cf.get(APP_ID).toString();
-      collectorUri = collector + WS_V1_TIMELINE_METRICS;
+
+      collectors = cf.get(COLLECTOR_PROPERTY).toString();
+      protocol = cf.get(COLLECTOR_PROTOCOL) != null ? cf.get(COLLECTOR_PROTOCOL).toString() : "http";
+      port = cf.get(COLLECTOR_PORT) != null ? cf.get(COLLECTOR_PORT).toString() : "6188";
+      zkQuorum = cf.get(ZOOKEEPER_QUORUM) != null ? cf.get(ZOOKEEPER_QUORUM).toString() : null;
+
-      if (collectorUri.toLowerCase().startsWith("https://")) {
+      if (protocol.toLowerCase().startsWith("https")) {
         String trustStorePath = cf.get(SSL_KEYSTORE_PATH_PROPERTY).toString().trim();
         String trustStoreType = cf.get(SSL_KEYSTORE_TYPE_PROPERTY).toString().trim();
         String trustStorePwd = cf.get(SSL_KEYSTORE_PASSWORD_PROPERTY).toString().trim();
         loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
       }
+
+      timeoutSeconds = cf.get(METRICS_POST_TIMEOUT_SECONDS) != null ?
+        Integer.parseInt(cf.get(METRICS_POST_TIMEOUT_SECONDS).toString()) :
+        DEFAULT_POST_TIMEOUT_SECONDS;
+      applicationId = cf.get(APP_ID).toString();
+
     } catch (Exception e) {
       LOG.warn("Could not initialize metrics collector, please specify " +
         "protocol, host, port under $STORM_HOME/conf/config.yaml ", e);

+ 35 - 3
ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java

@@ -55,17 +55,41 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
   private int timeoutSeconds;
   private String topologyName;
   private String applicationId;
+  private String collectors;
+  private String zkQuorum;
+  private String protocol;
+  private String port;
 
   @Override
-  protected String getCollectorUri() {
+  protected String getCollectorUri(String host) {
-    return collectorUri;
+    return constructTimelineMetricUri(protocol, host, port);
   }
 
+  @Override
+  protected String getCollectorProtocol() {
+    return protocol;
+  }
+
   @Override
   protected int getTimeoutSeconds() {
     return timeoutSeconds;
   }
 
+  @Override
+  protected String getZookeeperQuorum() {
+    return zkQuorum;
+  }
+
+  @Override
+  protected String getConfiguredCollectors() {
+    return collectors;
+  }
+
+  @Override
+  protected String getHostname() {
+    return hostname;
+  }
+
   @Override
   public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
     LOG.info("Preparing Storm Metrics Sink");
@@ -88,8 +112,16 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
         String.valueOf(MAX_EVICTION_TIME_MILLIS)));
     applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID);
     metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
-    collectorUri = configuration.getProperty(COLLECTOR_PROPERTY) + WS_V1_TIMELINE_METRICS;
-    if (collectorUri.toLowerCase().startsWith("https://")) {
+
+    collectors = configuration.getProperty(COLLECTOR_PROPERTY);
+    zkQuorum = configuration.getProperty("zookeeper.quorum");
+    protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
+    port = configuration.getProperty(COLLECTOR_PORT, "6188");
+
+    // Initialize the collector write strategy
+    super.init();
+
+    if (protocol.toLowerCase().startsWith("https")) {
       String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
       String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
       String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();

+ 47 - 12
ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java

@@ -46,14 +46,23 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
   private String collectorUri;
   private String applicationId;
   private int timeoutSeconds;
+  private String port;
+  private String collectors;
+  private String zkQuorum;
+  private String protocol;
 
   public StormTimelineMetricsReporter() {
 
   }
 
   @Override
-  protected String getCollectorUri() {
-    return this.collectorUri;
+  protected String getCollectorUri(String host) {
+    return constructTimelineMetricUri(protocol, host, port);
+  }
+
+  @Override
+  protected String getCollectorProtocol() {
+    return protocol;
   }
 
   @Override
@@ -61,6 +70,22 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
     return timeoutSeconds;
   }
 
+  @Override
+  protected String getZookeeperQuorum() {
+    return zkQuorum;
+  }
+
+  @Override
+  protected String getConfiguredCollectors() {
+    return collectors;
+  }
+
+  @Override
+  protected String getHostname() {
+    return hostname;
+  }
+
   @Override
   public void prepare(Object registrationArgument) {
     LOG.info("Preparing Storm Metrics Reporter");
@@ -75,24 +100,34 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
         LOG.error("Could not identify hostname.");
         throw new RuntimeException("Could not identify hostname.", e);
       }
-      Configuration configuration = new Configuration("/storm-metrics2.properties");
-      String collector = configuration.getProperty(COLLECTOR_PROPERTY).toString();
-      timeoutSeconds = configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS) != null ?
-        Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS).toString()) :
+      Configuration conf = new Configuration("/storm-metrics2.properties");
+
+      collectors = conf.getProperty(COLLECTOR_PROPERTY);
+      protocol = conf.getProperty(COLLECTOR_PROTOCOL) != null ? conf.getProperty(COLLECTOR_PROTOCOL) : "http";
+      port = conf.getProperty(COLLECTOR_PORT) != null ? conf.getProperty(COLLECTOR_PORT) : "6188";
+      zkQuorum = conf.getProperty(ZOOKEEPER_QUORUM) != null ? conf.getProperty(ZOOKEEPER_QUORUM) : null;
+
+      timeoutSeconds = conf.getProperty(METRICS_POST_TIMEOUT_SECONDS) != null ?
+        Integer.parseInt(conf.getProperty(METRICS_POST_TIMEOUT_SECONDS).toString()) :
         DEFAULT_POST_TIMEOUT_SECONDS;
-      applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID);
-      collectorUri = collector + WS_V1_TIMELINE_METRICS;
+      applicationId = conf.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID);
+
+      collectorUri = constructTimelineMetricUri(protocol, findPreferredCollectHost(), port);
+
       if (collectorUri.toLowerCase().startsWith("https://")) {
-        String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).toString().trim();
-        String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).toString().trim();
-        String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).toString().trim();
+        String trustStorePath = conf.getProperty(SSL_KEYSTORE_PATH_PROPERTY).toString().trim();
+        String trustStoreType = conf.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).toString().trim();
+        String trustStorePwd = conf.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).toString().trim();
         loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
       }
+
     } catch (Exception e) {
       LOG.warn("Could not initialize metrics collector, please specify " +
         "protocol, host, port under $STORM_HOME/conf/config.yaml ", e);
     }
-
+    // Initialize the collector write strategy
+    super.init();
   }
 
   @Override

+ 35 - 3
ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java

@@ -63,17 +63,41 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
   private int timeoutSeconds;
   private String topologyName;
   private String applicationId;
+  private String collectors;
+  private String zkQuorum;
+  private String protocol;
+  private String port;
 
   @Override
-  protected String getCollectorUri() {
+  protected String getCollectorUri(String host) {
-    return collectorUri;
+    return constructTimelineMetricUri(protocol, host, port);
   }
 
+  @Override
+  protected String getCollectorProtocol() {
+    return protocol;
+  }
+
   @Override
   protected int getTimeoutSeconds() {
     return timeoutSeconds;
   }
 
+  @Override
+  protected String getZookeeperQuorum() {
+    return zkQuorum;
+  }
+
+  @Override
+  protected String getConfiguredCollectors() {
+    return collectors;
+  }
+
+  @Override
+  protected String getHostname() {
+    return hostname;
+  }
+
   @Override
   public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
     LOG.info("Preparing Storm Metrics Sink");
@@ -96,8 +120,16 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
         String.valueOf(MAX_EVICTION_TIME_MILLIS)));
     applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID);
     metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
-    collectorUri = configuration.getProperty(COLLECTOR_PROPERTY) + WS_V1_TIMELINE_METRICS;
-    if (collectorUri.toLowerCase().startsWith("https://")) {
+
+    collectors = configuration.getProperty(COLLECTOR_PROPERTY);
+    zkQuorum = configuration.getProperty("zookeeper.quorum");
+    protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
+    port = configuration.getProperty(COLLECTOR_PORT, "6188");
+
+    // Initialize the collector write strategy
+    super.init();
+
+    if (protocol.toLowerCase().startsWith("https")) {
       String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
       String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
       String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();

+ 12 - 17
ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java

@@ -18,6 +18,18 @@
 
 package org.apache.hadoop.metrics2.sink.storm;
 
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.shade.com.google.common.collect.Lists;
+import org.junit.Ignore;
+import org.junit.Test;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.METRIC_NAME_PREFIX_KAFKA_OFFSET;
 import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.SYSTEM_TASK_ID;
 import static org.easymock.EasyMock.anyObject;
@@ -27,23 +39,6 @@ import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.apache.storm.Constants;
-import org.apache.storm.shade.com.google.common.collect.Lists;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import org.apache.storm.metric.api.IMetricsConsumer;
-
 public class StormTimelineMetricsSinkTest {
   @Test
   public void testNonNumericMetricMetricExclusion() throws InterruptedException, IOException {

+ 17 - 5
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java

@@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction;
@@ -45,7 +45,9 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;
+
 import java.io.IOException;
+import java.net.UnknownHostException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -56,8 +58,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
@@ -75,7 +78,7 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
   private final Map<AggregationTaskRunner.AGGREGATOR_NAME, ScheduledExecutorService> scheduledExecutors = new HashMap<>();
   private TimelineMetricMetadataManager metricMetadataManager;
   private Integer defaultTopNHostsLimit;
-  private TimelineMetricHAController haController;
+  private MetricCollectorHAController haController;
 
   /**
    * Construct the service.
@@ -106,7 +109,7 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
       // Start HA service
       if (configuration.isDistributedOperationModeEnabled()) {
         // Start the controller
-        haController = new TimelineMetricHAController(configuration);
+        haController = new MetricCollectorHAController(configuration);
         try {
           haController.initializeHAController();
         } catch (Exception e) {
@@ -384,7 +387,16 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
 
   @Override
   public List<String> getLiveInstances() {
-    return haController.getLiveInstanceHostNames();
+    List<String> instances = haController.getLiveInstanceHostNames();
+    if (instances == null || instances.isEmpty()) {
+      try {
+        // Always return current host as live (embedded operation mode)
+        instances = Collections.singletonList(configuration.getInstanceHostnameFromEnv());
+      } catch (UnknownHostException e) {
+        LOG.debug("Exception on getting hostname from env.", e);
+      }
+    }
+    return instances;
   }
 
   private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator,

+ 2 - 2
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java

@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
@@ -92,7 +92,7 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
                                     String tableName,
                                     String outputTableName,
                                     Long nativeTimeRangeDelay,
-                                    TimelineMetricHAController haController) {
+                                    MetricCollectorHAController haController) {
     this(aggregatorName, hBaseAccessor, metricsConf);
     this.checkpointLocation = checkpointLocation;
     this.sleepIntervalMillis = sleepIntervalMillis;

+ 8 - 8
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java

@@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -95,7 +95,7 @@ public class TimelineMetricAggregatorFactory {
    */
   public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
     (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-     TimelineMetricHAController haController) {
+     MetricCollectorHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -145,7 +145,7 @@ public class TimelineMetricAggregatorFactory {
    */
   public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
     (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-     TimelineMetricHAController haController) {
+     MetricCollectorHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -195,7 +195,7 @@ public class TimelineMetricAggregatorFactory {
    */
   public static TimelineMetricAggregator createTimelineMetricAggregatorDaily
     (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-     TimelineMetricHAController haController) {
+     MetricCollectorHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -247,7 +247,7 @@ public class TimelineMetricAggregatorFactory {
   public static TimelineMetricAggregator createTimelineClusterAggregatorSecond(
     PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
     TimelineMetricMetadataManager metadataManager,
-    TimelineMetricHAController haController) {
+    MetricCollectorHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -291,7 +291,7 @@ public class TimelineMetricAggregatorFactory {
    */
   public static TimelineMetricAggregator createTimelineClusterAggregatorMinute(
     PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-    TimelineMetricHAController haController) {
+    MetricCollectorHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -344,7 +344,7 @@ public class TimelineMetricAggregatorFactory {
    */
   public static TimelineMetricAggregator createTimelineClusterAggregatorHourly(
     PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-    TimelineMetricHAController haController) {
+    MetricCollectorHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -397,7 +397,7 @@ public class TimelineMetricAggregatorFactory {
    */
   public static TimelineMetricAggregator createTimelineClusterAggregatorDaily(
     PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-    TimelineMetricHAController haController) {
+    MetricCollectorHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);

+ 2 - 2
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java

@@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 
@@ -48,7 +48,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
                                          String inputTableName,
                                          String outputTableName,
                                          Long nativeTimeRangeDelay,
-                                         TimelineMetricHAController haController) {
+                                         MetricCollectorHAController haController) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier,
       hostAggregatorDisabledParam, inputTableName, outputTableName,

+ 2 - 2
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java

@@ -26,7 +26,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
@@ -76,7 +76,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
                                                String outputTableName,
                                                Long nativeTimeRangeDelay,
                                                Long timeSliceInterval,
-                                               TimelineMetricHAController haController) {
+                                               MetricCollectorHAController haController) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam,
       tableName, outputTableName, nativeTimeRangeDelay, haController);

+ 2 - 2
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java

@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 
@@ -49,7 +49,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
                                       String tableName,
                                       String outputTableName,
                                       Long nativeTimeRangeDelay,
-                                      TimelineMetricHAController haController) {
+                                      MetricCollectorHAController haController) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
       tableName, outputTableName, nativeTimeRangeDelay, haController);

+ 2 - 2
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java

@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
 
@@ -46,7 +46,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
                                          String inputTableName,
                                          String outputTableName,
                                          Long nativeTimeRangeDelay,
-                                         TimelineMetricHAController haController) {
+                                         MetricCollectorHAController haController) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier,
       hostAggregatorDisabledParam, inputTableName, outputTableName,

+ 2 - 2
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java

@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
 
@@ -44,7 +44,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
                                       String tableName,
                                       String outputTableName,
                                       Long nativeTimeRangeDelay,
-                                      TimelineMetricHAController haController) {
+                                      MetricCollectorHAController haController) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
       tableName, outputTableName, nativeTimeRangeDelay, haController);

+ 3 - 3
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java

@@ -38,9 +38,9 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.CLUSTER_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.METRIC_AGGREGATORS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.STATE_MODEL_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.CLUSTER_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.METRIC_AGGREGATORS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.STATE_MODEL_NAME;
 
 public class AggregationTaskRunner {
   private final String instanceName;

+ 3 - 3
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAController.java → ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java

@@ -45,8 +45,8 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.helix.model.IdealState.RebalanceMode.FULL_AUTO;
 
-public class TimelineMetricHAController {
-  private static final Log LOG = LogFactory.getLog(TimelineMetricHAController.class);
+public class MetricCollectorHAController {
+  private static final Log LOG = LogFactory.getLog(MetricCollectorHAController.class);
 
   static final String CLUSTER_NAME = "ambari-metrics-cluster";
   static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS";
@@ -68,7 +68,7 @@ public class TimelineMetricHAController {
 
   private volatile boolean isInitialized = false;
 
-  public TimelineMetricHAController(TimelineMetricConfiguration configuration) {
+  public MetricCollectorHAController(TimelineMetricConfiguration configuration) {
     String instancePort;
     try {
       instanceHostname = configuration.getInstanceHostnameFromEnv();

+ 8 - 0
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java

@@ -411,6 +411,14 @@ public class TimelineWebServices {
     }
   }
 
+  /**
+   * Discovery endpoint that advertises the known live collector
+   * instances. Note: it always reports the current instance as live.
+   * It can also serve as a liveness ping endpoint, since the instance
+   * names are cached and the API makes no synchronous calls.
+   *
+   * @return List&lt;String&gt; of live collector hostnames
+   */
   @GET
   @Path("/metrics/livenodes")
   @Produces({ MediaType.APPLICATION_JSON })
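
The sinks can use this endpoint for discovery and failover. A hedged client-side sketch in the spirit of findLiveCollectorHostsFromKnownCollector (mocked in the HadoopTimelineMetricsSink test above to return a JSON array of hostnames); the full "/ws/v1/timeline" prefix is an assumption based on the resource's @Path annotation:

    import com.google.gson.Gson;
    import com.google.gson.reflect.TypeToken;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.List;

    public class LiveNodesProbe {
      // Sketch only: ask one known collector which collector instances are live.
      static List<String> fetchLiveCollectors(String protocol, String host, String port) throws Exception {
        URL url = new URL(protocol + "://" + host + ":" + port + "/ws/v1/timeline/metrics/livenodes");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        if (conn.getResponseCode() != 200) {
          throw new IllegalStateException("livenodes returned HTTP " + conn.getResponseCode());
        }
        try (InputStreamReader reader = new InputStreamReader(conn.getInputStream())) {
          // The endpoint answers with a JSON array of hostnames, e.g. ["localhost"]
          return new Gson().fromJson(reader, new TypeToken<List<String>>() {}.getType());
        }
      }
    }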

+ 6 - 6
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAControllerTest.java → ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java

@@ -31,14 +31,14 @@ import org.junit.Test;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.CLUSTER_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.METRIC_AGGREGATORS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.STATE_MODEL_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.CLUSTER_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.METRIC_AGGREGATORS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.STATE_MODEL_NAME;
 import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
 
-public class TimelineMetricHAControllerTest extends AbstractMiniHBaseClusterTest {
+public class MetricCollectorHAControllerTest extends AbstractMiniHBaseClusterTest {
   TimelineMetricConfiguration configuration;
 
   @Before
@@ -58,9 +58,9 @@ public class TimelineMetricHAControllerTest extends AbstractMiniHBaseClusterTest
     replay(configuration);
   }
 
-  @Test(timeout = 150000)
+  @Test(timeout = 180000)
   public void testHAControllerDistributedAggregation() throws Exception {
-    TimelineMetricHAController haController = new TimelineMetricHAController(configuration);
+    MetricCollectorHAController haController = new MetricCollectorHAController(configuration);
     haController.initializeHAController();
     // Wait for task assignment
     Thread.sleep(10000);

+ 20 - 0
ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py

@@ -86,6 +86,7 @@ if config.has_key('hostname'):
 
 ams_collector_hosts = default("/clusterHostInfo/metrics_collector_hosts", [])
 has_metric_collector = not len(ams_collector_hosts) == 0
+metric_collector_port = None
 if has_metric_collector:
   if 'cluster-env' in config['configurations'] and \
       'metrics_collector_vip_host' in config['configurations']['cluster-env']:
@@ -111,3 +112,22 @@ if has_metric_collector:
   pass
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
+
+# Collector hosts
+metric_collector_hosts = None
+if ams_collector_hosts:
+  metric_collector_hosts = ','.join([host + ':' + metric_collector_port for host in ams_collector_hosts])
+
+# Cluster Zookeeper quorum
+zookeeper_quorum = None
+if not len(default("/clusterHostInfo/zookeeper_hosts", [])) == 0:
+  if 'zoo.cfg' in config['configurations'] and 'clientPort' in config['configurations']['zoo.cfg']:
+    zookeeper_clientPort = config['configurations']['zoo.cfg']['clientPort']
+  else:
+    zookeeper_clientPort = '2181'
+  zookeeper_quorum = (':' + zookeeper_clientPort + ',').join(config['clusterHostInfo']['zookeeper_hosts'])
+  # last port config
+  zookeeper_quorum += ':' + zookeeper_clientPort
+

+ 4 - 1
ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2

@@ -16,7 +16,10 @@
 # limitations under the License.
 #}
 
-collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
+collector={{metric_collector_hosts}}
+protocol={{metric_collector_protocol}}
+zookeeper.quorum={{zookeeper_quorum}}
+port={{metric_collector_port}}
 collectionFrequency={{metrics_collection_period}}000
 maxRowCacheSize=10000
 sendInterval={{metrics_report_interval}}000
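
The collector property now carries a comma-separated host:port list (built by the metric_collector_hosts logic in params.py) rather than a single URI. A hedged sketch of splitting it into failover candidates; the real parsing lives in AbstractTimelineMetricsSink and may differ:

    import java.util.ArrayList;
    import java.util.List;

    public class CollectorHostsParser {
      // Sketch only: "host1:6188,host2:6188" -> ["host1", "host2"]
      static List<String> parseCollectorHosts(String collectors) {
        List<String> hosts = new ArrayList<>();
        for (String hostPort : collectors.split(",")) {
          String trimmed = hostPort.trim();
          if (trimmed.isEmpty()) {
            continue;
          }
          int colon = trimmed.indexOf(':');
          hosts.add(colon >= 0 ? trimmed.substring(0, colon) : trimmed);
        }
        return hosts;
      }
    }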

+ 3 - 1
ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2

@@ -51,6 +51,7 @@ hbase.extendedperiod = 3600
 
 *.timeline.plugin.urls=file:///usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar
 *.sink.timeline.slave.host.name={{hostname}}
+
 hbase.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
 hbase.period={{metrics_collection_period}}
 hbase.collector={{metric_collector_host}}:{{metric_collector_port}}
@@ -66,7 +67,8 @@ rpc.collector={{metric_collector_host}}:{{metric_collector_port}}
 hbase.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
 hbase.sink.timeline.period={{metrics_collection_period}}
 hbase.sink.timeline.sendInterval={{metrics_report_interval}}000
-hbase.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
+hbase.sink.timeline.collector={{metric_collector_hosts}}
+hbase.sink.timeline.protocol={{metric_collector_protocol}}
 
 # HTTPS properties
 hbase.sink.timeline.truststore.path = {{metric_truststore_path}}

+ 15 - 0
ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py

@@ -170,6 +170,7 @@ if stack_supports_storm_kerberos:
 
 ams_collector_hosts = default("/clusterHostInfo/metrics_collector_hosts", [])
 has_metric_collector = not len(ams_collector_hosts) == 0
+metric_collector_port = None
 if has_metric_collector:
   if 'cluster-env' in config['configurations'] and \
       'metrics_collector_vip_host' in config['configurations']['cluster-env']:
@@ -201,6 +202,20 @@ metrics_collection_period = default("/configurations/ams-site/timeline.metrics.s
 metric_collector_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-with-common-*.jar"
 metric_collector_legacy_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar"
 
+# Collector hosts
+metric_collector_hosts = None
+if ams_collector_hosts:
+  metric_collector_hosts = ','.join([host + ':' + metric_collector_port for host in ams_collector_hosts])
+
+# Cluster Zookeeper quorum
+zookeeper_quorum = None
+if storm_zookeeper_servers:
+  zookeeper_quorum = ','.join([server + ':' + storm_zookeeper_port for server in storm_zookeeper_servers])
+
 jar_jvm_opts = ''
 
 ########################################################

+ 4 - 1
ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2

@@ -16,7 +16,10 @@
 # limitations under the License.
 #}
 
-collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
+collector={{metric_collector_hosts}}
+protocol={{metric_collector_protocol}}
+port={{metric_collector_port}}
+zookeeper.quorum={{zookeeper_quorum}}
 maxRowCacheSize=10000
 sendInterval={{metrics_report_interval}}000
 clusterReporterAppId=nimbus

+ 21 - 0
ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py

@@ -109,8 +109,11 @@ is_rmnode_master = hostname in rm_host
 is_hsnode_master = hostname in hs_host
 is_hbase_master = hostname in hbase_master_hosts
 is_slave = hostname in slave_hosts
+
 if has_ganglia_server:
   ganglia_server_host = ganglia_server_hosts[0]
+
+metric_collector_port = None
 if has_metric_collector:
   if 'cluster-env' in config['configurations'] and \
       'metrics_collector_vip_host' in config['configurations']['cluster-env']:
@@ -138,6 +141,24 @@ if has_metric_collector:
 metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
 
+# Collector hosts
+metric_collector_hosts = None
+if ams_collector_hosts:
+  metric_collector_hosts = ','.join([host + ':' + metric_collector_port for host in ams_collector_hosts])
+
+# Cluster Zookeeper quorum
+zookeeper_quorum = None
+if has_zk_host:
+  if 'zoo.cfg' in config['configurations'] and 'clientPort' in config['configurations']['zoo.cfg']:
+    zookeeper_clientPort = config['configurations']['zoo.cfg']['clientPort']
+  else:
+    zookeeper_clientPort = '2181'
+  zookeeper_quorum = (':' + zookeeper_clientPort + ',').join(config['clusterHostInfo']['zookeeper_hosts'])
+  # last port config
+  zookeeper_quorum += ':' + zookeeper_clientPort
+
 #hadoop params
 
 if has_namenode or dfs_type == 'HCFS':

+ 10 - 7
ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2

@@ -72,19 +72,22 @@ resourcemanager.sink.ganglia.tagsForPrefix.yarn=Queue
 *.sink.timeline.period={{metrics_collection_period}}
 *.sink.timeline.sendInterval={{metrics_report_interval}}000
 *.sink.timeline.slave.host.name = {{hostname}}
+*.sink.timeline.zookeeper.quorum={{zookeeper_quorum}}
+*.sink.timeline.protocol={{metric_collector_protocol}}
+*.sink.timeline.port={{metric_collector_port}}
 
 # HTTPS properties
 *.sink.timeline.truststore.path = {{metric_truststore_path}}
 *.sink.timeline.truststore.type = {{metric_truststore_type}}
 *.sink.timeline.truststore.password = {{metric_truststore_password}}
 
-datanode.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
-namenode.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
-resourcemanager.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
-nodemanager.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
-jobhistoryserver.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
-journalnode.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
-applicationhistoryserver.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
+datanode.sink.timeline.collector={{metric_collector_hosts}}
+namenode.sink.timeline.collector={{metric_collector_hosts}}
+resourcemanager.sink.timeline.collector={{metric_collector_hosts}}
+nodemanager.sink.timeline.collector={{metric_collector_hosts}}
+jobhistoryserver.sink.timeline.collector={{metric_collector_hosts}}
+journalnode.sink.timeline.collector={{metric_collector_hosts}}
+applicationhistoryserver.sink.timeline.collector={{metric_collector_hosts}}
 
 resourcemanager.sink.timeline.tagsForPrefix.yarn=Queue