|
@@ -84,6 +84,10 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
public int ZK_CONNECT_TRY_COUNT = 10;
|
|
|
public int ZK_SLEEP_BETWEEN_RETRY_TIME = 2000;
|
|
|
public boolean shardExpired = true;
|
|
|
+ private int zookeeperMinBackoffTimeMins = 2;
|
|
|
+ private int zookeeperMaxBackoffTimeMins = 5;
|
|
|
+ private long zookeeperBackoffTimeMillis;
|
|
|
+ private long lastFailedZkRequestTime = 0l;
|
|
|
|
|
|
private SSLSocketFactory sslSocketFactory;
|
|
|
|
|
@@ -132,6 +136,7 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
metricSinkWriteShardStrategy = new MetricSinkWriteShardHostnameHashingStrategy(getHostname());
|
|
|
collectorHAHelper = new MetricCollectorHAHelper(getZookeeperQuorum(),
|
|
|
ZK_CONNECT_TRY_COUNT, ZK_SLEEP_BETWEEN_RETRY_TIME);
|
|
|
+ zookeeperBackoffTimeMillis = getZookeeperBackoffTimeMillis();
|
|
|
isInitializedForHA = true;
|
|
|
}
|
|
|
|
|
@@ -337,13 +342,27 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
}
|
|
|
|
|
|
// Reach out to all configured collectors before Zookeeper
|
|
|
- refreshCollectorsFromConfigured();
|
|
|
+ Collection<String> collectorHosts = getConfiguredCollectorHosts();
|
|
|
+ refreshCollectorsFromConfigured(collectorHosts);
|
|
|
|
|
|
// Lookup Zookeeper for live hosts - max 10 seconds wait time
|
|
|
- if (allKnownLiveCollectors.size() == 0 && getZookeeperQuorum() != null) {
|
|
|
- //TODO : Bring back Zk fallback after proper curation.
|
|
|
- LOG.info("No live collectors from configuration. Not requesting zookeeper...");
|
|
|
- //allKnownLiveCollectors.addAll(collectorHAHelper.findLiveCollectorHostsFromZNode());
|
|
|
+ long currentTime = System.currentTimeMillis();
|
|
|
+ if (allKnownLiveCollectors.size() == 0 && getZookeeperQuorum() != null
|
|
|
+ && (currentTime - lastFailedZkRequestTime) > zookeeperBackoffTimeMillis) {
|
|
|
+
|
|
|
+ LOG.info("No live collectors from configuration. Requesting zookeeper...");
|
|
|
+ allKnownLiveCollectors.addAll(collectorHAHelper.findLiveCollectorHostsFromZNode());
|
|
|
+ boolean noNewCollectorFromZk = true;
|
|
|
+ for (String collectorHostFromZk : allKnownLiveCollectors) {
|
|
|
+ if (!collectorHosts.contains(collectorHostFromZk)) {
|
|
|
+ noNewCollectorFromZk = false;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (noNewCollectorFromZk) {
|
|
|
+ LOG.info("No new collector was found from Zookeeper. Will not request zookeeper for " + zookeeperBackoffTimeMillis + " millis");
|
|
|
+ lastFailedZkRequestTime = System.currentTimeMillis();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
if (allKnownLiveCollectors.size() != 0) {
|
|
@@ -356,7 +375,7 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
// OR
|
|
|
// through Expiry (Refresh needed to pick up dead collectors that might have not become alive).
|
|
|
if (shardExpired) {
|
|
|
- refreshCollectorsFromConfigured();
|
|
|
+ refreshCollectorsFromConfigured(getConfiguredCollectorHosts());
|
|
|
}
|
|
|
return metricSinkWriteShardStrategy.findCollectorShard(new ArrayList<>(allKnownLiveCollectors));
|
|
|
}
|
|
@@ -376,8 +395,7 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
- private void refreshCollectorsFromConfigured() {
|
|
|
- Collection<String> collectorHosts = getConfiguredCollectorHosts();
|
|
|
+ private void refreshCollectorsFromConfigured(Collection<String> collectorHosts) {
|
|
|
|
|
|
LOG.debug("Trying to find live collector host from : " + collectorHosts);
|
|
|
if (collectorHosts != null && !collectorHosts.isEmpty()) {
|
|
@@ -497,6 +515,12 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
return hosts;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ private long getZookeeperBackoffTimeMillis() {
|
|
|
+ return (zookeeperMinBackoffTimeMins +
|
|
|
+ rand.nextInt(zookeeperMaxBackoffTimeMins - zookeeperMinBackoffTimeMins + 1)) * 60*1000l;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Get a pre-formatted URI for the collector
|
|
|
*/
|