|
@@ -51,8 +51,12 @@ import java.net.URL;
|
|
|
import java.security.KeyStore;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Random;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.SortedSet;
|
|
|
+import java.util.TreeSet;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
@@ -76,8 +80,9 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
|
|
|
protected static final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0);
|
|
|
public static int NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS = 100;
|
|
|
- public int ZK_CONNECT_TRY_TIME = 10000;
|
|
|
+ public int ZK_CONNECT_TRY_COUNT = 10;
|
|
|
public int ZK_SLEEP_BETWEEN_RETRY_TIME = 2000;
|
|
|
+ public boolean shardExpired = true;
|
|
|
|
|
|
private SSLSocketFactory sslSocketFactory;
|
|
|
|
|
@@ -93,7 +98,7 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
// well as timed refresh
|
|
|
protected Supplier<String> targetCollectorHostSupplier;
|
|
|
|
|
|
- protected final List<String> allKnownLiveCollectors = new ArrayList<>();
|
|
|
+ protected final SortedSet<String> allKnownLiveCollectors = new TreeSet<>();
|
|
|
|
|
|
private volatile boolean isInitializedForHA = false;
|
|
|
|
|
@@ -125,7 +130,7 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
protected void init() {
|
|
|
metricSinkWriteShardStrategy = new MetricSinkWriteShardHostnameHashingStrategy(getHostname());
|
|
|
collectorHAHelper = new MetricCollectorHAHelper(getZookeeperQuorum(),
|
|
|
- ZK_CONNECT_TRY_TIME, ZK_SLEEP_BETWEEN_RETRY_TIME);
|
|
|
+ ZK_CONNECT_TRY_COUNT, ZK_SLEEP_BETWEEN_RETRY_TIME);
|
|
|
isInitializedForHA = true;
|
|
|
}
|
|
|
|
|
@@ -202,6 +207,8 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
collectorHost = targetCollectorHostSupplier.get();
|
|
|
// Last X attempts have failed - force refresh
|
|
|
if (failedCollectorConnectionsCounter.get() > RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER) {
|
|
|
+ LOG.info("Removing collector " + collectorHost + " from allKnownLiveCollectors.");
|
|
|
+ allKnownLiveCollectors.remove(collectorHost);
|
|
|
targetCollectorHostSupplier = null;
|
|
|
collectorHost = findPreferredCollectHost();
|
|
|
}
|
|
@@ -319,6 +326,7 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
init();
|
|
|
}
|
|
|
|
|
|
+ shardExpired = false;
|
|
|
// Auto expire and re-calculate after 1 hour
|
|
|
if (targetCollectorHostSupplier != null) {
|
|
|
String targetCollector = targetCollectorHostSupplier.get();
|
|
@@ -327,32 +335,12 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- Collection<String> collectorHosts = getConfiguredCollectorHosts();
|
|
|
-
|
|
|
- LOG.debug("Trying to find live collector host from : " + collectorHosts);
|
|
|
// Reach out to all configured collectors before Zookeeper
|
|
|
- if (collectorHosts != null && !collectorHosts.isEmpty()) {
|
|
|
- for (String hostStr : collectorHosts) {
|
|
|
- hostStr = hostStr.trim();
|
|
|
- if (!hostStr.isEmpty()) {
|
|
|
- try {
|
|
|
- Collection<String> liveHosts = findLiveCollectorHostsFromKnownCollector(hostStr, getCollectorPort());
|
|
|
- // Update live Hosts - current host will already be a part of this
|
|
|
- for (String host : liveHosts) {
|
|
|
- allKnownLiveCollectors.add(host);
|
|
|
- }
|
|
|
- break; // Found at least 1 live collector
|
|
|
- } catch (MetricCollectorUnavailableException e) {
|
|
|
- LOG.info("Collector " + hostStr + " is not longer live. Removing " +
|
|
|
- "it from list of know live collector hosts : " + allKnownLiveCollectors);
|
|
|
- allKnownLiveCollectors.remove(hostStr);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ refreshCollectorsFromConfigured();
|
|
|
|
|
|
// Lookup Zookeeper for live hosts - max 10 seconds wait time
|
|
|
if (allKnownLiveCollectors.size() == 0 && getZookeeperQuorum() != null) {
|
|
|
+ LOG.info("No live collectors from configuration. Requesting zookeeper...");
|
|
|
allKnownLiveCollectors.addAll(collectorHAHelper.findLiveCollectorHostsFromZNode());
|
|
|
}
|
|
|
|
|
@@ -361,6 +349,13 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
new Supplier<String>() {
|
|
|
@Override
|
|
|
public String get() {
|
|
|
+ //shardExpired flag is used to determine if the Supplier.get() is invoked through the
|
|
|
+ // findPreferredCollectHost method (No need to refresh collector hosts
|
|
|
+ // OR
|
|
|
+ // through Expiry (Refresh needed to pick up dead collectors that might have not become alive).
|
|
|
+ if (shardExpired) {
|
|
|
+ refreshCollectorsFromConfigured();
|
|
|
+ }
|
|
|
return metricSinkWriteShardStrategy.findCollectorShard(new ArrayList<>(allKnownLiveCollectors));
|
|
|
}
|
|
|
}, // random.nextInt(max - min + 1) + min # (60 to 75 minutes)
|
|
@@ -370,12 +365,40 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
TimeUnit.MINUTES
|
|
|
);
|
|
|
|
|
|
- return targetCollectorHostSupplier.get();
|
|
|
+ String collectorHost = targetCollectorHostSupplier.get();
|
|
|
+ shardExpired = true;
|
|
|
+ return collectorHost;
|
|
|
}
|
|
|
LOG.warn("Couldn't find any live collectors. Returning null");
|
|
|
+ shardExpired = true;
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
+ private void refreshCollectorsFromConfigured() {
|
|
|
+ Collection<String> collectorHosts = getConfiguredCollectorHosts();
|
|
|
+
|
|
|
+ LOG.debug("Trying to find live collector host from : " + collectorHosts);
|
|
|
+ if (collectorHosts != null && !collectorHosts.isEmpty()) {
|
|
|
+ for (String hostStr : collectorHosts) {
|
|
|
+ hostStr = hostStr.trim();
|
|
|
+ if (!hostStr.isEmpty()) {
|
|
|
+ try {
|
|
|
+ Collection<String> liveHosts = findLiveCollectorHostsFromKnownCollector(hostStr, getCollectorPort());
|
|
|
+ // Update live Hosts - current host will already be a part of this
|
|
|
+ for (String host : liveHosts) {
|
|
|
+ allKnownLiveCollectors.add(host);
|
|
|
+ }
|
|
|
+ break; // Found at least 1 live collector
|
|
|
+ } catch (MetricCollectorUnavailableException e) {
|
|
|
+ LOG.info("Collector " + hostStr + " is not longer live. Removing " +
|
|
|
+ "it from list of know live collector hosts : " + allKnownLiveCollectors);
|
|
|
+ allKnownLiveCollectors.remove(hostStr);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
Collection<String> findLiveCollectorHostsFromKnownCollector(String host, String port) throws MetricCollectorUnavailableException {
|
|
|
List<String> collectors = new ArrayList<>();
|
|
|
HttpURLConnection connection = null;
|
|
@@ -424,7 +447,7 @@ public abstract class AbstractTimelineMetricsSink {
|
|
|
LOG.debug(errorMessage);
|
|
|
LOG.debug(ioe);
|
|
|
String warnMsg = "Unable to connect to collector to find live nodes.";
|
|
|
- LOG.warn(warnMsg, ioe);
|
|
|
+ LOG.warn(warnMsg);
|
|
|
throw new MetricCollectorUnavailableException(warnMsg);
|
|
|
}
|
|
|
return collectors;
|