|
@@ -61,7 +61,7 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
public static final String MAX_METRIC_ROW_CACHE_SIZE = "maxRowCacheSize";
|
|
|
public static final String METRICS_SEND_INTERVAL = "sendInterval";
|
|
|
public static final String METRICS_POST_TIMEOUT_SECONDS = "timeout";
|
|
|
- public static final String COLLECTOR_PROPERTY = "collector";
|
|
|
+ public static final String COLLECTOR_HOSTS_PROPERTY = "collector.hosts";
|
|
|
public static final String COLLECTOR_PROTOCOL = "protocol";
|
|
|
public static final String COLLECTOR_PORT = "port";
|
|
|
public static final String ZOOKEEPER_QUORUM = "zookeeper.quorum";
|
|
@@ -211,6 +211,7 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
|
|
|
String connectUrl = getCollectorUri(collectorHost);
|
|
|
String jsonData = null;
|
|
|
+ LOG.debug("EmitMetrics connectUrl = " + connectUrl);
|
|
|
try {
|
|
|
jsonData = mapper.writeValueAsString(metrics);
|
|
|
} catch (IOException e) {
|
|
@@ -326,22 +327,16 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- String collectorHosts = getConfiguredCollectors();
|
|
|
+ Collection<String> collectorHosts = getConfiguredCollectorHosts();
|
|
|
+
|
|
|
+ LOG.debug("Trying to find live collector host from : " + collectorHosts);
|
|
|
// Reach out to all configured collectors before Zookeeper
|
|
|
if (collectorHosts != null && !collectorHosts.isEmpty()) {
|
|
|
- String[] hosts = collectorHosts.split(",");
|
|
|
- for (String hostPortStr : hosts) {
|
|
|
- if (hostPortStr != null && !hostPortStr.isEmpty()) {
|
|
|
- String[] hostPortPair = hostPortStr.split(":");
|
|
|
- if (hostPortPair.length < 2) {
|
|
|
- LOG.warn("Collector port is missing from the configuration.");
|
|
|
- continue;
|
|
|
- }
|
|
|
- String hostStr = hostPortPair[0].trim();
|
|
|
- String portStr = hostPortPair[1].trim();
|
|
|
- // Check liveliness and get known instances
|
|
|
+ for (String hostStr : collectorHosts) {
|
|
|
+ hostStr = hostStr.trim();
|
|
|
+ if (!hostStr.isEmpty()) {
|
|
|
try {
|
|
|
- Collection<String> liveHosts = findLiveCollectorHostsFromKnownCollector(hostStr, portStr);
|
|
|
+ Collection<String> liveHosts = findLiveCollectorHostsFromKnownCollector(hostStr, getCollectorPort());
|
|
|
// Update live Hosts - current host will already be a part of this
|
|
|
for (String host : liveHosts) {
|
|
|
allKnownLiveCollectors.add(host);
|
|
@@ -377,6 +372,7 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
|
|
|
return targetCollectorHostSupplier.get();
|
|
|
}
|
|
|
+ LOG.warn("Couldn't find any live collectors. Returning null");
|
|
|
return null;
|
|
|
}
|
|
|
|
|
@@ -455,6 +451,31 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
sb.append(WS_V1_TIMELINE_METRICS);
|
|
|
return sb.toString();
|
|
|
}
|
|
|
+ /**
|
|
|
+ * Parses input Sting of format "['host1', 'host2']" into Collection of hostnames
|
|
|
+ */
|
|
|
+ protected Collection<String> parseHostsStringIntoCollection(String hostsString) {
|
|
|
+ Set<String> hosts = new HashSet<>();
|
|
|
+
|
|
|
+ if (hostsString == null) {
|
|
|
+ LOG.error("No Metric collector configured.");
|
|
|
+ return hosts;
|
|
|
+ }
|
|
|
+
|
|
|
+ hostsString = hostsString.replace("[", "");
|
|
|
+ hostsString = hostsString.replace("]", "");
|
|
|
+ hostsString = hostsString.replace("'", "");
|
|
|
+
|
|
|
+ String [] hostNamesWithApostrophes = hostsString.split(",");
|
|
|
+
|
|
|
+ for (String host : hostNamesWithApostrophes) {
|
|
|
+ host = host.trim();
|
|
|
+ if (host.equals("")) continue;
|
|
|
+ hosts.add(host);
|
|
|
+ }
|
|
|
+
|
|
|
+ return hosts;
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Get a pre-formatted URI for the collector
|
|
@@ -463,6 +484,8 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
|
|
|
abstract protected String getCollectorProtocol();
|
|
|
|
|
|
+ abstract protected String getCollectorPort();
|
|
|
+
|
|
|
/**
|
|
|
* How soon to timeout on the emit calls in seconds.
|
|
|
*/
|
|
@@ -475,10 +498,10 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
abstract protected String getZookeeperQuorum();
|
|
|
|
|
|
/**
|
|
|
- * Get pre-configured list of collectors available
|
|
|
- * @return String "host1:port,host2:port"
|
|
|
+ * Get pre-configured list of collectors hosts available
|
|
|
+ * @return Collection<String> host1,host2
|
|
|
*/
|
|
|
- abstract protected String getConfiguredCollectors();
|
|
|
+ abstract protected Collection<String> getConfiguredCollectorHosts();
|
|
|
|
|
|
/**
|
|
|
* Get hostname used for calculating write shard.
|