|
@@ -23,6 +23,7 @@ import signal
|
|
|
import json
|
|
|
import sys
|
|
|
import os
|
|
|
+import socket
|
|
|
import time
|
|
|
import threading
|
|
|
import urllib2
|
|
@@ -54,7 +55,8 @@ class Controller(threading.Thread):
|
|
|
self.credential = None
|
|
|
self.config = config
|
|
|
self.hostname = hostname.hostname()
|
|
|
- server_secured_url = 'https://' + config.get('server', 'hostname') + \
|
|
|
+ self.serverHostname = config.get('server', 'hostname')
|
|
|
+ server_secured_url = 'https://' + self.serverHostname + \
|
|
|
':' + config.get('server', 'secured_url_port')
|
|
|
self.registerUrl = server_secured_url + '/agent/v1/register/' + self.hostname
|
|
|
self.heartbeatUrl = server_secured_url + '/agent/v1/heartbeat/' + self.hostname
|
|
@@ -85,28 +87,39 @@ class Controller(threading.Thread):
|
|
|
ret = {}
|
|
|
|
|
|
while not self.isRegistered:
|
|
|
- try:
|
|
|
+ try:
|
|
|
data = json.dumps(self.register.build(id))
|
|
|
- logger.info("Registering with the server " + pprint.pformat(data))
|
|
|
+ prettyData = pprint.pformat(data)
|
|
|
+
|
|
|
+ try:
|
|
|
+ server_ip = socket.gethostbyname(self.hostname)
|
|
|
+ logger.info("Registering with %s (%s) (agent=%s)", self.hostname, server_ip, prettyData)
|
|
|
+ except socket.error:
|
|
|
+ logger.warn("Unable to determine the IP address of '%s', agent registration may fail (agent=%s)",
|
|
|
+ self.hostname, prettyData)
|
|
|
+
|
|
|
ret = self.sendRequest(self.registerUrl, data)
|
|
|
- exitstatus = 0
|
|
|
+
|
|
|
# exitstatus is a code of error which was rised on server side.
|
|
|
# exitstatus = 0 (OK - Default)
|
|
|
- # exitstatus = 1 (Registration failed because
|
|
|
- # different version of agent and server)
|
|
|
+ # exitstatus = 1 (Registration failed because different version of agent and server)
|
|
|
+ exitstatus = 0
|
|
|
if 'exitstatus' in ret.keys():
|
|
|
exitstatus = int(ret['exitstatus'])
|
|
|
- # log - message, which will be printed to agents log
|
|
|
- if 'log' in ret.keys():
|
|
|
- log = ret['log']
|
|
|
+
|
|
|
if exitstatus == 1:
|
|
|
+ # log - message, which will be printed to agents log
|
|
|
+ if 'log' in ret.keys():
|
|
|
+ log = ret['log']
|
|
|
+
|
|
|
logger.error(log)
|
|
|
self.isRegistered = False
|
|
|
self.repeatRegistration=False
|
|
|
return ret
|
|
|
- logger.info("Registered with the server with " + pprint.pformat(ret))
|
|
|
- print("Registered with the server")
|
|
|
- self.responseId= int(ret['responseId'])
|
|
|
+
|
|
|
+ logger.info("Registration Successful (response=%s)", pprint.pformat(ret))
|
|
|
+
|
|
|
+ self.responseId = int(ret['responseId'])
|
|
|
self.isRegistered = True
|
|
|
if 'statusCommands' in ret.keys():
|
|
|
logger.info("Got status commands on registration " + pprint.pformat(ret['statusCommands']) )
|
|
@@ -119,10 +132,10 @@ class Controller(threading.Thread):
|
|
|
self.repeatRegistration=False
|
|
|
self.isRegistered = False
|
|
|
return
|
|
|
- except Exception, err:
|
|
|
+ except Exception:
|
|
|
# try a reconnect only after a certain amount of random time
|
|
|
delay = randint(0, self.range)
|
|
|
- logger.info("Unable to connect to: " + self.registerUrl, exc_info = True)
|
|
|
+ logger.error("Unable to connect to: " + self.registerUrl, exc_info = True)
|
|
|
""" Sleeping for {0} seconds and then retrying again """.format(delay)
|
|
|
time.sleep(delay)
|
|
|
pass
|
|
@@ -135,7 +148,7 @@ class Controller(threading.Thread):
|
|
|
""" Put the required actions into the Queue """
|
|
|
""" Verify if the action is to reboot or not """
|
|
|
if not commands:
|
|
|
- logger.debug("No commands from the server : " + pprint.pformat(commands))
|
|
|
+ logger.debug("No commands received from %s", self.serverHostname)
|
|
|
else:
|
|
|
"""Only add to the queue if not empty list """
|
|
|
self.actionQueue.put(commands)
|
|
@@ -143,7 +156,7 @@ class Controller(threading.Thread):
|
|
|
|
|
|
def addToStatusQueue(self, commands):
|
|
|
if not commands:
|
|
|
- logger.debug("No status commands from the server : " + pprint.pformat(commands))
|
|
|
+ logger.debug("No status commands received from %s", self.serverHostname)
|
|
|
else:
|
|
|
if not LiveStatus.SERVICES:
|
|
|
self.updateComponents(commands[0]['clusterName'])
|
|
@@ -171,15 +184,28 @@ class Controller(threading.Thread):
|
|
|
if not retry:
|
|
|
data = json.dumps(
|
|
|
self.heartbeat.build(self.responseId, int(hb_interval), self.hasMappedComponents))
|
|
|
- logger.debug("Sending request: " + data)
|
|
|
pass
|
|
|
else:
|
|
|
self.DEBUG_HEARTBEAT_RETRIES += 1
|
|
|
- response = self.sendRequest(self.heartbeatUrl, data)
|
|
|
|
|
|
- logger.debug('Got server response: ' + pprint.pformat(response))
|
|
|
+ if logger.isEnabledFor(logging.DEBUG):
|
|
|
+ logger.debug("Sending Heartbeat (id = %s): %s", self.responseId, data)
|
|
|
+
|
|
|
+ response = self.sendRequest(self.heartbeatUrl, data)
|
|
|
|
|
|
- serverId=int(response['responseId'])
|
|
|
+ exitStatus = 0
|
|
|
+ if 'exitstatus' in response.keys():
|
|
|
+ exitStatus = int(response['exitstatus'])
|
|
|
+
|
|
|
+ if exitStatus != 0:
|
|
|
+ raise Exception(response)
|
|
|
+
|
|
|
+ serverId = int(response['responseId'])
|
|
|
+
|
|
|
+ if logger.isEnabledFor(logging.DEBUG):
|
|
|
+ logger.debug('Heartbeat response (id = %s): %s', serverId, pprint.pformat(response))
|
|
|
+ else:
|
|
|
+ logger.info('Heartbeat response received (id = %s)', serverId)
|
|
|
|
|
|
if 'hasMappedComponents' in response.keys():
|
|
|
self.hasMappedComponents = response['hasMappedComponents'] != False
|
|
@@ -192,7 +218,7 @@ class Controller(threading.Thread):
|
|
|
self.repeatRegistration = True
|
|
|
return
|
|
|
|
|
|
- if serverId!=self.responseId+1:
|
|
|
+ if serverId != self.responseId + 1:
|
|
|
logger.error("Error in responseId sequence - restarting")
|
|
|
self.restartAgent()
|
|
|
else:
|
|
@@ -201,19 +227,21 @@ class Controller(threading.Thread):
|
|
|
if 'executionCommands' in response.keys():
|
|
|
self.addToQueue(response['executionCommands'])
|
|
|
pass
|
|
|
+
|
|
|
if 'statusCommands' in response.keys():
|
|
|
self.addToStatusQueue(response['statusCommands'])
|
|
|
pass
|
|
|
+
|
|
|
if "true" == response['restartAgent']:
|
|
|
- logger.error("Got restartAgent command")
|
|
|
+ logger.error("Received the restartAgent command")
|
|
|
self.restartAgent()
|
|
|
else:
|
|
|
- logger.info("No commands sent from the Server.")
|
|
|
+ logger.info("No commands sent from %s", self.serverHostname)
|
|
|
pass
|
|
|
|
|
|
if retry:
|
|
|
- print("Reconnected to the server")
|
|
|
- logger.info("Reconnected to the server")
|
|
|
+ logger.info("Reconnected to %s", self.heartbeatUrl)
|
|
|
+
|
|
|
retry=False
|
|
|
certVerifFailed = False
|
|
|
self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
|
|
@@ -227,18 +255,29 @@ class Controller(threading.Thread):
|
|
|
#randomize the heartbeat
|
|
|
delay = randint(0, self.range)
|
|
|
time.sleep(delay)
|
|
|
+
|
|
|
if "code" in err:
|
|
|
logger.error(err.code)
|
|
|
else:
|
|
|
- logger.error("Unable to connect to: " + self.heartbeatUrl + " due to " + str(err))
|
|
|
- logger.debug("Details: " + str(err), exc_info=True)
|
|
|
+ logException = False
|
|
|
+ if logger.isEnabledFor(logging.DEBUG):
|
|
|
+ logException = True
|
|
|
+
|
|
|
+ exceptionMessage = str(err)
|
|
|
+ errorMessage = "Unable to reconnect to {0} (attempts={1}, details={2})".format(self.heartbeatUrl, self.DEBUG_HEARTBEAT_RETRIES, exceptionMessage)
|
|
|
+
|
|
|
if not retry:
|
|
|
- print("Connection to the server was lost. Reconnecting...")
|
|
|
+ errorMessage = "Connection to {0} was lost (details={1})".format(self.serverHostname, exceptionMessage)
|
|
|
+
|
|
|
+ logger.error(errorMessage, exc_info=logException)
|
|
|
+
|
|
|
if 'certificate verify failed' in str(err) and not certVerifFailed:
|
|
|
- print("Server certificate verify failed. Did you regenerate server certificate?")
|
|
|
+ logger.warn("Server certificate verify failed. Did you regenerate server certificate?")
|
|
|
certVerifFailed = True
|
|
|
+
|
|
|
self.cachedconnect = None # Previous connection is broken now
|
|
|
retry=True
|
|
|
+
|
|
|
# Sleep for some time
|
|
|
timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \
|
|
|
- self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
|
|
@@ -268,11 +307,13 @@ class Controller(threading.Thread):
|
|
|
def registerAndHeartbeat(self):
|
|
|
registerResponse = self.registerWithServer()
|
|
|
message = registerResponse['response']
|
|
|
- logger.info("Response from server = " + message)
|
|
|
+ logger.info("Registration response from %s was %s", self.serverHostname, message)
|
|
|
+
|
|
|
if self.isRegistered:
|
|
|
# Process callbacks
|
|
|
for callback in self.registration_listeners:
|
|
|
callback()
|
|
|
+
|
|
|
time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
|
|
|
self.heartbeatWithServer()
|
|
|
|
|
@@ -281,17 +322,17 @@ class Controller(threading.Thread):
|
|
|
pass
|
|
|
|
|
|
def sendRequest(self, url, data):
|
|
|
+ response = None
|
|
|
+
|
|
|
try:
|
|
|
if self.cachedconnect is None: # Lazy initialization
|
|
|
self.cachedconnect = security.CachedHTTPSConnection(self.config)
|
|
|
- req = urllib2.Request(url, data, {'Content-Type': 'application/json'})
|
|
|
- response = None
|
|
|
+ req = urllib2.Request(url, data, {'Content-Type': 'application/json'})
|
|
|
response = self.cachedconnect.request(req)
|
|
|
return json.loads(response)
|
|
|
- except Exception:
|
|
|
+ except Exception, exception:
|
|
|
if response is None:
|
|
|
- err_msg = 'Request failed! Data: ' + str(data)
|
|
|
- logger.warn(err_msg)
|
|
|
+ err_msg = 'Request to {0} failed due to {1}'.format(url, str(exception))
|
|
|
return {'exitstatus': 1, 'log': err_msg}
|
|
|
else:
|
|
|
err_msg = ('Response parsing failed! Request data: ' + str(data)
|
|
@@ -301,8 +342,10 @@ class Controller(threading.Thread):
|
|
|
|
|
|
def updateComponents(self, cluster_name):
|
|
|
logger.info("Updating components map of cluster " + cluster_name)
|
|
|
- response = self.sendRequest(self.componentsUrl + cluster_name, None)
|
|
|
- logger.debug("Response from server = " + str(response))
|
|
|
+
|
|
|
+ response = self.sendRequest(self.componentsUrl + cluster_name, None)
|
|
|
+ logger.debug("Response from %s was %s", self.serverHostname, str(response))
|
|
|
+
|
|
|
for service, components in response['components'].items():
|
|
|
LiveStatus.SERVICES.append(service)
|
|
|
for component, category in components.items():
|