|
@@ -22,6 +22,8 @@ import logging
|
|
|
import threading
|
|
|
|
|
|
from security import CachedHTTPSConnection, CachedHTTPConnection
|
|
|
+from blacklisted_set import BlacklistedSet
|
|
|
+from config_reader import ROUND_ROBIN_FAILOVER_STRATEGY
|
|
|
|
|
|
logger = logging.getLogger()
|
|
|
|
|
@@ -37,19 +39,22 @@ class Emitter(threading.Thread):
|
|
|
logger.debug('Initializing Emitter thread.')
|
|
|
self.lock = threading.Lock()
|
|
|
self.send_interval = config.get_send_interval()
|
|
|
+ self.hostname = config.get_hostname_config()
|
|
|
+ self.hostname_hash = self.compute_hash(self.hostname)
|
|
|
self._stop_handler = stop_handler
|
|
|
self.application_metric_map = application_metric_map
|
|
|
- # TODO verify certificate
|
|
|
- timeout = int(self.send_interval - 10)
|
|
|
- if config.is_server_https_enabled():
|
|
|
- self.connection = CachedHTTPSConnection(config.get_server_host(),
|
|
|
- config.get_server_port(),
|
|
|
- timeout=timeout,
|
|
|
- ca_certs=config.get_ca_certs())
|
|
|
+ self.collector_port = config.get_server_port()
|
|
|
+ self.all_metrics_collector_hosts = config.get_metrics_collector_hosts()
|
|
|
+ self.is_server_https_enabled = config.is_server_https_enabled()
|
|
|
+
|
|
|
+ if self.is_server_https_enabled:
|
|
|
+ self.ca_certs = config.get_ca_certs()
|
|
|
+
|
|
|
+ # TimedRoundRobinSet
|
|
|
+ if config.get_failover_strategy() == ROUND_ROBIN_FAILOVER_STRATEGY:
|
|
|
+ self.active_collector_hosts = BlacklistedSet(self.all_metrics_collector_hosts, float(config.get_failover_strategy_blacklisted_interval_seconds()))
|
|
|
else:
|
|
|
- self.connection = CachedHTTPConnection(config.get_server_host(),
|
|
|
- config.get_server_port(),
|
|
|
- timeout=timeout)
|
|
|
+ raise Exception(-1, "Uknown failover strategy {0}".format(config.get_failover_strategy()))
|
|
|
|
|
|
def run(self):
|
|
|
logger.info('Running Emitter thread: %s' % threading.currentThread().getName())
|
|
@@ -66,7 +71,6 @@ class Emitter(threading.Thread):
|
|
|
pass
|
|
|
|
|
|
def submit_metrics(self):
|
|
|
- retry_count = 0
|
|
|
# This call will acquire lock on the map and clear contents before returning
|
|
|
# After configured number of retries the data will not be sent to the
|
|
|
# collector
|
|
@@ -75,36 +79,85 @@ class Emitter(threading.Thread):
|
|
|
logger.info("Nothing to emit, resume waiting.")
|
|
|
return
|
|
|
pass
|
|
|
+ self.push_metrics(json_data)
|
|
|
|
|
|
- response = None
|
|
|
- while retry_count < self.MAX_RETRY_COUNT:
|
|
|
- try:
|
|
|
- response = self.push_metrics(json_data)
|
|
|
- except Exception, e:
|
|
|
- logger.warn('Error sending metrics to server. %s' % str(e))
|
|
|
+ def push_metrics(self, data):
|
|
|
+ success = False
|
|
|
+ while self.active_collector_hosts.get_actual_size() > 0:
|
|
|
+ collector_host = self.get_collector_host_shard()
|
|
|
+ success = self.try_with_collector_host(collector_host, data)
|
|
|
+ if success:
|
|
|
+ break
|
|
|
+ pass
|
|
|
+
|
|
|
+ if not success:
|
|
|
+ logger.info('No valid collectors found...')
|
|
|
+ for collector_host in self.active_collector_hosts:
|
|
|
+ success = self.try_with_collector_host(collector_host, data)
|
|
|
pass
|
|
|
|
|
|
+ def try_with_collector_host(self, collector_host, data):
|
|
|
+ headers = {"Content-Type" : "application/json", "Accept" : "*/*"}
|
|
|
+ connection = self.get_connection(collector_host)
|
|
|
+ logger.debug("message to send: %s" % data)
|
|
|
+ retry_count = 0
|
|
|
+ while retry_count < self.MAX_RETRY_COUNT:
|
|
|
+ response = self.get_response_from_submission(connection, data, headers)
|
|
|
if response and response.status == 200:
|
|
|
- retry_count = self.MAX_RETRY_COUNT
|
|
|
+ return True
|
|
|
else:
|
|
|
logger.warn("Retrying after {0} ...".format(self.RETRY_SLEEP_INTERVAL))
|
|
|
retry_count += 1
|
|
|
#Wait for the service stop event instead of sleeping blindly
|
|
|
if 0 == self._stop_handler.wait(self.RETRY_SLEEP_INTERVAL):
|
|
|
- return
|
|
|
- pass
|
|
|
+ return True
|
|
|
pass
|
|
|
- # TODO verify certificate
|
|
|
- def push_metrics(self, data):
|
|
|
- headers = {"Content-Type" : "application/json",
|
|
|
- "Accept" : "*/*",
|
|
|
- "Connection":" Keep-Alive"}
|
|
|
- logger.debug("message to sent: %s" % data)
|
|
|
- self.connection.request("POST", self.AMS_METRICS_POST_URL, data, headers)
|
|
|
- response = self.connection.getresponse()
|
|
|
- if response:
|
|
|
- logger.debug("POST response from server: retcode = {0}, reason = {1}"
|
|
|
- .format(response.status, response.reason))
|
|
|
- logger.debug(str(response.read()))
|
|
|
-
|
|
|
- return response
|
|
|
+
|
|
|
+ if retry_count >= self.MAX_RETRY_COUNT:
|
|
|
+ self.active_collector_hosts.blacklist(collector_host)
|
|
|
+ logger.warn("Metric collector host {0} was blacklisted.".format(collector_host))
|
|
|
+ return False
|
|
|
+
|
|
|
+ def get_connection(self, collector_host):
|
|
|
+ timeout = int(self.send_interval - 10)
|
|
|
+ if self.is_server_https_enabled:
|
|
|
+ connection = CachedHTTPSConnection(collector_host,
|
|
|
+ self.collector_port,
|
|
|
+ timeout=timeout,
|
|
|
+ ca_certs=self.ca_certs)
|
|
|
+ else:
|
|
|
+ connection = CachedHTTPConnection(collector_host,
|
|
|
+ self.collector_port,
|
|
|
+ timeout=timeout)
|
|
|
+ return connection
|
|
|
+
|
|
|
+ def get_response_from_submission(self, connection, data, headers):
|
|
|
+ try:
|
|
|
+ connection.request("POST", self.AMS_METRICS_POST_URL, data, headers)
|
|
|
+ response = connection.getresponse()
|
|
|
+ if response:
|
|
|
+ logger.debug("POST response from server: retcode = {0}, reason = {1}"
|
|
|
+ .format(response.status, response.reason))
|
|
|
+ logger.debug(str(response.read()))
|
|
|
+ return response
|
|
|
+ except Exception, e:
|
|
|
+ logger.warn('Error sending metrics to server. %s' % str(e))
|
|
|
+ return None
|
|
|
+
|
|
|
+ def get_collector_host_shard(self):
|
|
|
+ size = self.active_collector_hosts.get_actual_size()
|
|
|
+ index = self.hostname_hash % size
|
|
|
+ index = index if index >= 0 else index + size
|
|
|
+ hostname = self.active_collector_hosts.get_item_at_index(index)
|
|
|
+ logger.info('Calculated collector shard based on hostname : %s' % hostname)
|
|
|
+ return hostname
|
|
|
+
|
|
|
+ def compute_hash(self, hostname):
|
|
|
+ hash = 11987
|
|
|
+ length = len(hostname)
|
|
|
+ for i in xrange(0, length - 1):
|
|
|
+ hash = 31*hash + ord(hostname[i])
|
|
|
+ return hash
|
|
|
+
|
|
|
+
|
|
|
+
|