|
@@ -29,6 +29,7 @@ import urllib2
|
|
|
import pprint
|
|
|
from random import randint
|
|
|
import subprocess
|
|
|
+import functools
|
|
|
|
|
|
import hostname
|
|
|
import security
|
|
@@ -103,6 +104,17 @@ class Controller(threading.Thread):
|
|
|
cluster_config_cache_dir = os.path.join(cache_dir, FileCache.CLUSTER_CONFIGURATION_CACHE_DIRECTORY)
|
|
|
recovery_cache_dir = os.path.join(cache_dir, FileCache.RECOVERY_CACHE_DIRECTORY)
|
|
|
|
|
|
+ self.heartbeat_idle_interval_min = int(self.config.get('heartbeat', 'idle_interval_min')) if self.config.get('heartbeat', 'idle_interval_min') else self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MIN_SEC
|
|
|
+ if self.heartbeat_idle_interval_min < 1:
|
|
|
+ self.heartbeat_idle_interval_min = 1
|
|
|
+
|
|
|
+ self.heartbeat_idle_interval_max = int(self.config.get('heartbeat', 'idle_interval_max')) if self.config.get('heartbeat', 'idle_interval_max') else self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
|
|
|
+
|
|
|
+ if self.heartbeat_idle_interval_min > self.heartbeat_idle_interval_max:
|
|
|
+ raise Exception("Heartbeat minimum interval={0} seconds can not be greater than the maximum interval={1} seconds !".format(self.heartbeat_idle_interval_min, self.heartbeat_idle_interval_max))
|
|
|
+
|
|
|
+ self.get_heartbeat_interval = functools.partial(self.netutil.get_agent_heartbeat_idle_interval_sec, self.heartbeat_idle_interval_min, self.heartbeat_idle_interval_max)
|
|
|
+
|
|
|
self.recovery_manager = RecoveryManager(recovery_cache_dir)
|
|
|
|
|
|
self.cluster_configuration = ClusterConfiguration(cluster_config_cache_dir)
|
|
@@ -245,18 +257,38 @@ class Controller(threading.Thread):
|
|
|
self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
|
|
|
retry = False
|
|
|
certVerifFailed = False
|
|
|
- hb_interval = self.config.get('heartbeat', 'state_interval')
|
|
|
+ state_interval = self.config.get('heartbeat', 'state_interval_seconds', '60')
|
|
|
+
|
|
|
+ # last time when state was successfully sent to server
|
|
|
+ last_state_timestamp = 0.0
|
|
|
+
|
|
|
+ # in order to be able to check from logs that heartbeats processing
|
|
|
+ # still running we log a message. However to avoid generating too
|
|
|
+ # much log when the heartbeat runs at a higher rate (e.g. 1 second intervals)
|
|
|
+ # we log the message at the same interval as 'state interval'
|
|
|
+ heartbeat_running_msg_timestamp = 0.0
|
|
|
|
|
|
while not self.DEBUG_STOP_HEARTBEATING:
|
|
|
+ heartbeat_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
|
|
|
+
|
|
|
try:
|
|
|
+ crt_time = time.time()
|
|
|
+ if crt_time - heartbeat_running_msg_timestamp > int(state_interval):
|
|
|
+ logger.info("Heartbeat with server is running...")
|
|
|
+ heartbeat_running_msg_timestamp = crt_time
|
|
|
+
|
|
|
+ send_state = False
|
|
|
if not retry:
|
|
|
+ if crt_time - last_state_timestamp > int(state_interval):
|
|
|
+ send_state = True
|
|
|
+
|
|
|
data = json.dumps(
|
|
|
- self.heartbeat.build(self.responseId, int(hb_interval), self.hasMappedComponents))
|
|
|
+ self.heartbeat.build(self.responseId, send_state, self.hasMappedComponents))
|
|
|
else:
|
|
|
self.DEBUG_HEARTBEAT_RETRIES += 1
|
|
|
|
|
|
- if logger.isEnabledFor(logging.DEBUG):
|
|
|
- logger.debug("Sending Heartbeat (id = %s): %s", self.responseId, data)
|
|
|
+
|
|
|
+ logger.debug("Sending Heartbeat (id = %s): %s", self.responseId, data)
|
|
|
|
|
|
response = self.sendRequest(self.heartbeatUrl, data)
|
|
|
exitStatus = 0
|
|
@@ -268,14 +300,25 @@ class Controller(threading.Thread):
|
|
|
|
|
|
serverId = int(response['responseId'])
|
|
|
|
|
|
- logger.info('Heartbeat response received (id = %s)', serverId)
|
|
|
+
|
|
|
+ logger.debug('Heartbeat response received (id = %s)', serverId)
|
|
|
+
|
|
|
+ cluster_size = int(response['clusterSize']) if 'clusterSize' in response.keys() else -1
|
|
|
+
|
|
|
+ # TODO: this needs to be revised if hosts can be shared across multiple clusters
|
|
|
+ heartbeat_interval = self.get_heartbeat_interval(cluster_size) \
|
|
|
+ if cluster_size > 0 \
|
|
|
+ else self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
|
|
|
+
|
|
|
+
|
|
|
+ logger.debug("Heartbeat interval is %s seconds", heartbeat_interval)
|
|
|
|
|
|
if 'hasMappedComponents' in response.keys():
|
|
|
self.hasMappedComponents = response['hasMappedComponents'] is not False
|
|
|
|
|
|
if 'hasPendingTasks' in response.keys():
|
|
|
- self.recovery_manager.set_paused(response['hasPendingTasks'])
|
|
|
-
|
|
|
+ has_pending_tasks = bool(response['hasPendingTasks'])
|
|
|
+ self.recovery_manager.set_paused(has_pending_tasks)
|
|
|
|
|
|
if 'registrationCommand' in response.keys():
|
|
|
# check if the registration command is None. If none skip
|
|
@@ -299,6 +342,8 @@ class Controller(threading.Thread):
|
|
|
self.restartAgent()
|
|
|
else:
|
|
|
self.responseId = serverId
|
|
|
+ if send_state:
|
|
|
+ last_state_timestamp = time.time()
|
|
|
|
|
|
# if the response contains configurations, update the in-memory and
|
|
|
# disk-based configuration cache (execution and alert commands have this)
|
|
@@ -381,8 +426,8 @@ class Controller(threading.Thread):
|
|
|
time.sleep(delay)
|
|
|
|
|
|
# Sleep for some time
|
|
|
- timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \
|
|
|
- - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
|
|
|
+ timeout = heartbeat_interval - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
|
|
|
+
|
|
|
if 0 == self.heartbeat_stop_callback.wait(timeout, self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS):
|
|
|
# Stop loop when stop event received
|
|
|
logger.info("Stop event received")
|
|
@@ -427,7 +472,7 @@ class Controller(threading.Thread):
|
|
|
for callback in self.registration_listeners:
|
|
|
callback()
|
|
|
|
|
|
- time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
|
|
|
+ time.sleep(self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC)
|
|
|
self.heartbeatWithServer()
|
|
|
else:
|
|
|
logger.info("Registration response from %s didn't contain 'response' as a key".format(self.serverHostname))
|