@@ -274,7 +274,7 @@ class Controller(threading.Thread):
     self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
     retry = False
     certVerifFailed = False
-    state_interval = self.config.get('heartbeat', 'state_interval_seconds', '60')
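+    # parse once at startup; the comparisons in the loop below use the int directly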
+    state_interval = int(self.config.get('heartbeat', 'state_interval_seconds', '60'))
 
     # last time when state was successfully sent to server
     last_state_timestamp = 0.0
@@ -289,27 +289,34 @@ class Controller(threading.Thread):
     getrecoverycommands_timestamp = 0.0
     getrecoverycommands_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
 
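+    # initialized once before the loop (instead of on every iteration) so the
+    # interval from the most recent server response carries across iterations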
+    heartbeat_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
+
     while not self.DEBUG_STOP_HEARTBEATING:
-      heartbeat_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
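+      # take one timestamp per iteration and reuse it for all interval checks below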
+      current_time = time.time()
+      logging_level = logging.DEBUG
+      if current_time - heartbeat_running_msg_timestamp > state_interval:
+        # log the steps below at INFO every state_interval seconds (60 by default)
+        logging_level = logging.INFO
+        heartbeat_running_msg_timestamp = current_time
 
       try:
-        crt_time = time.time()
-        if crt_time - heartbeat_running_msg_timestamp > int(state_interval):
-          logger.info("Heartbeat (response id = %s) with server is running...", self.responseId)
-          heartbeat_running_msg_timestamp = crt_time
+        logger.log(logging_level, "Heartbeat (response id = %s) with server is running...", self.responseId)
 
         send_state = False
         if not retry:
-          if crt_time - last_state_timestamp > int(state_interval):
+          if current_time - last_state_timestamp > state_interval:
             send_state = True
 
-          data = json.dumps(
-              self.heartbeat.build(self.responseId, send_state, self.hasMappedComponents))
+          logger.log(logging_level, "Building heartbeat message")
+
+          data = json.dumps(self.heartbeat.build(self.responseId, send_state, self.hasMappedComponents))
         else:
           self.DEBUG_HEARTBEAT_RETRIES += 1
 
-
-        logger.debug("Sending Heartbeat (id = %s): %s", self.responseId, data)
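+        # serialize the full heartbeat payload into the log only when DEBUG
+        # is enabled; otherwise log just the response id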
+        if logger.isEnabledFor(logging.DEBUG):
+          logger.log(logging_level, "Sending Heartbeat (id = %s): %s", self.responseId, data)
+        else:
+          logger.log(logging_level, "Sending Heartbeat (id = %s)", self.responseId)
 
         response = self.sendRequest(self.heartbeatUrl, data)
         exitStatus = 0
@@ -321,8 +328,7 @@ class Controller(threading.Thread):
 
         serverId = int(response['responseId'])
 
-
-        logger.debug('Heartbeat response received (id = %s)', serverId)
+        logger.log(logging_level, 'Heartbeat response received (id = %s)', serverId)
 
         cluster_size = int(response['clusterSize']) if 'clusterSize' in response.keys() else -1
@@ -331,8 +337,7 @@ class Controller(threading.Thread):
           if cluster_size > 0 \
           else self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
 
-
-        logger.debug("Heartbeat interval is %s seconds", heartbeat_interval)
+        logger.log(logging_level, "Heartbeat interval is %s seconds", heartbeat_interval)
 
         if 'hasMappedComponents' in response.keys():
           self.hasMappedComponents = response['hasMappedComponents'] is not False
@@ -364,10 +369,11 @@ class Controller(threading.Thread):
         else:
           self.responseId = serverId
           if send_state:
-            last_state_timestamp = time.time()
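+            # reuse the timestamp taken at the top of the iteration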
+            last_state_timestamp = current_time
 
         # if the response contains configurations, update the in-memory and
         # disk-based configuration cache (execution and alert commands have this)
+        logger.log(logging_level, "Updating configurations from heartbeat")
         self.cluster_configuration.update_configurations_from_heartbeat(response)
 
         response_keys = response.keys()
@@ -375,6 +381,8 @@ class Controller(threading.Thread):
         # there's case when canceled task can be processed in Action Queue.execute before adding rescheduled task to queue
         # this can cause command failure instead result suppression
        # so canceling and putting rescheduled commands should be executed atomically
+        if 'cancelCommands' in response_keys or 'executionCommands' in response_keys:
+          logger.log(logging_level, "Adding cancel/execution commands")
         with self.actionQueue.lock:
           if 'cancelCommands' in response_keys:
             self.cancelCommandInQueue(response['cancelCommands'])
@@ -388,9 +396,10 @@ class Controller(threading.Thread):
           # try storing execution command details and desired state
           self.addToStatusQueue(response['statusCommands'])
 
-        if crt_time - getrecoverycommands_timestamp > int(getrecoverycommands_interval):
-          getrecoverycommands_timestamp = crt_time
+        if current_time - getrecoverycommands_timestamp > getrecoverycommands_interval:
+          getrecoverycommands_timestamp = current_time
           if not self.actionQueue.tasks_in_progress_or_pending():
+            logger.log(logging_level, "Adding recovery commands")
             recovery_commands = self.recovery_manager.get_recovery_commands()
             for recovery_command in recovery_commands:
               logger.info("Adding recovery command %s for component %s",
@@ -398,9 +407,11 @@ class Controller(threading.Thread):
               self.addToQueue([recovery_command])
 
         if 'alertDefinitionCommands' in response_keys:
+          logger.log(logging_level, "Updating alert definitions")
           self.alert_scheduler_handler.update_definitions(response)
 
         if 'alertExecutionCommands' in response_keys:
+          logger.log(logging_level, "Executing alert commands")
           self.alert_scheduler_handler.execute_alert(response['alertExecutionCommands'])
 
         if "true" == response['restartAgent']:
@@ -414,6 +425,7 @@ class Controller(threading.Thread):
 
         if "recoveryConfig" in response:
           # update the list of components enabled for recovery
+          logger.log(logging_level, "Updating recovery config")
          self.recovery_manager.update_configuration_from_registration(response)
 
         retry = False
@@ -455,12 +467,15 @@ class Controller(threading.Thread):
 
       # Sleep for some time
       timeout = heartbeat_interval - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
+      logger.log(logging_level, "Waiting %s seconds for next heartbeat", timeout)
 
       if 0 == self.heartbeat_stop_callback.wait(timeout, self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS):
         # Stop loop when stop event received
         logger.info("Stop event received")
         self.DEBUG_STOP_HEARTBEATING=True
 
+      logger.log(logging_level, "Wait for next heartbeat is over")
+
   def spawnStatusCommandsExecutorProcess(self):
     '''
     Starts a new StatusCommandExecutor child process. In case there is a running instance
|